Coverage for src / CSET / recipes / __init__.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-12 08:38 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operations on recipes.""" 

16 

17import hashlib 

18import importlib.resources 

19import logging 

20from collections.abc import Iterator 

21from io import StringIO 

22from pathlib import Path 

23from typing import Any 

24 

25from ruamel.yaml import YAML 

26 

27from CSET._common import parse_recipe, slugify 

28from CSET.cset_workflow.lib.python.jinja_utils import get_models as get_models 

29 

30logger = logging.getLogger(__name__) 

31 

32 

def _recipe_files_in_tree(
    recipe_name: str | None = None, input_dir: Path | None = None
) -> Iterator[Path]:
    """Recursively yield the recipe files found below a directory.

    Parameters
    ----------
    recipe_name: str | None
        Exact file name to look for. When None, every ``.yaml`` file is
        yielded.
    input_dir: Path | None
        Directory to search. When None, the location returned by
        ``importlib.resources.files()`` is searched.

    Yields
    ------
    Path
        Each regular file with a ``.yaml`` suffix whose name matches.
    """
    search_root = importlib.resources.files() if input_dir is None else input_dir
    for entry in search_root.iterdir():
        logger.debug("Testing %s", entry)
        name_wanted = recipe_name is None or recipe_name == entry.name
        if name_wanted and entry.is_file() and entry.suffix == ".yaml":
            yield entry
            continue
        # Descend into subdirectories, skipping underscore-prefixed ones such
        # as __pycache__.
        if entry.is_dir() and not entry.name.startswith("_"):
            yield from _recipe_files_in_tree(recipe_name, entry)

49 

50 

def _get_recipe_file(recipe_name: str, input_dir: Path | None = None) -> Path:
    """Locate a single recipe file directly inside a directory.

    Parameters
    ----------
    recipe_name: str
        Name of the recipe file, joined onto the directory.
    input_dir: Path | None
        Directory expected to contain the recipe. When None, the location
        returned by ``importlib.resources.files()`` is used.

    Returns
    -------
    Path
        Path to the existing recipe file.

    Raises
    ------
    FileNotFoundError
        If the joined path is not a regular file.
    """
    base_dir = importlib.resources.files() if input_dir is None else input_dir
    candidate = base_dir / recipe_name
    logger.debug("Getting recipe: %s", candidate)
    if candidate.is_file():
        return candidate
    raise FileNotFoundError("Recipe file does not exist.", recipe_name)

60 

61 

def unpack_recipe(recipe_dir: Path, recipe_name: str) -> None:
    """
    Unpack a recipe file into a directory, creating it if it doesn't exist.

    If a file called recipe_name is already present in recipe_dir it is left
    untouched and nothing is unpacked.

    Parameters
    ----------
    recipe_dir: Path
        Path to a directory into which to unpack the recipe files.
    recipe_name: str
        Name of recipe to unpack.

    Raises
    ------
    FileExistsError
        If recipe_dir already exists, and is not a directory.

    FileNotFoundError
        If no recipe file named recipe_name can be found.

    OSError
        If recipe_dir cannot be created, such as insufficient permissions, or
        lack of space.
    """
    recipe_dir.mkdir(parents=True, exist_ok=True)
    output_file = recipe_dir / recipe_name
    logger.debug("Saving recipe to %s", output_file)
    if output_file.exists():
        logger.debug("%s already exists in target directory, skipping.", recipe_name)
        return
    logger.info("Unpacking %s to %s", recipe_name, output_file)
    # Give next() a default so that an unknown recipe is reported as a
    # FileNotFoundError instead of leaking a bare StopIteration to the caller.
    found = next(_recipe_files_in_tree(recipe_name), None)
    if found is None:
        raise FileNotFoundError("Recipe file does not exist.", recipe_name)
    # NOTE(review): the located path is passed where a name is expected; this
    # presumably relies on it being absolute so the join returns it unchanged.
    file = _get_recipe_file(found)
    output_file.write_bytes(file.read_bytes())

91 

92 

def list_available_recipes() -> None:
    """Print the file name of every available recipe to stdout.

    A heading line is printed first, followed by one tab-indented recipe
    file name per line.
    """
    print("Available recipes:")
    for recipe_path in _recipe_files_in_tree():
        print("\t" + recipe_path.name)

98 

99 

def detail_recipe(recipe_name: str) -> None:
    """Print the name and description of the named recipe to stdout.

    Every recipe file in the tree whose file name equals ``recipe_name`` is
    loaded and displayed, so more than one recipe may be printed.

    Parameters
    ----------
    recipe_name: str
        File name of the recipe(s) to describe.
    """
    for recipe_path in _recipe_files_in_tree(recipe_name):
        with YAML(typ="safe", pure=True) as yaml:
            contents = yaml.load(recipe_path)
        heading = recipe_path.name
        # Underline the heading with one box-drawing character per letter.
        underline = "─" * len(heading)
        print(f"\n\t{heading}\n\t{underline}\n")
        print(contents.get("description"))

115 

116 

117class RawRecipe: 

118 """A recipe to be parbaked. 

