Coverage for src/CSET/operators/__init__.py: 89%
102 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-01 08:37 +0000
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-01 08:37 +0000
1# Copyright 2022-2023 Met Office and contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Subpackage contains all of CSET's operators."""
17import inspect
18import json
19import logging
20import os
21import warnings
22from pathlib import Path
23from typing import Union
25from iris import FUTURE
27# Import operators here so they are exported for use by recipes.
28import CSET.operators
29from CSET._common import parse_recipe
30from CSET.operators import (
31 aggregate,
32 collapse,
33 constraints,
34 convection,
35 filters,
36 misc,
37 plot,
38 read,
39 regrid,
40 write,
41)
# Exported operators & functions to use elsewhere.
# These names are what ``from CSET.operators import *`` exposes: the operator
# submodules imported above, plus the public entry points defined in this
# module (get_operator, execute_recipe_parallel, execute_recipe_collate).
__all__ = [
    "aggregate",
    "collapse",
    "constraints",
    "convection",
    "execute_recipe_parallel",
    "execute_recipe_collate",
    "filters",
    "get_operator",
    "misc",
    "plot",
    "read",
    "regrid",
    "write",
]

# NOTE(review): these set attributes on iris's shared FUTURE object at import
# time, so they presumably affect all iris usage in the process — confirm.
# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
def get_operator(name: str):
    """Get an operator by its name.

    The name is a dot-separated path relative to ``CSET.operators``, for
    example ``"read.read_cubes"``. Each component is looked up as an
    attribute of the object found so far, and the final object must be
    callable.

    Parameters
    ----------
    name: str
        The name of the desired operator.

    Returns
    -------
    function
        The named operator.

    Raises
    ------
    ValueError
        If name is not an operator.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at ...>
    """
    logging.debug("get_operator(%s)", name)
    try:
        resolved = CSET.operators
        for attribute_name in name.split("."):
            resolved = getattr(resolved, attribute_name)
        if not callable(resolved):
            # A non-callable target is reported the same way as a missing one.
            raise AttributeError
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
    return resolved
103def _write_metadata(recipe: dict):
104 """Write a meta.json file in the CWD."""
105 # TODO: Investigate whether we might be better served by an SQLite database.
106 metadata = recipe.copy()
107 # Remove steps, as not needed, and might contain non-serialisable types.
108 metadata.pop("parallel", None)
109 metadata.pop("steps", None)
110 metadata.pop("collate", None)
111 metadata.pop("post-steps", None)
112 with open("meta.json", "wt", encoding="UTF-8") as fp:
113 json.dump(metadata, fp)
114 os.sync()
115 # Stat directory to force NFS to synchronise metadata.
116 os.stat(Path.cwd())
def _step_parser(step: dict, step_input: object) -> object:
    """Execute a recipe step, recursively executing any sub-steps.

    Parameters
    ----------
    step: dict
        Mapping whose "operator" key names the operator to run (resolved with
        ``get_operator``). Every other key is passed to the operator as a
        keyword argument. A value that is itself a dict containing an
        "operator" key is executed first, with the same ``step_input``, and
        its result is passed instead.
    step_input:
        Output of the previous step. It is passed as the operator's first
        positional argument, unless the step explicitly supplies that
        argument by name or the operator takes no parameters.

    Returns
    -------
    object
        Whatever the operator returns.

    Raises
    ------
    ValueError
        If the step has no "operator" key, or names an unknown operator.
    """
    logging.debug("Executing step: %s", step)
    if "operator" not in step:
        # Without this, the original loop left ``operator`` unbound and failed
        # with UnboundLocalError instead of a recipe error.
        raise ValueError(f"Recipe step does not specify an operator: {step}")
    # Resolve the operator before running any sub-steps so a bad name fails fast.
    operator = get_operator(step["operator"])
    logging.info("operator: %s", step["operator"])
    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            continue
        if isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    first_arg = next(iter(inspect.signature(operator).parameters), None)
    logging.debug("first_arg: %s", first_arg)
    if first_arg is None:
        # Operator takes no parameters, so there is nowhere to put step_input.
        logging.debug("operator takes no arguments.")
        return operator(**kwargs)
    if first_arg in kwargs:
        logging.debug("first_arg in kwargs.")
        return operator(**kwargs)
    logging.debug("first_arg not in kwargs, using step_input.")
    return operator(step_input, **kwargs)
