Coverage for src/CSET/_common.py: 100%
145 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 21:08 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 21:08 +0000
1# © Crown copyright, Met Office (2022-2024) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Common functionality used across CSET."""
17import ast
18import io
19import json
20import logging
21import re
22from collections.abc import Iterable
23from pathlib import Path
24from textwrap import dedent
25from typing import Any
27import ruamel.yaml
class ArgumentError(ValueError):
    """Raised when a supplied command line argument cannot be interpreted."""
def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict:
    """Load a recipe from YAML and return it as a python dictionary.

    Parameters
    ----------
    recipe_yaml: Path | str
        Either the recipe's YAML as a string, or a Path to a file holding it.
        A Path is handed to the YAML parser, which opens and reads it.
    variables: dict
        Recipe variables to substitute into the recipe. When None no
        templating takes place.

    Returns
    -------
    recipe: dict
        The recipe as a python dictionary.

    Raises
    ------
    ValueError
        If the recipe is invalid. E.g. invalid YAML, missing any steps, etc.
    TypeError
        If recipe_yaml isn't a Path or string.
    KeyError
        If needed recipe variables are not supplied.

    Examples
    --------
    >>> CSET._common.parse_recipe(Path("myrecipe.yaml"))
    {'steps': [{'operator': 'misc.noop'}]}
    """
    # Normalise the input into something the YAML parser accepts.
    if isinstance(recipe_yaml, str):
        source = io.StringIO(recipe_yaml)
    elif isinstance(recipe_yaml, Path):
        source = recipe_yaml
    else:
        raise TypeError("recipe_yaml must be a str or Path.")

    # Parse with the safe, pure-python loader.
    with ruamel.yaml.YAML(typ="safe", pure=True) as yaml:
        try:
            recipe = yaml.load(source)
        except ruamel.yaml.parser.ParserError as err:
            raise ValueError("ParserError: Invalid YAML") from err

    logging.debug("Recipe before templating:\n%s", recipe)
    # Structural validation happens before any variables are filled in.
    check_recipe_has_steps(recipe)

    if variables is not None:
        logging.debug("Recipe variables: %s", variables)
        recipe = template_variables(recipe, variables)

    logging.debug("Recipe after templating:\n%s", recipe)
    return recipe
def check_recipe_has_steps(recipe: dict):
    """Validate that a recipe contains at least one step.

    Gives a helpful error message for each way the top level structure of the
    recipe can be wrong.

    Parameters
    ----------
    recipe: dict
        The recipe as a python dictionary.

    Raises
    ------
    TypeError
        If recipe isn't a dict.
    ValueError
        If the recipe has no 'steps' key, if the value under 'steps' has no
        length, or if it is empty.
    """
    if not isinstance(recipe, dict):
        raise TypeError("Recipe must contain a mapping.")
    if "steps" not in recipe:
        raise ValueError("Recipe must contain a 'steps' key.")

    # Anything without a length (None, a number, ...) can't hold steps.
    try:
        step_count = len(recipe["steps"])
    except TypeError as err:
        raise ValueError("'steps' key must contain a sequence of steps.") from err

    if not step_count:
        raise ValueError("Recipe must have at least 1 step.")
def slugify(s: str) -> str:
    """Convert a string into a form that is safe to use everywhere.

    The string is case-folded, every run of characters other than a-z, 0-9,
    dots, dashes, and underscores is collapsed into a single underscore, and
    underscores at either end are removed.
    """
    folded = s.casefold()
    collapsed = re.sub(r"[^a-z0-9\._-]+", "_", folded)
    return collapsed.strip("_")
def get_recipe_metadata() -> dict:
    """Read the running recipe's metadata from meta.json.

    The file is looked up in the current working directory. If it does not
    exist it is created holding an empty JSON object, and an empty dictionary
    is returned.
    """
    try:
        metadata_file = open("meta.json", "rt", encoding="UTF-8")
    except FileNotFoundError:
        # First access: create an empty metadata file.
        with open("meta.json", "wt", encoding="UTF-8") as new_file:
            json.dump({}, new_file, indent=2)
        return {}
    with metadata_file:
        return json.load(metadata_file)