119 

120 Parameters 

121 ---------- 

122 recipe: str 

123 Name of the recipe file. 

124 model_ids: int | list[int] 

125 Model IDs to set the input paths for. Matches the corresponding workflow 

126 model IDs. 

127 variables: dict[str, Any] aggregation: bool 

128 Recipe variables to be inserted into $VAR placeholders in the recipe. 

129 aggregation: bool 

130 Whether this is an aggregation recipe or just a single case. 

131 

132 Returns 

133 ------- 

134 RawRecipe 

135 """ 

136 

137 recipe: str 

138 model_ids: list[int] 

139 variables: dict[str, Any] 

140 aggregation: bool 

141 

142 def __init__( 

143 self, 

144 recipe: str, 

145 model_ids: int | list[int], 

146 variables: dict[str, Any], 

147 aggregation: bool, 

148 ) -> None: 

149 self.recipe = recipe 

150 self.model_ids = model_ids if isinstance(model_ids, list) else [model_ids] 

151 self.variables = variables 

152 self.aggregation = aggregation 

153 

154 def __str__(self) -> str: 

155 """Return str(self). 

156 

157 Examples 

158 -------- 

159 >>> print(raw_recipe) 

160 generic_surface_spatial_plot_sequence.yaml (model 1) 

161 VARNAME air_temperature 

162 MODEL_NAME Model A 

163 METHOD SEQ 

164 SUBAREA_TYPE None 

165 SUBAREA_EXTENT None 

166 """ 

167 recipe = self.recipe if self.recipe else "<unknown>" 

168 plural = "s" if len(self.model_ids) > 1 else "" 

169 ids = " ".join(str(m) for m in self.model_ids) 

170 aggregation = ", Aggregation" if self.aggregation else "" 

171 pad = max([0] + [len(k) for k in self.variables.keys()]) 

172 variables = "".join(f"\n\t{k:<{pad}} {v}" for k, v in self.variables.items()) 

173 return f"{recipe} (model{plural} {ids}{aggregation}){variables}" 

174 

175 def __eq__(self, value: object) -> bool: 

176 """Return self==value.""" 

177 if isinstance(value, self.__class__): 

178 return ( 

179 self.recipe == value.recipe 

180 and self.model_ids == value.model_ids 

181 and self.variables == value.variables 

182 and self.aggregation == value.aggregation 

183 ) 

184 return NotImplemented 

185 

186 def parbake(self, ROSE_DATAC: Path, SHARE_DIR: Path) -> None: 

187 """Pre-process recipe to bake in all variables. 

188 

189 Parameters 

190 ---------- 

191 ROSE_DATAC: Path 

192 Workflow shared per-cycle data location. 

193 SHARE_DIR: Path 

194 Workflow shared data location. 

