Coverage for src / CSET / operators / __init__.py: 100%
89 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-02 17:30 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-02 17:30 +0000
1# © Crown copyright, Met Office (2022-2024) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Subpackage contains all of CSET's operators."""
17import inspect
18import json
19import logging
20import os
21import zipfile
22from pathlib import Path
24from iris import FUTURE
26# Import operators here so they are exported for use by recipes.
27import CSET.operators
28from CSET.operators import (
29 ageofair,
30 aggregate,
31 aviation,
32 collapse,
33 constraints,
34 convection,
35 ensembles,
36 filters,
37 imageprocessing,
38 mesoscale,
39 misc,
40 plot,
41 power_spectrum,
42 read,
43 regrid,
44 transect,
45 wind,
46 write,
47)
# Names exported by this subpackage: the operator modules imported above, plus
# the get_operator and execute_recipe functions defined in this module.
__all__ = [
    "ageofair",
    "aggregate",
    "aviation",
    "collapse",
    "constraints",
    "convection",
    "ensembles",
    "execute_recipe",
    "filters",
    "get_operator",
    "imageprocessing",
    "mesoscale",
    "misc",
    "plot",
    "power_spectrum",
    "read",
    "regrid",
    "transect",
    "wind",
    "write",
]

# iris FUTURE flags, set once when the subpackage is imported.
# datum_support: stops iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# save_split_attrs: stops iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
# date_microseconds: accept microsecond precision in iris times.
FUTURE.date_microseconds = True
def get_operator(name: str):
    """Look up an operator from its dotted name.

    The name is split on ``.`` and resolved one attribute at a time, starting
    from the ``CSET.operators`` package, so ``"read.read_cubes"`` resolves to
    the ``read_cubes`` attribute of ``CSET.operators.read``.

    Parameters
    ----------
    name: str
        Dotted name of the wanted operator, relative to ``CSET.operators``.

    Returns
    -------
    function
        The callable that the name resolves to.

    Raises
    ------
    ValueError
        If the name cannot be resolved (including when it is not a string),
        or if it resolves to something that is not callable.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        # Split first: a non-string name fails here with AttributeError.
        attribute_path = name.split(".")
        resolved = CSET.operators
        for attribute in attribute_path:
            resolved = getattr(resolved, attribute)
        # Modules and plain values are not operators; report them the same
        # way as a failed lookup.
        if not callable(resolved):
            raise AttributeError
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
    return resolved
def _write_metadata(recipe: dict):
    """Dump the recipe's metadata to meta.json in the current directory.

    Works on a shallow copy of the recipe, so the caller's dict keeps its
    "steps" entry and its original title.
    """
    metadata = recipe.copy()
    # Steps are not needed in the metadata, and may hold values that cannot
    # be serialised to JSON.
    metadata.pop("steps", None)
    # Trim long variable-name suffixes out of the title.
    if "title" in metadata:
        title = metadata["title"]
        for suffix in (
            "_for_climate_averaging",
            "_radiative_timestep",
            "_maximum_random_overlap",
        ):
            title = title.replace(suffix, "")
        metadata["title"] = title
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp, indent=2)
def _step_parser(step: dict, step_input: any) -> str:
    """Execute a recipe step, recursively executing any sub-steps.

    Parameters
    ----------
    step: dict
        The step to run. Its "operator" entry names the operator, resolved
        with ``get_operator``. Every other entry becomes a keyword argument.
        An entry whose value is itself a dict containing "operator" is
        executed first as a sub-step, and its result is used as the argument.
    step_input: any
        Value handed to the operator as its first positional argument, unless
        the step sets that argument explicitly. Sub-steps receive the same
        step_input.

    Returns
    -------
    The value returned by the operator.
    NOTE(review): the ``-> str`` annotation is narrower than this; it is left
    unchanged to keep the signature as it was.

    Raises
    ------
    ValueError
        If the step has no "operator" entry, or ``get_operator`` does not
        recognise the named operator.
    """
    logging.debug("Executing step: %s", step)
    # Reject malformed steps up front. Without this check the operator
    # variable is never bound and the failure surfaces later as an
    # UnboundLocalError.
    if "operator" not in step:
        raise ValueError(f"Recipe step has no operator: {step}")
    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            operator = get_operator(value)
            logging.info("operator: %s", value)
        elif isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    first_arg = next(iter(inspect.signature(operator).parameters))
    logging.debug("first_arg: %s", first_arg)
    if first_arg in kwargs:
        logging.debug("first_arg in kwargs.")
        return operator(**kwargs)
    logging.debug("first_arg not in kwargs, using step_input.")
    return operator(step_input, **kwargs)
def create_diagnostic_archive():
    """Zip up the current working directory for easy download.

    Everything found recursively under the current working directory is
    written to ``diagnostic.zip`` in that same directory, stored under its
    path relative to the directory and compressed with DEFLATE.
    """
    root: Path = Path.cwd()
    zip_location = root / "diagnostic.zip"
    with zipfile.ZipFile(
        zip_location, "w", compression=zipfile.ZIP_DEFLATED
    ) as bundle:
        for entry in root.rglob("*"):
            # The archive lives in the directory being walked; never add it
            # to itself.
            if entry.samefile(zip_location):
                continue
            bundle.write(entry, arcname=entry.relative_to(root))
def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Parse and execute the steps from a recipe.

    The steps run with the output directory as the working directory. A
    ``CSET.log`` file, a ``meta.json`` metadata file and a ``diagnostic.zip``
    archive are written there. The original working directory is restored
    afterwards, even if a step fails.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. Must contain a "steps" entry. When the optional
        arguments below are set, the matching keys ("style_file_path",
        "plot_resolution", "skip_write") are added to this dict in place.
    output_directory: Path
        Pathlike indicating desired location of output. Created if needed.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileExistsError
        The output directory is actually a file.
    NotADirectoryError
        A component of the output directory path is not a directory.
    KeyError
        The recipe has no "steps" entry.
    ValueError
        The recipe is not well formed, e.g. a step names an unknown operator.

    Notes
    -----
    Exceptions raised by the operators themselves (for example a
    FileNotFoundError for missing input) are not caught here.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe, feeding each step's output into the next step.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        # Detach and close the per-recipe log handler. Left attached, it
        # leaks an open file per call, and later log records keep being
        # written to this recipe's CSET.log.
        if diagnostic_log is not None:
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
        os.chdir(original_working_directory)