142def parse_variable_options(
143 arguments: list[str], input_dir: str | list[str] | None = None
144) -> dict:
145 """Parse a list of arguments into a dictionary of variables.
147 The variable name arguments start with two hyphen-minus (`--`), consisting
148 of only capital letters (`A`-`Z`) and underscores (`_`). While the variable
149 name is restricted, the value of the variable can be any string.
151 Parameters
152 ----------
153 arguments: list[str]
154 List of arguments, e.g: `["--LEVEL", "2", "--STASH=m01s01i001"]`
155 input_dir: str | list[str], optional
156 List of input directories to add into the returned variables.
158 Returns
159 -------
160 recipe_variables: dict
161 Dictionary keyed with the variable names.
163 Raises
164 ------
165 ValueError
166 If any arguments cannot be parsed.
167 """
168 # Convert --input_dir=... to INPUT_PATHS recipe variable.
169 if input_dir is not None:
170 abs_paths = [str(Path(p).absolute()) for p in iter_maybe(input_dir)]
171 arguments.append(f"--INPUT_PATHS={abs_paths}")
172 recipe_variables = {}
173 i = 0
174 while i < len(arguments):
175 if re.fullmatch(r"--[A-Z_]+=.*", arguments[i]):
176 key, value = arguments[i].split("=", 1)
177 elif re.fullmatch(r"--[A-Z_]+", arguments[i]):
178 try:
179 key = arguments[i].strip("-")
180 value = arguments[i + 1]
181 except IndexError as err:
182 raise ArgumentError(f"No value for variable {arguments[i]}") from err
183 i += 1
184 else:
185 raise ArgumentError(f"Unknown argument: {arguments[i]}")
186 try:
187 # Remove quotes from arguments, in case left in CSET_ADDOPTS.
188 if re.fullmatch(r"""["'].+["']""", value):
189 value = value[1:-1]
190 recipe_variables[key.strip("-")] = ast.literal_eval(value)
191 # Capture the many possible exceptions from ast.literal_eval
192 except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
193 recipe_variables[key.strip("-")] = value
194 i += 1
195 return recipe_variables
198def template_variables(recipe: dict | list, variables: dict) -> dict:
199 """Insert variables into recipe.
201 Parameters
202 ----------
203 recipe: dict | list
204 The recipe as a python dictionary. It is updated in-place.
205 variables: dict
206 Dictionary of variables for the recipe.
208 Returns
209 -------
210 recipe: dict
211 Filled recipe as a python dictionary.
213 Raises
214 ------
215 KeyError
216 If needed recipe variables are not supplied.
217 """
218 if isinstance(recipe, dict):
219 index = recipe.keys()
220 elif isinstance(recipe, list):
221 # We have to handle lists for when we have one inside a recipe.
222 index = range(len(recipe))
223 else:
224 raise TypeError("recipe must be a dict or list.", recipe)
226 for i in index:
227 if isinstance(recipe[i], (dict, list)):
228 recipe[i] = template_variables(recipe[i], variables)
229 elif isinstance(recipe[i], str):
230 recipe[i] = replace_template_variable(recipe[i], variables)
231 return recipe
def replace_template_variable(s: str, variables: dict[str, Any]):
    """Substitute every variable placeholder found in a string.

    A placeholder is a dollar sign followed by the variable name. When the
    whole string is a single placeholder the variable's value is returned
    as-is, keeping its type, except that a Path or a list starting with a Path
    is converted to strings. Otherwise each placeholder is replaced by the
    string form of its value.

    Raises
    ------
    KeyError
        If the resulting string still appears to contain a placeholder.
    """
    for name, value in variables.items():
        token = f"${name}"
        if s != token:
            # Placeholder embedded in a longer string: textual substitution.
            s = s.replace(token, str(value))
            continue

        # The string is exactly the placeholder, so hand back the value
        # itself to preserve its type. Paths and lists of Paths are turned
        # into strings first.
        if isinstance(value, Path):
            value = str(value)
        if isinstance(value, list) and value and isinstance(value[0], Path):
            value = [str(path) for path in value]
        s = value
        # The whole string was replaced; stop so the new value is not itself
        # searched for placeholders.
        break

    if isinstance(s, str) and re.match(r"^.*\$[A-Z_].*", s):
        raise KeyError("Variable without a value.", s)
    return s
261################################################################################
262# Templating code taken from the simple_template package under the 0BSD licence.
263# Original at https://github.com/Fraetor/simple_template
264################################################################################
class TemplateError(KeyError):
    """Raised when a template placeholder has no corresponding value."""