def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Path):
    """Execute the steps in a recipe.

    Changes into ``output_directory`` for the duration of the run, restoring
    the original working directory afterwards. While the steps run, root
    logger records at INFO and above are also written to ``CSET.log`` in the
    output directory. That handler is removed and closed afterwards, so
    repeated runs do not leak file handles or keep writing into earlier runs'
    logs.

    Parameters
    ----------
    recipe: dict
        Parsed recipe, written to ``meta.json`` before the steps run. If
        ``style_file`` is given, its path is stored in the recipe under
        "style_file_path".
    steps: iterable of dict
        Steps to execute in order. Each step's output is the next step's input.
    step_input:
        Input to the first step.
    output_directory: Path
        Directory to run in and write output to.
    style_file: Path
        Optional style file path to record in the metadata.
    """
    original_working_directory = Path.cwd()
    os.chdir(output_directory)
    try:
        logger = logging.getLogger()
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.addFilter(lambda record: record.levelno >= logging.INFO)
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        try:
            # Create metadata file used by some steps.
            if style_file:
                recipe["style_file_path"] = str(style_file)
            _write_metadata(recipe)
            # Execute the recipe.
            for step in steps:
                step_input = _step_parser(step, step_input)
            logging.info("Recipe output:\n%s", step_input)
        finally:
            # Detach and close the per-run handler. Otherwise it stays on the
            # root logger, holding this run's CSET.log open and collecting
            # log records from every later run.
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
    finally:
        os.chdir(original_working_directory)
def execute_recipe_parallel(
    recipe_yaml: Union[Path, str],
    input_directory: Path,
    output_directory: Path,
    recipe_variables: dict = None,
    style_file: Path = None,
) -> None:
    """Parse and execute the parallel steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    input_directory: Path
        Pathlike to the input data (e.g. netCDF, or something else that iris
        can read). It is passed, as an absolute Path, to the first step.
    output_directory: Path
        Pathlike indicating desired location of output. It is created, along
        with an "intermediate" subdirectory, if it does not already exist.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Optional path to a style file, recorded in the run's metadata.

    Raises
    ------
    FileNotFoundError
        The recipe or input file cannot be found.
    FileExistsError or NotADirectoryError
        The output directory is actually a file.
    ValueError
        The recipe is not well formed, or has no "parallel" (or deprecated
        "steps") section.
    TypeError
        The provided recipe is not a stream or Path.
    """
    if recipe_variables is None:
        recipe_variables = {}
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    step_input = Path(input_directory).absolute()
    # Accept str as well as Path: the ``/`` below fails on a plain str.
    output_directory = Path(output_directory)
    # Create output directory, and an inter-cycle intermediate directory.
    try:
        (output_directory / "intermediate").mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    # If parallel doesn't exist try steps.
    if "parallel" in recipe:
        steps = recipe["parallel"]
    elif "steps" in recipe:
        warnings.warn(
            "'steps' recipe key is deprecated. Use 'parallel' instead.",
            DeprecationWarning,
            stacklevel=1,
        )
        steps = recipe["steps"]
    else:
        # Previously this fell through to an UnboundLocalError on ``steps``.
        raise ValueError("Recipe does not contain a 'parallel' section.")
    _run_steps(recipe, steps, step_input, output_directory, style_file)
def execute_recipe_collate(
    recipe_yaml: Union[Path, str],
    output_directory: Path,
    recipe_variables: dict = None,
    style_file: Path = None,
) -> None:
    """Parse and execute the collation steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    output_directory: Path
        Pathlike indicating desired location of output. Must already exist.
        It is also used as the input of the first collation step.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Optional path to a style file, recorded in the run's metadata.

    Raises
    ------
    FileNotFoundError
        The output directory does not exist or is not a directory.
    ValueError
        The recipe is not well formed.
    TypeError
        The provided recipe is not a stream or Path.
    """
    if recipe_variables is None:
        recipe_variables = {}
    output_directory = Path(output_directory).resolve()
    # Explicit check rather than ``assert``, which ``python -O`` strips out.
    if not output_directory.is_dir():
        raise FileNotFoundError(
            f"Output directory does not exist or is not a directory: "
            f"{output_directory}"
        )
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    # If collate doesn't exist try post-steps, else treat it as having no steps.
    if "collate" in recipe:
        steps = recipe["collate"]
    else:
        if "post-steps" in recipe:
            warnings.warn(
                "'post-steps' recipe key is deprecated. Use 'collate' instead.",
                DeprecationWarning,
                stacklevel=1,
            )
        steps = recipe.get("post-steps", tuple())
    _run_steps(recipe, steps, output_directory, output_directory, style_file)