Coverage for src / CSET / recipes / __init__.py: 100%
98 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-12 09:16 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-12 09:16 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operations on recipes."""
17import hashlib
18import importlib.resources
19import logging
20from collections.abc import Iterator
21from io import StringIO
22from pathlib import Path
23from typing import Any
25from ruamel.yaml import YAML
27from CSET._common import parse_recipe, slugify
28from CSET.cset_workflow.lib.python.jinja_utils import get_models as get_models
30logger = logging.getLogger(__name__)
def _recipe_files_in_tree(
    recipe_name: str | None = None, input_dir: Path | None = None
) -> Iterator[Path]:
    """Recursively yield the recipe files found under a directory.

    Parameters
    ----------
    recipe_name: str | None
        Exact file name to look for. When None, every recipe file is yielded.
    input_dir: Path | None
        Directory to search. When None, the location returned by
        ``importlib.resources.files()`` is searched.

    Yields
    ------
    Path
        Each regular file with a ``.yaml`` suffix whose name matches.
    """
    search_root = importlib.resources.files() if input_dir is None else input_dir
    for entry in search_root.iterdir():
        logger.debug("Testing %s", entry)
        name_matches = recipe_name is None or recipe_name == entry.name
        if name_matches and entry.is_file() and entry.suffix == ".yaml":
            yield entry
            continue
        # Descend into subdirectories, skipping any whose name starts with an
        # underscore (excludes __pycache__).
        if entry.is_dir() and entry.name[0] != "_":
            yield from _recipe_files_in_tree(recipe_name, entry)
def _get_recipe_file(recipe_name: str, input_dir: Path | None = None) -> Path:
    """Locate a single recipe file directly inside a directory.

    Parameters
    ----------
    recipe_name: str
        Name of the recipe file, joined onto the directory.
    input_dir: Path | None
        Directory expected to contain the recipe. When None, the location
        returned by ``importlib.resources.files()`` is used.

    Returns
    -------
    Path
        Path to the recipe file.

    Raises
    ------
    FileNotFoundError
        If the joined path is not an existing regular file.
    """
    base_dir = importlib.resources.files() if input_dir is None else input_dir
    candidate = base_dir / recipe_name
    logger.debug("Getting recipe: %s", candidate)
    if candidate.is_file():
        return candidate
    raise FileNotFoundError("Recipe file does not exist.", recipe_name)
def unpack_recipe(recipe_dir: Path, recipe_name: str) -> None:
    """
    Unpacks recipes files into a directory, creating it if it doesn't exist.

    If a file called recipe_name already exists in recipe_dir it is left
    untouched and nothing is unpacked.

    Parameters
    ----------
    recipe_dir: Path
        Path to a directory into which to unpack the recipe files.
    recipe_name: str
        Name of recipe to unpack.

    Raises
    ------
    FileExistsError
        If recipe_dir already exists, and is not a directory.

    FileNotFoundError
        If no recipe file named recipe_name can be found.

    OSError
        If recipe_dir cannot be created, such as insufficient permissions, or
        lack of space.
    """
    recipe_dir.mkdir(parents=True, exist_ok=True)
    output_file = recipe_dir / recipe_name
    logger.debug("Saving recipe to %s", output_file)
    if output_file.exists():
        logger.debug("%s already exists in target directory, skipping.", recipe_name)
        return
    logger.info("Unpacking %s to %s", recipe_name, output_file)
    # Give next() a default so that a missing recipe is reported as a
    # FileNotFoundError instead of leaking a bare StopIteration, which would be
    # turned into a RuntimeError if it propagated through a generator.
    source_file = next(_recipe_files_in_tree(recipe_name), None)
    if source_file is None:
        raise FileNotFoundError("Recipe file does not exist.", recipe_name)
    # The tree search only yields existing files, so the found path is read
    # directly rather than being re-joined onto the package directory.
    logger.debug("Getting recipe: %s", source_file)
    output_file.write_bytes(source_file.read_bytes())
def list_available_recipes() -> None:
    """Print a heading followed by the file name of every recipe found.

    Each recipe name is written to stdout on its own tab-indented line.
    """
    print("Available recipes:")
    for recipe_path in _recipe_files_in_tree():
        print(f"\t{recipe_path.name}")
def detail_recipe(recipe_name: str) -> None:
    """Print the name and description of a recipe to stdout.

    Every recipe file in the recipe tree with the given file name is shown, so
    recipes sharing a name in different subdirectories are all displayed.

    Parameters
    ----------
    recipe_name: str
        File name of the recipe to detail. It is compared for equality against
        the names of the recipe files.
    """
    for recipe_path in _recipe_files_in_tree(recipe_name):
        with YAML(typ="safe", pure=True) as parser:
            contents = parser.load(recipe_path)
        heading = recipe_path.name
        # Underline the heading with one box-drawing dash per character.
        underline = "─" * len(heading)
        print(f"\n\t{heading}\n\t{underline}\n")
        print(contents.get("description"))
117class RawRecipe:
118 """A recipe to be parbaked.
120 Parameters
121 ----------
122 recipe: str
123 Name of the recipe file.
124 model_ids: int | list[int]
125 Model IDs to set the input paths for. Matches the corresponding workflow
126 model IDs.
127 variables: dict[str, Any] aggregation: bool
128 Recipe variables to be inserted into $VAR placeholders in the recipe.
129 aggregation: bool
130 Whether this is an aggregation recipe or just a single case.
132 Returns
133 -------
134 RawRecipe
135 """
137 recipe: str
138 model_ids: list[int]
139 variables: dict[str, Any]
140 aggregation: bool
142 def __init__(
143 self,
144 recipe: str,
145 model_ids: int | list[int],
146 variables: dict[str, Any],
147 aggregation: bool,
148 ) -> None:
149 self.recipe = recipe
150 self.model_ids = model_ids if isinstance(model_ids, list) else [model_ids]
151 self.variables = variables
152 self.aggregation = aggregation
154 def __str__(self) -> str:
155 """Return str(self).
157 Examples
158 --------
159 >>> print(raw_recipe)
160 generic_surface_spatial_plot_sequence.yaml (model 1)
161 VARNAME air_temperature
162 MODEL_NAME Model A
163 METHOD SEQ
164 SUBAREA_TYPE None
165 SUBAREA_EXTENT None
166 """
167 recipe = self.recipe if self.recipe else "<unknown>"
168 plural = "s" if len(self.model_ids) > 1 else ""
169 ids = " ".join(str(m) for m in self.model_ids)
170 aggregation = ", Aggregation" if self.aggregation else ""
171 pad = max([0] + [len(k) for k in self.variables.keys()])
172 variables = "".join(f"\n\t{k:<{pad}} {v}" for k, v in self.variables.items())
173 return f"{recipe} (model{plural} {ids}{aggregation}){variables}"
175 def __eq__(self, value: object) -> bool:
176 """Return self==value."""
177 if isinstance(value, self.__class__):
178 return (
179 self.recipe == value.recipe
180 and self.model_ids == value.model_ids
181 and self.variables == value.variables
182 and self.aggregation == value.aggregation
183 )
184 return NotImplemented
186 def parbake(self, ROSE_DATAC: Path, SHARE_DIR: Path) -> None:
187 """Pre-process recipe to bake in all variables.
189 Parameters
190 ----------
191 ROSE_DATAC: Path
192 Workflow shared per-cycle data location.
193 SHARE_DIR: Path
194 Workflow shared data location.
195 """
196 # Ready recipe file to disk.
197 unpack_recipe(Path.cwd(), self.recipe)
199 # Collect configuration from environment.
200 if self.aggregation:
201 # Construct the location for the recipe.
202 recipe_dir = ROSE_DATAC / "aggregation_recipes"
203 # Construct the input data directories for the cycle.
204 data_dirs = [
205 SHARE_DIR / f"cycle/*/data/{model_id}" for model_id in self.model_ids
206 ]
207 else:
208 recipe_dir = ROSE_DATAC / "recipes"
209 data_dirs = [ROSE_DATAC / f"data/{model_id}" for model_id in self.model_ids]
211 # Ensure recipe dir exists.
212 recipe_dir.mkdir(parents=True, exist_ok=True)
214 # Add input paths to recipe variables.
215 self.variables["INPUT_PATHS"] = data_dirs
217 # Parbake this recipe, saving into recipe_dir.
218 recipe = parse_recipe(Path(self.recipe), self.variables)
220 # Serialise into memory, as we use the serialised value twice.
221 with StringIO() as s:
222 with YAML(pure=True, output=s) as yaml:
223 yaml.dump(recipe)
224 serialised_recipe = s.getvalue().encode()
225 # Include shortened hash in filename to avoid collisions between recipes
226 # with the same title.
227 digest = hashlib.sha256(serialised_recipe).hexdigest()
228 output_filename = recipe_dir / f"{slugify(recipe['title'])}_{digest[:12]}.yaml"
229 with open(output_filename, "wb") as fp:
230 fp.write(serialised_recipe)
class Config:
    """Namespace for easy access to configuration values.

    A namespace for easy access to configuration values (via config.variable),
    where undefined attributes return an empty list. An empty list evaluates to
    False in boolean contexts and can be safely iterated over, so it acts as an
    effective unset value.

    Double-underscore names (such as ``__deepcopy__``) are never treated as
    configuration values: looking up a missing one raises AttributeError, so
    that protocols which probe for optional special methods keep working.

    Parameters
    ----------
    config: dict
        Configuration key-value pairs.

    Example
    -------
    >>> conf = Config({"key": "value"})
    >>> conf.key
    'value'
    >>> conf.missing
    []
    """

    # The wrapped configuration dictionary. It is stored by reference, not
    # copied.
    d: dict

    def __init__(self, config: dict) -> None:
        self.d = config

    def __getattr__(self, name: str):
        """Return the configured value, or an empty list for missing names.

        Only called when normal attribute lookup fails.

        Raises
        ------
        AttributeError
            For double-underscore names, and for ``d`` itself when it has not
            been set yet.
        """
        # Returning a list for special method probes makes callers such as
        # copy.deepcopy try to call it. Looking up an unset ``d`` here would
        # re-enter this method without end.
        if name == "d" or (name.startswith("__") and name.endswith("__")):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return self.d.get(name, [])

    def asdict(self) -> dict:
        """Return config as a dictionary."""
        return self.d
def load_recipes(variables: dict[str, Any]) -> Iterator[RawRecipe]:
    """Load recipes enabled by configuration.

    Every loader named in ``CSET.loaders.__all__`` is used in turn. Each loader
    is a python module that must define a ``load`` function returning an
    iterable of RawRecipe; it is called with a `Config` wrapping `variables`.

    A minimal example can be found in `CSET.loaders.test`.

    Parameters
    ----------
    variables: dict[str, Any]
        Workflow configuration from ROSE_SUITE_VARIABLES.

    Returns
    -------
    Iterator[RawRecipe]
        Configured recipes.

    Raises
    ------
    AttributeError
        When a loader doesn't provide a `load` function.
    """
    # Imported at call time rather than module import time to avoid a circular
    # import.
    import CSET.loaders

    loader_package = CSET.loaders
    conf = Config(variables)
    for loader_name in loader_package.__all__:
        logger.info("Loading recipes from %s", loader_name)
        loader_module = getattr(loader_package, loader_name)
        yield from loader_module.load(conf)