Coverage for src/CSET/operators/__init__.py: 100%
89 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-02 16:30 +0000
1# Copyright 2022-2023 Met Office and contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Subpackage contains all of CSET's operators."""
17import inspect
18import json
19import logging
20import os
21from pathlib import Path
22from typing import Union
24from iris import FUTURE
26# Import operators here so they are exported for use by recipes.
27import CSET.operators
28from CSET._common import parse_recipe
29from CSET.operators import (
30 aggregate,
31 collapse,
32 constraints,
33 convection,
34 filters,
35 misc,
36 plot,
37 read,
38 regrid,
39 write,
40)
# Exported operators & functions to use elsewhere.
# Kept in alphabetical order so additions have an obvious place.
__all__ = [
    "aggregate",
    "collapse",
    "constraints",
    "convection",
    "execute_recipe_collate",
    "execute_recipe_parallel",
    "filters",
    "get_operator",
    "misc",
    "plot",
    "read",
    "regrid",
    "write",
]
# Opt in to iris future behaviour at import time so recipes do not need to.
# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
def get_operator(name: str):
    """Get an operator by its name.

    Parameters
    ----------
    name: str
        The name of the desired operator, as a dotted path such as
        "read.read_cubes".

    Returns
    -------
    function
        The named operator.

    Raises
    ------
    ValueError
        If name is not an operator.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        # Walk down the attribute path from the operators package.
        obj = CSET.operators
        for attribute in name.split("."):
            obj = getattr(obj, attribute)
        # Anything that isn't callable (e.g. a submodule) is not an operator.
        if not callable(obj):
            raise AttributeError
        return obj
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
def _write_metadata(recipe: dict):
    """Write a meta.json file in the CWD."""
    # TODO: Investigate whether we might be better served by an SQLite database.
    metadata = dict(recipe)
    # Drop the step lists: they are not needed in the metadata, and might
    # contain non-serialisable types.
    for steps_key in ("parallel", "collate"):
        metadata.pop(steps_key, None)
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp)
    os.sync()
    # Stat directory to force NFS to synchronise metadata.
    os.stat(Path.cwd())
def _step_parser(step: dict, step_input: any) -> str:
    """Execute a recipe step, recursively executing any sub-steps."""
    logging.debug("Executing step: %s", step)
    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            operator = get_operator(step["operator"])
            logging.info("operator: %s", step["operator"])
        elif isinstance(value, dict) and "operator" in value:
            # The argument is itself a step; evaluate it to get the value.
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    first_arg = next(iter(inspect.signature(operator).parameters.keys()))
    logging.debug("first_arg: %s", first_arg)
    if first_arg in kwargs:
        logging.debug("first_arg in kwargs.")
        return operator(**kwargs)
    logging.debug("first_arg not in kwargs, using step_input.")
    return operator(step_input, **kwargs)
def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Path):
    """Execute the steps in a recipe.

    Runs inside output_directory, attaching a per-run CSET.log file handler
    to the root logger for the duration of the run. The working directory
    and logger are restored even if a step raises.
    """
    original_working_directory = Path.cwd()
    os.chdir(output_directory)
    logger = logging.getLogger()
    diagnostic_log = None
    try:
        # Log INFO and above to a CSET.log file inside the output directory.
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.addFilter(lambda record: record.levelno >= logging.INFO)
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        _write_metadata(recipe)
        # Execute the recipe, threading each step's output into the next.
        for step in steps:
            step_input = _step_parser(step, step_input)
        logging.info("Recipe output:\n%s", step_input)
    finally:
        # Detach and close the file handler so repeated calls don't stack
        # duplicate handlers on the root logger or leak file descriptors.
        if diagnostic_log is not None:
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
        os.chdir(original_working_directory)
def execute_recipe_parallel(
    recipe_yaml: Union[Path, str],
    input_directory: Path,
    output_directory: Path,
    recipe_variables: dict = None,
    style_file: Path = None,
) -> None:
    """Parse and executes the parallel steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    input_directory: Path
        Pathlike to directory containing netCDF (or something else that iris
        can read) files to be used as input.
    output_directory: Path
        Pathlike indicating desired location of output.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Path to a style file, recorded in the recipe metadata for use by
        plotting steps.

    Raises
    ------
    FileNotFoundError
        The recipe or input file cannot be found.
    FileExistsError
        The output directory as actually a file.
    ValueError
        The recipe is not well formed.
    TypeError
        The provided recipe is not a stream or Path.
    """
    if recipe_variables is None:
        recipe_variables = {}
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    step_input = Path(input_directory).absolute()
    # Create output directory, and an inter-cycle intermediate directory.
    try:
        (output_directory / "intermediate").mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError) as err:
        logging.error("Output directory is a file. %s", output_directory)
        raise err
    steps = recipe["parallel"]
    _run_steps(recipe, steps, step_input, output_directory, style_file)
def execute_recipe_collate(
    recipe_yaml: Union[Path, str],
    output_directory: Path,
    recipe_variables: dict = None,
    style_file: Path = None,
) -> None:
    """Parse and execute the collation steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    output_directory: Path
        Pathlike indicating desired location of output. Must already exist.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Path to a style file, recorded in the recipe metadata for use by
        plotting steps.

    Raises
    ------
    ValueError
        The recipe is not well formed, or output_directory is not an
        existing directory.
    TypeError
        The provided recipe is not a stream or Path.
    """
    if recipe_variables is None:
        recipe_variables = {}
    output_directory = Path(output_directory).resolve()
    # Explicit check rather than an assert, as asserts are stripped when
    # running under `python -O`.
    if not output_directory.is_dir():
        raise ValueError(f"Output directory is not a directory: {output_directory}")
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    # If collate doesn't exist treat it as having no steps.
    steps = recipe.get("collate", [])
    _run_steps(recipe, steps, output_directory, output_directory, style_file)