Coverage for src/CSET/operators/__init__.py: 100%
89 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-18 10:49 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-18 10:49 +0000
1# © Crown copyright, Met Office (2022-2026) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Subpackage contains all of CSET's operators."""
17import inspect
18import json
19import logging
20import os
21import zipfile
22from pathlib import Path
24from iris import FUTURE
26# Import operators here so they are exported for use by recipes.
27import CSET.operators
28from CSET.operators import (
29 ageofair,
30 aggregate,
31 aviation,
32 collapse,
33 constraints,
34 convection,
35 curvature,
36 ensembles,
37 filters,
38 humidity,
39 imageprocessing,
40 mesoscale,
41 misc,
42 plot,
43 precipitation,
44 pressure,
45 read,
46 regrid,
47 scoreswrappers,
48 temperature,
49 transect,
50 wind,
51 write,
52)
# Exported operators & functions to use elsewhere.
# Apart from get_operator and execute_recipe, which are functions defined in
# this module, every entry is an operator submodule imported from
# CSET.operators at the top of this file.
__all__ = [
    "ageofair",
    "aggregate",
    "aviation",
    "collapse",
    "constraints",
    "convection",
    "curvature",
    "ensembles",
    "execute_recipe",
    "filters",
    "humidity",
    "get_operator",
    "imageprocessing",
    "mesoscale",
    "misc",
    "plot",
    "precipitation",
    "pressure",
    "read",
    "regrid",
    "temperature",
    "scoreswrappers",
    "transect",
    "wind",
    "write",
]
# The assignments below run at import time and set flags on the FUTURE object
# imported from iris, so they take effect as soon as this subpackage is
# imported.
# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
# Accept microsecond precision in iris times.
FUTURE.date_microseconds = True
def get_operator(name: str):
    """Look up an operator from its dotted name.

    The name is split on "." and each section is resolved in turn as an
    attribute, starting from the ``CSET.operators`` package.

    Parameters
    ----------
    name: str
        Dotted name of the wanted operator, such as "read.read_cubes".

    Returns
    -------
    function
        The callable the name resolves to.

    Raises
    ------
    ValueError
        If the name cannot be resolved, or resolves to something that is not
        callable.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        target = CSET.operators
        for attribute in name.split("."):
            target = getattr(target, attribute)
        # A non-callable result is reported the same way as a missing name.
        if not callable(target):
            raise AttributeError
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
    return target
def _write_metadata(recipe: dict):
    """Write the recipe's metadata to meta.json in the current directory.

    The "steps" entry is left out, and a fixed set of long variable name
    suffixes is stripped from the "title" entry when one is present. The
    recipe passed in is not modified.

    Parameters
    ----------
    recipe: dict
        Parsed recipe to take the metadata from.
    """
    # Steps are not needed, and might contain non-serialisable types.
    metadata = {key: value for key, value in recipe.items() if key != "steps"}
    if "title" in metadata:
        shortened_title = metadata["title"]
        # Drop long variable name suffixes from the title.
        for suffix in (
            "_for_climate_averaging",
            "_radiative_timestep",
            "_maximum_random_overlap",
        ):
            shortened_title = shortened_title.replace(suffix, "")
        metadata["title"] = shortened_title
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp, indent=2)
def _step_parser(step: dict, step_input: object) -> object:
    """Execute a recipe step, recursively executing any sub-steps.

    Parameters
    ----------
    step: dict
        The step to run. Its "operator" entry names the operator to call, and
        every other entry is passed to the operator as a keyword argument. An
        entry whose value is a dict containing an "operator" key is treated as
        a sub-step: it is executed first, with the same step_input, and its
        result is used as the argument value.
    step_input: object
        Value passed as the operator's first positional argument, unless the
        step sets that argument explicitly.

    Returns
    -------
    object
        Whatever the operator returns.

    Raises
    ------
    ValueError
        If the step has no "operator" entry, or names an unknown operator.
    """
    logging.debug("Executing step: %s", step)
    # Fail clearly on a malformed step, rather than with an UnboundLocalError
    # when the operator is first used.
    if "operator" not in step:
        raise ValueError(f"Step has no operator: {step}")
    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            operator = get_operator(value)
            logging.info("operator: %s", value)
        elif isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    # The None default covers operators that take no parameters at all.
    first_arg = next(iter(inspect.signature(operator).parameters), None)
    logging.debug("first_arg: %s", first_arg)
    if first_arg is None:
        # Nothing can receive step_input, so call with keyword arguments only.
        logging.debug("operator takes no parameters, not passing step_input.")
        return operator(**kwargs)
    if first_arg not in kwargs:
        logging.debug("first_arg not in kwargs, using step_input.")
        return operator(step_input, **kwargs)
    logging.debug("first_arg in kwargs.")
    return operator(**kwargs)
def create_diagnostic_archive():
    """Create archive for easy download of plots and data.

    Everything found by a recursive search of the current working directory
    is added to ``diagnostic.zip`` in that same directory, stored under its
    path relative to the directory and compressed with DEFLATE. The archive
    file itself is left out.
    """
    root: Path = Path.cwd()
    zip_path = root / "diagnostic.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as bundle:
        for entry in root.rglob("*"):
            # The archive lives in the directory being walked; never add it
            # to itself.
            if entry.samefile(zip_path):
                continue
            bundle.write(entry, arcname=entry.relative_to(root))
def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Parse and execute the steps from a recipe.

    The output directory is created if needed and made the working directory
    while the recipe runs. A ``CSET.log`` file and a ``meta.json`` file are
    written there, the steps are run in order with each step's output fed to
    the next, and finally a ``diagnostic.zip`` archive of the directory is
    created. The original working directory is restored afterwards, even if
    a step fails.

    Any of style_file, plot_resolution and skip_write that is truthy is added
    to the passed-in recipe dict (as "style_file_path", "plot_resolution" and
    "skip_write") so that it ends up in ``meta.json``.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. Must contain a "steps" entry.
    output_directory: Path
        Pathlike indicating desired location of output.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileExistsError
        The output directory is actually a file.
    NotADirectoryError
        Part of the output directory path is actually a file.
    KeyError
        The recipe has no "steps" entry.
    ValueError
        The recipe is not well formed, such as a step naming an unknown
        operator.

    Notes
    -----
    Exceptions raised by the operators themselves are not caught here.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        # Detach and close the per-recipe log handler. Otherwise every call
        # leaks an open file, and later recipes would also be logged into
        # this recipe's CSET.log.
        if diagnostic_log is not None:
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
        os.chdir(original_working_directory)