Coverage for src / CSET / operators / __init__.py: 100%
89 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 09:52 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 09:52 +0000
1# © Crown copyright, Met Office (2022-2026) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Subpackage contains all of CSET's operators."""
17import inspect
18import json
19import logging
20import os
21import zipfile
22from pathlib import Path
24from iris import FUTURE
26# Import operators here so they are exported for use by recipes.
27import CSET.operators
28from CSET.operators import (
29 ageofair,
30 aggregate,
31 aviation,
32 collapse,
33 constraints,
34 convection,
35 ensembles,
36 filters,
37 humidity,
38 imageprocessing,
39 mesoscale,
40 misc,
41 plot,
42 precipitation,
43 pressure,
44 read,
45 regrid,
46 temperature,
47 transect,
48 wind,
49 write,
50)
# Names exported by ``from CSET.operators import *``: the operator submodules
# imported above plus the two helper functions defined in this file. Kept in
# alphabetical order.
__all__ = [
    "ageofair",
    "aggregate",
    "aviation",
    "collapse",
    "constraints",
    "convection",
    "ensembles",
    "execute_recipe",
    "filters",
    "get_operator",
    "humidity",
    "imageprocessing",
    "mesoscale",
    "misc",
    "plot",
    "precipitation",
    "pressure",
    "read",
    "regrid",
    "temperature",
    "transect",
    "wind",
    "write",
]
# Opt in to newer iris behaviours. These assignments run when this package is
# imported and modify the FUTURE object imported from iris, so they apply to
# every user of iris in the same process.
# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
# Accept microsecond precision in iris times.
FUTURE.date_microseconds = True
def get_operator(name: str):
    """Look up an operator from its dotted name.

    The name is resolved attribute by attribute, starting from the
    ``CSET.operators`` package, so ``"read.read_cubes"`` resolves to
    ``CSET.operators.read.read_cubes``.

    Parameters
    ----------
    name: str
        Dotted name of the wanted operator, relative to ``CSET.operators``.

    Returns
    -------
    function
        The callable that the name resolves to.

    Raises
    ------
    ValueError
        If the name cannot be resolved, or resolves to something that is not
        callable.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        resolved = CSET.operators
        for attribute in name.split("."):
            resolved = getattr(resolved, attribute)
        # Reaching something non-callable (e.g. a submodule) is reported the
        # same way as a failed lookup.
        if not callable(resolved):
            raise AttributeError
        return resolved
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
def _write_metadata(recipe: dict):
    """Save the recipe's metadata as meta.json in the current directory.

    Every top-level recipe entry except ``steps`` is written. The recipe
    passed in is not modified.
    """
    # Steps are not needed in the metadata, and might contain
    # non-serialisable types.
    metadata = {key: value for key, value in recipe.items() if key != "steps"}
    if "title" in metadata:
        # Strip the long variable name suffixes from the title.
        title = metadata["title"]
        for suffix in (
            "_for_climate_averaging",
            "_radiative_timestep",
            "_maximum_random_overlap",
        ):
            title = title.replace(suffix, "")
        metadata["title"] = title
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp, indent=2)
def _step_parser(step: dict, step_input: any) -> str:
    """Execute a recipe step, recursively executing any sub-steps.

    Parameters
    ----------
    step: dict
        A single recipe step. It must contain an ``operator`` key naming the
        operator to run; every other key is passed to the operator as a
        keyword argument. A value that is itself a dict containing an
        ``operator`` key is executed as a sub-step first, and its result is
        used as the argument value.
    step_input: any
        Output of the previous step. It is passed as the operator's first
        positional argument unless the step sets that argument by name, and
        is also handed down to every sub-step.

    Returns
    -------
    The value returned by the operator. This can be any type, despite the
    ``str`` annotation.

    Raises
    ------
    ValueError
        If the step has no ``operator`` key, or it names an unknown operator.
    """
    logging.debug("Executing step: %s", step)
    # Fail early with a clear error rather than hitting an unbound local
    # after sub-steps have already been executed.
    if "operator" not in step:
        raise ValueError(f"Step is missing an operator: {step}")
    operator = get_operator(step["operator"])
    logging.info("operator: %s", step["operator"])

    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            continue
        if isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    # The default of None covers operators that take no parameters: the call
    # below then fails with the call's own TypeError instead of StopIteration.
    first_arg = next(iter(inspect.signature(operator).parameters), None)
    logging.debug("first_arg: %s", first_arg)
    if first_arg not in kwargs:
        logging.debug("first_arg not in kwargs, using step_input.")
        return operator(step_input, **kwargs)
    logging.debug("first_arg in kwargs.")
    return operator(**kwargs)
def create_diagnostic_archive(output_directory: Path = None):
    """Create archive for easy download of plots and data.

    Everything found recursively under the output directory is written to
    ``diagnostic.zip`` inside that directory, stored under paths relative to
    it. The archive itself is left out.

    Parameters
    ----------
    output_directory: Path, optional
        Directory to archive. Defaults to the current working directory.
    """
    if output_directory is None:
        output_directory = Path.cwd()
    else:
        output_directory = Path(output_directory)
    archive_path = output_directory / "diagnostic.zip"
    with zipfile.ZipFile(
        archive_path, "w", compression=zipfile.ZIP_DEFLATED
    ) as archive:
        # Sorted so that the order of archive members is deterministic.
        for file in sorted(output_directory.rglob("*")):
            # Check the archive doesn't add itself.
            if file.samefile(archive_path):
                continue
            archive.write(file, arcname=file.relative_to(output_directory))
def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Parse and execute the steps from a recipe.

    The output directory is created if needed and made the working directory
    while the recipe runs. A ``CSET.log`` file and a ``meta.json`` metadata
    file are written there, the steps are run in order with each step's
    output fed to the next, and finally a diagnostic archive is created. The
    original working directory is restored afterwards, even on error.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. The ``style_file_path``, ``plot_resolution`` and
        ``skip_write`` keys are set on it when the matching arguments are
        given.
    output_directory: Path
        Pathlike indicating desired location of output.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileExistsError
        The output directory is actually a file.
    NotADirectoryError
        Part of the output directory path is actually a file.
    KeyError
        The recipe has no ``steps`` entry.
    ValueError
        A step names an unknown operator.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    # Stays None if we fail before the log file handler is created, so that
    # the cleanup below knows there is nothing to detach.
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        # Detach and close the per-recipe log handler so the file handle is
        # released and later calls do not keep writing to this recipe's log.
        if diagnostic_log is not None:
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
        os.chdir(original_working_directory)