Coverage for src/CSET/recipes/__init__.py: 100%

104 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 15:17 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operations on recipes.""" 

16 

17import importlib.resources 

18import logging 

19import sys 

20from collections.abc import Iterator 

21from pathlib import Path 

22from typing import Any 

23 

24from ruamel.yaml import YAML 

25 

26from CSET._common import parse_recipe, slugify 

27from CSET.cset_workflow.lib.python.jinja_utils import get_models as get_models 

28 

29logger = logging.getLogger(__name__) 

30 

31 

def _version_agnostic_importlib_resources_file() -> Path:
    """Return the resource root of this package.

    Transitional wrapper around importlib.resources.files(), whose behaviour
    changed in Python 3.12 to avoid circular dependencies.

    Returns
    -------
    Path
        The location returned by importlib.resources.files() for this
        package.
    """
    # Compare the whole version tuple rather than only the minor component,
    # so the check remains correct whatever the major version is.
    if sys.version_info >= (3, 12):
        # From 3.12 the anchor can be omitted and is inferred from the caller.
        return importlib.resources.files()

    # Older interpreters need the package passed explicitly. It is imported
    # here, inside the function, rather than at module level.
    import CSET.recipes

    return importlib.resources.files(CSET.recipes)

44 

45 

def _recipe_files_in_tree(
    recipe_name: str | None = None, input_dir: Path | None = None
) -> Iterator[Path]:
    """Walk a directory tree, yielding the recipe files found in it.

    A file is yielded when it has a ``.yaml`` suffix and, if ``recipe_name``
    is given, its file name is exactly ``recipe_name``. Sub-directories are
    searched recursively unless their name starts with an underscore.

    Parameters
    ----------
    recipe_name: str | None
        Exact file name to look for. When None every YAML file is yielded.
    input_dir: Path | None
        Directory to search. When None the package's own resource directory
        is used.

    Yields
    ------
    Path
        Each matching recipe file.
    """
    root = (
        input_dir
        if input_dir is not None
        else _version_agnostic_importlib_resources_file()
    )
    for entry in root.iterdir():
        logger.debug("Testing %s", entry)
        name_matches = recipe_name is None or recipe_name == entry.name
        if name_matches and entry.is_file() and entry.suffix == ".yaml":
            yield entry
            continue
        # Underscore-prefixed directories (such as __pycache__) are skipped.
        if entry.is_dir() and entry.name[0] != "_":
            yield from _recipe_files_in_tree(recipe_name, entry)

62 

63 

def _get_recipe_file(recipe_name: str, input_dir: Path | None = None) -> Path:
    """Locate a recipe file directly inside a directory.

    Parameters
    ----------
    recipe_name: str
        Name of the recipe file, joined onto ``input_dir``.
    input_dir: Path | None
        Directory expected to contain the recipe. When None the package's
        own resource directory is used.

    Returns
    -------
    Path
        Path to the existing recipe file.

    Raises
    ------
    FileNotFoundError
        If the resulting path is not a file.
    """
    base_dir = (
        input_dir
        if input_dir is not None
        else _version_agnostic_importlib_resources_file()
    )
    candidate = base_dir / recipe_name
    logger.debug("Getting recipe: %s", candidate)
    if candidate.is_file():
        return candidate
    raise FileNotFoundError("Recipe file does not exist.", recipe_name)

73 

74 

def unpack_recipe(recipe_dir: Path, recipe_name: str) -> None:
    """
    Unpack a recipe file into a directory, creating it if it doesn't exist.

    If a file called ``recipe_name`` already exists in ``recipe_dir`` it is
    left untouched and nothing is copied.

    Parameters
    ----------
    recipe_dir: Path
        Path to a directory into which to unpack the recipe files.
    recipe_name: str
        Name of recipe to unpack.

    Raises
    ------
    FileExistsError
        If recipe_dir already exists, and is not a directory.

    FileNotFoundError
        If no bundled recipe file is called recipe_name.

    OSError
        If recipe_dir cannot be created, such as insufficient permissions, or
        lack of space.
    """
    recipe_dir.mkdir(parents=True, exist_ok=True)
    output_file = recipe_dir / recipe_name
    logger.debug("Saving recipe to %s", output_file)
    if output_file.exists():
        logger.debug("%s already exists in target directory, skipping.", recipe_name)
        return
    logger.info("Unpacking %s to %s", recipe_name, output_file)
    # Supply a default to next() so that an unknown recipe is reported as a
    # FileNotFoundError instead of leaking a bare StopIteration to the caller.
    match = next(_recipe_files_in_tree(recipe_name), None)
    if match is None:
        raise FileNotFoundError("Recipe file does not exist.", recipe_name)
    file = _get_recipe_file(match)
    output_file.write_bytes(file.read_bytes())

104 

105 

def list_available_recipes() -> None:
    """List available recipes to stdout.

    Prints a heading followed by one tab-indented recipe file name per line,
    in alphabetical order.
    """
    print("Available recipes:")
    # Directory iteration order is arbitrary, so sort the names to make the
    # listing stable between runs and platforms.
    for name in sorted(file.name for file in _recipe_files_in_tree()):
        print(f"\t{name}")

111 

112 

def detail_recipe(recipe_name: str) -> None:
    """Detail the recipe to stdout.

    For each matching recipe its file name is printed, underlined, followed
    by the value of the recipe's ``description`` key.

    If multiple recipes match the given name they will all be displayed.

    Parameters
    ----------
    recipe_name: str
        File name of the recipe to detail. Recipe files are matched on their
        exact file name.
    """
    for file in _recipe_files_in_tree(recipe_name):
        with YAML(typ="safe", pure=True) as yaml:
            recipe = yaml.load(file)
        # Underline the heading with one box-drawing character per character
        # of the file name.
        underline = "─" * len(file.name)
        print(f"\n\t{file.name}\n\t{underline}\n")
        print(recipe.get("description"))

128 

129 

130class RawRecipe: 

131 """A recipe to be parbaked. 

