Coverage for src / CSET / recipes / __init__.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-24 08:36 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operations on recipes.""" 

16 

17import importlib.resources 

18import logging 

19from collections.abc import Iterator 

20from pathlib import Path 

21from typing import Any 

22 

23from ruamel.yaml import YAML 

24 

25from CSET._common import parse_recipe, slugify 

26from CSET.cset_workflow.lib.python.jinja_utils import get_models as get_models 

27 

28logger = logging.getLogger(__name__) 

29 

30 

def _recipe_files_in_tree(
    recipe_name: str | None = None, input_dir: Path | None = None
) -> Iterator[Path]:
    """Yield recipe file Paths matching the recipe name.

    The directory is searched recursively. Sub-directories whose name starts
    with an underscore (such as ``__pycache__``) are skipped.

    Parameters
    ----------
    recipe_name: str | None
        Exact file name to look for, compared against each file's ``name``.
        When None, every ``.yaml`` file is yielded.
    input_dir: Path | None
        Directory to search. When None, the location returned by
        ``importlib.resources.files()`` is used.

    Yields
    ------
    Path
        Each matching ``.yaml`` file, visited in name order within each
        directory.
    """
    if input_dir is None:
        input_dir = importlib.resources.files()
    # iterdir() makes no promise about ordering. Sorting by name keeps the
    # result reproducible, which matters to callers that take only the first
    # match.
    for file in sorted(input_dir.iterdir(), key=lambda entry: entry.name):
        logger.debug("Testing %s", file)
        if (
            (recipe_name is None or recipe_name == file.name)
            and file.is_file()
            and file.suffix == ".yaml"
        ):
            yield file
        elif file.is_dir() and not file.name.startswith("_"):
            # Underscore-prefixed directories (e.g. __pycache__) are excluded.
            yield from _recipe_files_in_tree(recipe_name, file)

47 

48 

def _get_recipe_file(recipe_name: str, input_dir: Path | None = None) -> Path:
    """Resolve a recipe name to the Path of an existing file.

    Parameters
    ----------
    recipe_name: str
        Name of the recipe file, joined onto the search directory.
    input_dir: Path | None
        Directory the recipe is expected in. When None, the location returned
        by ``importlib.resources.files()`` is used.

    Returns
    -------
    Path
        Location of the recipe file.

    Raises
    ------
    FileNotFoundError
        If the joined path is not an existing file.
    """
    base_dir = importlib.resources.files() if input_dir is None else input_dir
    candidate = base_dir / recipe_name
    logger.debug("Getting recipe: %s", candidate)
    if candidate.is_file():
        return candidate
    raise FileNotFoundError("Recipe file does not exist.", recipe_name)

58 

59 

def unpack_recipe(recipe_dir: Path, recipe_name: str) -> None:
    """
    Unpacks recipes files into a directory, creating it if it doesn't exist.

    If a file called ``recipe_name`` already exists in ``recipe_dir`` it is
    left untouched and nothing is unpacked.

    Parameters
    ----------
    recipe_dir: Path
        Path to a directory into which to unpack the recipe files.
    recipe_name: str
        Name of recipe to unpack.

    Raises
    ------
    FileExistsError
        If recipe_dir already exists, and is not a directory.

    FileNotFoundError
        If no recipe file called recipe_name could be found.

    OSError
        If recipe_dir cannot be created, such as insufficient permissions, or
        lack of space.
    """
    recipe_dir.mkdir(parents=True, exist_ok=True)
    output_file = recipe_dir / recipe_name
    logger.debug("Saving recipe to %s", output_file)
    if output_file.exists():
        logger.debug("%s already exists in target directory, skipping.", recipe_name)
        return
    logger.info("Unpacking %s to %s", recipe_name, output_file)
    # Supply a default to next() so a missing recipe is reported as a
    # FileNotFoundError rather than leaking StopIteration, which turns into
    # an opaque RuntimeError if it propagates through a generator.
    found = next(_recipe_files_in_tree(recipe_name), None)
    if found is None:
        raise FileNotFoundError("Recipe file does not exist.", recipe_name)
    # Still routed through _get_recipe_file, as before, for its existence
    # check and debug logging.
    file = _get_recipe_file(found)
    output_file.write_bytes(file.read_bytes())

89 

90 

def list_available_recipes() -> None:
    """Print the file name of every recipe found to stdout.

    A header line is printed first, followed by one tab-indented file name
    per recipe.
    """
    print("Available recipes:")
    for recipe_path in _recipe_files_in_tree():
        print(f"\t{recipe_path.name}")

96 

97 

def detail_recipe(recipe_name: str) -> None:
    """Print the description of the named recipe to stdout.

    Every recipe file with this name is shown, so if the same file name
    appears in several directories each one is printed in turn.

    Parameters
    ----------
    recipe_name: str
        File name of the recipe. It is compared for equality against each
        recipe file's name.
    """
    for recipe_path in _recipe_files_in_tree(recipe_name):
        with YAML(typ="safe", pure=True) as yaml:
            recipe = yaml.load(recipe_path)
        # Underline the heading with one box-drawing character per character
        # of the file name.
        underline = "─" * len(recipe_path.name)
        print(f"\n\t{recipe_path.name}\n\t{underline}\n")
        print(recipe.get("description"))

113 

114 

115class RawRecipe: 

116 """A recipe to be parbaked. 