195 """ 

196 # Ready recipe file to disk. 

197 unpack_recipe(Path.cwd(), self.recipe) 

198 

199 # Collect configuration from environment. 

200 if self.aggregation: 

201 # Construct the location for the recipe. 

202 recipe_dir = ROSE_DATAC / "aggregation_recipes" 

203 # Construct the input data directories for the cycle. 

204 data_dirs = [ 

205 SHARE_DIR / f"cycle/*/data/{model_id}" for model_id in self.model_ids 

206 ] 

207 else: 

208 recipe_dir = ROSE_DATAC / "recipes" 

209 data_dirs = [ROSE_DATAC / f"data/{model_id}" for model_id in self.model_ids] 

210 

211 # Ensure recipe dir exists. 

212 recipe_dir.mkdir(parents=True, exist_ok=True) 

213 

214 # Add input paths to recipe variables. 

215 self.variables["INPUT_PATHS"] = data_dirs 

216 

217 # Parbake this recipe, saving into recipe_dir. 

218 recipe = parse_recipe(Path(self.recipe), self.variables) 

219 

220 # Serialise into memory, as we use the serialised value twice. 

221 with StringIO() as s: 

222 with YAML(pure=True, output=s) as yaml: 

223 yaml.dump(recipe) 

224 serialised_recipe = s.getvalue().encode() 

225 # Include shortened hash in filename to avoid collisions between recipes 

226 # with the same title. 

227 digest = hashlib.sha256(serialised_recipe).hexdigest() 

228 output_filename = recipe_dir / f"{slugify(recipe['title'])}_{digest[:12]}.yaml" 

229 with open(output_filename, "wb") as fp: 

230 fp.write(serialised_recipe) 

231 

232 

class Config:
    """Namespace for easy access to configuration values.

    A namespace for easy access to configuration values (via config.variable),
    where undefined attributes return an empty list. An empty list evaluates to
    False in boolean contexts and can be safely iterated over, so it acts as an
    effective unset value.

    Double-underscore ("dunder") names are never treated as configuration
    keys, so protocol lookups such as those made when copying behave as they
    do for an ordinary object.

    Parameters
    ----------
    config: dict
        Configuration key-value pairs.

    Example
    -------
    >>> conf = Config({"key": "value"})
    >>> conf.key
    'value'
    >>> conf.missing
    []
    """

    d: dict

    def __init__(self, config: dict) -> None:
        self.d = config

    def __getattr__(self, name: str):
        """Return the configuration value, or an empty list for missing names.

        Raises
        ------
        AttributeError
            For dunder names, and for any name when the underlying dictionary
            has not been set yet.
        """
        # Answering protocol probes (__deepcopy__, __setstate__, ...) with a
        # list breaks the code doing the probing, so report them as absent.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        # __getattr__ also runs when "d" itself is missing, e.g. on an
        # instance created without __init__. Read the instance dictionary
        # directly to avoid recursing back into this method.
        try:
            values = self.__dict__["d"]
        except KeyError:
            raise AttributeError(name) from None
        return values.get(name, [])

    def asdict(self) -> dict:
        """Return config as a dictionary."""
        return self.d

267 

268 

def load_recipes(variables: dict[str, Any]) -> Iterator[RawRecipe]:
    """Yield the recipes enabled by the workflow configuration.

    Every loader (python module) named in ``CSET.loaders.__all__`` is used in
    turn. Each loader must define a ``load`` function returning an iterator of
    RawRecipe; it is called with a ``Config`` object wrapping ``variables``.

    A minimal example can be found in `CSET.loaders.test`.

    Parameters
    ----------
    variables: dict[str, Any]
        Workflow configuration from ROSE_SUITE_VARIABLES.

    Returns
    -------
    Iterator[RawRecipe]
        Configured recipes.

    Raises
    ------
    AttributeError
        When a loader doesn't provide a `load` function.
    """
    # Deferred import: importing at module level would be circular.
    import CSET.loaders

    config = Config(variables)
    for loader_name in CSET.loaders.__all__:
        logger.info("Loading recipes from %s", loader_name)
        loader_module = getattr(CSET.loaders, loader_name)
        yield from loader_module.load(config)

300 yield from module.load(config)