132 

133 Parameters 

134 ---------- 

135 recipe: str 

136 Name of the recipe file. 

137 model_ids: int | list[int] 

138 Model IDs to set the input paths for. Matches the corresponding workflow 

139 model IDs. 

140 variables: dict[str, Any] aggregation: bool 

141 Recipe variables to be inserted into $VAR placeholders in the recipe. 

142 aggregation: bool 

143 Whether this is an aggregation recipe or just a single case. 

144 

145 Returns 

146 ------- 

147 RawRecipe 

148 """ 

149 

150 recipe: str 

151 model_ids: list[int] 

152 variables: dict[str, Any] 

153 aggregation: bool 

154 

155 def __init__( 

156 self, 

157 recipe: str, 

158 model_ids: int | list[int], 

159 variables: dict[str, Any], 

160 aggregation: bool, 

161 ) -> None: 

162 self.recipe = recipe 

163 self.model_ids = model_ids if isinstance(model_ids, list) else [model_ids] 

164 self.variables = variables 

165 self.aggregation = aggregation 

166 

167 def __str__(self) -> str: 

168 """Return str(self). 

169 

170 Examples 

171 -------- 

172 >>> print(raw_recipe) 

173 generic_surface_spatial_plot_sequence.yaml (model 1) 

174 VARNAME air_temperature 

175 MODEL_NAME Model A 

176 METHOD SEQ 

177 SUBAREA_TYPE None 

178 SUBAREA_EXTENT None 

179 """ 

180 recipe = self.recipe if self.recipe else "<unknown>" 

181 plural = "s" if len(self.model_ids) > 1 else "" 

182 ids = " ".join(str(m) for m in self.model_ids) 

183 aggregation = ", Aggregation" if self.aggregation else "" 

184 pad = max([0] + [len(k) for k in self.variables.keys()]) 

185 variables = "".join(f"\n\t{k:<{pad}} {v}" for k, v in self.variables.items()) 

186 return f"{recipe} (model{plural} {ids}{aggregation}){variables}" 

187 

188 def __eq__(self, value: object) -> bool: 

189 """Return self==value.""" 

190 if isinstance(value, self.__class__): 

191 return ( 

192 self.recipe == value.recipe 

193 and self.model_ids == value.model_ids 

194 and self.variables == value.variables 

195 and self.aggregation == value.aggregation 

196 ) 

197 return NotImplemented 

198 

199 def parbake(self, ROSE_DATAC: Path, SHARE_DIR: Path) -> None: 

200 """Pre-process recipe to bake in all variables. 