117 

118 Parameters 

119 ---------- 

120 recipe: str 

121 Name of the recipe file. 

122 model_ids: int | list[int] 

123 Model IDs to set the input paths for. Matches the corresponding workflow 

124 model IDs. 

125 variables: dict[str, Any] aggregation: bool 

126 Recipe variables to be inserted into $VAR placeholders in the recipe. 

127 aggregation: bool 

128 Whether this is an aggregation recipe or just a single case. 

129 

130 Returns 

131 ------- 

132 RawRecipe 

133 """ 

134 

135 recipe: str 

136 model_ids: list[int] 

137 variables: dict[str, Any] 

138 aggregation: bool 

139 

140 def __init__( 

141 self, 

142 recipe: str, 

143 model_ids: int | list[int], 

144 variables: dict[str, Any], 

145 aggregation: bool, 

146 ) -> None: 

147 self.recipe = recipe 

148 self.model_ids = model_ids if isinstance(model_ids, list) else [model_ids] 

149 self.variables = variables 

150 self.aggregation = aggregation 

151 

152 def __str__(self) -> str: 

153 """Return str(self). 

154 

155 Examples 

156 -------- 

157 >>> print(raw_recipe) 

158 generic_surface_spatial_plot_sequence.yaml (model 1) 

159 VARNAME air_temperature 

160 MODEL_NAME Model A 

161 METHOD SEQ 

162 SUBAREA_TYPE None 

163 SUBAREA_EXTENT None 

164 """ 

165 recipe = self.recipe if self.recipe else "<unknown>" 

166 plural = "s" if len(self.model_ids) > 1 else "" 

167 ids = " ".join(str(m) for m in self.model_ids) 

168 aggregation = ", Aggregation" if self.aggregation else "" 

169 pad = max([0] + [len(k) for k in self.variables.keys()]) 

170 variables = "".join(f"\n\t{k:<{pad}} {v}" for k, v in self.variables.items()) 

171 return f"{recipe} (model{plural} {ids}{aggregation}){variables}" 

172 

173 def __eq__(self, value: object) -> bool: 

174 """Return self==value.""" 

175 if isinstance(value, self.__class__): 

176 return ( 

177 self.recipe == value.recipe 

178 and self.model_ids == value.model_ids 

179 and self.variables == value.variables 

180 and self.aggregation == value.aggregation 

181 ) 

182 return NotImplemented 

183 

184 def parbake(self, ROSE_DATAC: Path, SHARE_DIR: Path) -> None: 

185 """Pre-process recipe to bake in all variables. 

186 

187 Parameters 