def render(template: str, /, **variables) -> str:
    """Render the template with the provided variables.

    The template should contain placeholders that will be replaced. These
    placeholders consist of the placeholder name within double curly braces. The
    name of the placeholder should be a valid python identifier. Whitespace
    between the braces and the name is ignored. E.g.: `{{ placeholder_name }}`

    Double-brace expressions whose content is not a valid identifier are left
    in the output unchanged. Values are inserted literally in a single pass:
    backslashes in a value are not treated as escapes, and placeholder-like
    text inside a value is not rendered again.

    An exception will be raised if there are placeholders without corresponding
    values. It is acceptable to provide unused values; they will be ignored.

    Parameters
    ----------
    template: str
        Template to fill with variables.

    **variables: Any
        Keyword arguments for the placeholder values. The argument name should
        be the same as the placeholder's name. You can unpack a dictionary of
        value with `render(template, **my_dict)`.

    Returns
    -------
    rendered_template: str
        Filled template.

    Raises
    ------
    TemplateError
        Value not given for a placeholder in the template.
    TypeError
        If the template is not a string, or a variable cannot be casted to a
        string.

    Examples
    --------
    >>> template = "<p>Hello {{ myplaceholder }}!</p>"
    >>> render(template, myplaceholder="World")
    '<p>Hello World!</p>'
    """

    def substitute(match: re.Match) -> str:
        # The capture group is greedy and may include trailing whitespace, so
        # strip it before deciding whether this is a real placeholder.
        name = match.group(1).strip()
        if not name.isidentifier():
            return match.group(0)
        try:
            return str(variables[name])
        except KeyError as err:
            raise TemplateError("Placeholder missing value", name) from err

    # A callable replacement is used so the value is inserted verbatim; a
    # string replacement would have its backslash escapes interpreted.
    return re.sub(r"{{\s*([^}]+)\s*}}", substitute, template)
def render_file(template_path: str, /, **variables) -> str:
    """Render a template that is stored in a file.

    The file is read as UTF-8 text and its contents are passed to `render()`
    along with the given variables, so the behaviour is otherwise the same.

    Examples
    --------
    >>> render_file("/path/to/template.html", myplaceholder="World")
    "<p>Hello World!</p>"
    """
    with open(template_path, "rt", encoding="UTF-8") as template_file:
        contents = template_file.read()
    return render(contents, **variables)
def iter_maybe(thing) -> Iterable:
    """Return thing if it is Iterable, otherwise wrap it in a 1-tuple.

    Strings are treated as single items rather than as iterables of
    characters, so they are wrapped too.
    """
    if isinstance(thing, str) or not isinstance(thing, Iterable):
        return (thing,)
    return thing
def human_sorted(iterable: Iterable, reverse: bool = False) -> list:
    """Sort so that numbers embedded in strings are ordered by value.

    For example "a2" sorts before "a10". Items that cannot be split as
    strings are compared as themselves.
    """
    # Adapted from https://nedbatchelder.com/blog/200712/human_sorting.html

    def alphanum_key(s):
        """Break a string into alternating text and integer chunks.

        >>> alphanum_key("z23a")
        ["z", 23, "a"]
        """
        try:
            chunks = re.split(r"(\d+)", s)
        except TypeError:
            # Not something the regex can split; use the item as its own key.
            return s
        return [int(chunk) if chunk.isdecimal() else chunk for chunk in chunks]

    return sorted(iterable, key=alphanum_key, reverse=reverse)
def combine_dicts(d1: dict, d2: dict) -> dict:
    """Recursively combines two dictionaries.

    Where a key holds a dictionary in both inputs the two are combined
    recursively. In every other case, including when only one side holds a
    dictionary, the value from the second dictionary wins.

    Parameters
    ----------
    d1: dict
        Base dictionary. It is updated in-place.
    d2: dict
        Dictionary whose values take priority. Keys only present here are
        added to d1 in the order they appear in d2.

    Returns
    -------
    combined: dict
        The updated d1.
    """
    for key, value in d2.items():
        if key in d1 and isinstance(d1[key], dict) and isinstance(value, dict):
            # Both sides are mappings, so merge rather than overwrite.
            d1[key] = combine_dicts(d1[key], value)
        else:
            # New key, or at least one side is an atom: second dict wins.
            d1[key] = value
    return d1
def sort_dict(d: dict) -> dict:
    """Return a copy of a dictionary with its items sorted, recursively.

    Values that are themselves dictionaries are sorted in the same way.
    """
    # Thank you to https://stackoverflow.com/a/47882384
    # NOTE(review): the items passed to human_sorted are (key, value) tuples
    # rather than strings, so it looks like they are ordered by plain tuple
    # comparison instead of human ordering - confirm this is intended.
    ordered = {}
    for key, value in human_sorted(d.items()):
        ordered[key] = sort_dict(value) if isinstance(value, dict) else value
    return ordered
def sstrip(text):
    """Remove common leading indentation, then surrounding whitespace.

    Parameters
    ----------
    text: str
        The string to dedent and strip.

    Examples
    --------
    >>> print(sstrip('''
    ...     foo
    ...     bar
    ...     baz
    ... '''))
    foo
    bar
    baz
    """
    dedented = dedent(text)
    return dedented.strip()