201 

202 Parameters 

203 ---------- 

204 ROSE_DATAC: Path 

205 Workflow shared per-cycle data location. 

206 SHARE_DIR: Path 

207 Workflow shared data location. 

208 """ 

209 # Ready recipe file to disk. 

210 unpack_recipe(Path.cwd(), self.recipe) 

211 

212 # Collect configuration from environment. 

213 if self.aggregation: 

214 # Construct the location for the recipe. 

215 recipe_dir = ROSE_DATAC / "aggregation_recipes" 

216 # Construct the input data directories for the cycle. 

217 data_dirs = [ 

218 SHARE_DIR / f"cycle/*/data/{model_id}" for model_id in self.model_ids 

219 ] 

220 else: 

221 recipe_dir = ROSE_DATAC / "recipes" 

222 data_dirs = [ROSE_DATAC / f"data/{model_id}" for model_id in self.model_ids] 

223 

224 # Ensure recipe dir exists. 

225 recipe_dir.mkdir(parents=True, exist_ok=True) 

226 

227 # Add input paths to recipe variables. 

228 self.variables["INPUT_PATHS"] = data_dirs 

229 

230 # Parbake this recipe, saving into recipe_dir. 

231 recipe = parse_recipe(Path(self.recipe), self.variables) 

232 output = recipe_dir / f"{slugify(recipe['title'])}.yaml" 

233 with open(output, "wt") as fp: 

234 with YAML(pure=True, output=fp) as yaml: 

235 yaml.dump(recipe) 

236 

237 

class Config:
    """Namespace for easy access to configuration values.

    A namespace for easy access to configuration values (via config.variable),
    where undefined attributes return an empty list. An empty list evaluates to
    False in boolean contexts and can be safely iterated over, so it acts as an
    effective unset value.

    Parameters
    ----------
    config: dict
        Configuration key-value pairs.

    Example
    -------
    >>> conf = Config({"key": "value"})
    >>> conf.key
    'value'
    >>> conf.missing
    []
    """

    d: dict

    def __init__(self, config: dict) -> None:
        self.d = config

    def __getattr__(self, name: str):
        """Return the configuration value for name, or an empty list.

        This is only called when normal attribute lookup fails, so real
        attributes and methods of the object take precedence.

        Raises
        ------
        AttributeError
            For double-underscore names, and for any name when the
            configuration dictionary has not been set on the instance.
        """
        # Protocol probes such as __deepcopy__ or __setstate__ must see a
        # missing attribute rather than an empty list, otherwise the copy and
        # pickle machinery tries to call the list.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        # Read the instance dictionary directly: using self.d here would
        # re-enter __getattr__ forever on an instance created without
        # __init__, as happens while copying or unpickling.
        try:
            config = self.__dict__["d"]
        except KeyError:
            raise AttributeError(name) from None
        return config.get(name, [])

    def asdict(self) -> dict:
        """Return config as a dictionary.

        The dictionary given at construction is returned itself, not a copy.
        """
        return self.d

272 

273 

def load_recipes(variables: dict[str, Any]) -> Iterator[RawRecipe]:
    """Yield every recipe enabled by the workflow configuration.

    Each name listed in ``CSET.loaders.__all__`` is looked up on the
    ``CSET.loaders`` package and its ``load`` function is called with the
    configuration wrapped in a ``Config`` namespace. The recipes produced by
    each loader are yielded in turn.

    A minimal example can be found in `CSET.loaders.test`.

    Parameters
    ----------
    variables: dict[str, Any]
        Workflow configuration from ROSE_SUITE_VARIABLES.

    Returns
    -------
    Iterator[RawRecipe]
        Configured recipes.

    Raises
    ------
    AttributeError
        When a loader doesn't provide a `load` function.
    """
    # Deferred import: importing at module level would be circular.
    import CSET.loaders as loaders

    conf = Config(variables)
    for loader_name in loaders.__all__:
        logger.info("Loading recipes from %s", loader_name)
        yield from getattr(loaders, loader_name).load(conf)