188 ---------- 

189 ROSE_DATAC: Path 

190 Workflow shared per-cycle data location. 

191 SHARE_DIR: Path 

192 Workflow shared data location. 

193 """ 

194 # Ready recipe file to disk. 

195 unpack_recipe(Path.cwd(), self.recipe) 

196 

197 # Collect configuration from environment. 

198 if self.aggregation: 

199 # Construct the location for the recipe. 

200 recipe_dir = ROSE_DATAC / "aggregation_recipes" 

201 # Construct the input data directories for the cycle. 

202 data_dirs = [ 

203 SHARE_DIR / f"cycle/*/data/{model_id}" for model_id in self.model_ids 

204 ] 

205 else: 

206 recipe_dir = ROSE_DATAC / "recipes" 

207 data_dirs = [ROSE_DATAC / f"data/{model_id}" for model_id in self.model_ids] 

208 

209 # Ensure recipe dir exists. 

210 recipe_dir.mkdir(parents=True, exist_ok=True) 

211 

212 # Add input paths to recipe variables. 

213 self.variables["INPUT_PATHS"] = data_dirs 

214 

215 # Parbake this recipe, saving into recipe_dir. 

216 recipe = parse_recipe(Path(self.recipe), self.variables) 

217 output = recipe_dir / f"{slugify(recipe['title'])}.yaml" 

218 with open(output, "wt") as fp: 

219 with YAML(pure=True, output=fp) as yaml: 

220 yaml.dump(recipe) 

221 

222 

class Config:
    """Namespace for easy access to configuration values.

    A namespace for easy access to configuration values (via config.variable),
    where undefined attributes return an empty list. An empty list evaluates to
    False in boolean contexts and can be safely iterated over, so it acts as an
    effective unset value.

    Double-underscore names (``__name__`` style) that are not configuration
    keys raise AttributeError as usual, so that protocol lookups made by
    ``copy``, ``pickle`` and ``hasattr`` behave normally.

    Parameters
    ----------
    config: dict
        Configuration key-value pairs.

    Example
    -------
    >>> conf = Config({"key": "value"})
    >>> conf.key
    'value'
    >>> conf.missing
    []
    """

    d: dict

    def __init__(self, config: dict) -> None:
        self.d = config

    def __getattr__(self, name: str):
        """Return an empty list for missing names.

        Raises
        ------
        AttributeError
            If the backing dictionary has not been set on this instance, or
            for a double-underscore name that is not a configuration key.
        """
        # __getattr__ only runs once normal lookup has failed. If "d" itself
        # is what is missing (for example on an instance created without
        # __init__), reading self.d below would re-enter this method forever.
        if name == "d":
            raise AttributeError(name)
        # Special-method probes must not be answered with an empty list.
        is_dunder = name.startswith("__") and name.endswith("__")
        if is_dunder and name not in self.d:
            raise AttributeError(name)
        return self.d.get(name, [])

    def asdict(self) -> dict:
        """Return config as a dictionary."""
        return self.d

257 

258 

def load_recipes(variables: dict[str, Any]) -> Iterator[RawRecipe]:
    """Load recipes enabled by configuration.

    Every name listed in ``CSET.loaders.__all__`` is treated as a loader. Each
    loader's ``load`` function is called with a ``Config`` wrapping
    ``variables``, and the recipes it produces are yielded in turn.

    A minimal example can be found in `CSET.loaders.test`.

    Parameters
    ----------
    variables: dict[str, Any]
        Workflow configuration from ROSE_SUITE_VARIABLES.

    Returns
    -------
    Iterator[RawRecipe]
        Configured recipes.

    Raises
    ------
    AttributeError
        When a loader doesn't provide a `load` function.
    """
    # Import here to avoid circular import.
    import CSET.loaders

    conf = Config(variables)
    for loader_name in CSET.loaders.__all__:
        logger.info("Loading recipes from %s", loader_name)
        loader_module = getattr(CSET.loaders, loader_name)
        yield from loader_module.load(conf)