Coverage for src/CSET/operators/__init__.py: 100%
89 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 15:17 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 15:17 +0000
1# © Crown copyright, Met Office (2022-2024) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Subpackage contains all of CSET's operators."""
17import inspect
18import json
19import logging
20import os
21import zipfile
22from pathlib import Path
24from iris import FUTURE
26# Import operators here so they are exported for use by recipes.
27import CSET.operators
28from CSET.operators import (
29 ageofair,
30 aggregate,
31 aviation,
32 collapse,
33 constraints,
34 convection,
35 ensembles,
36 filters,
37 mesoscale,
38 misc,
39 plot,
40 read,
41 regrid,
42 transect,
43 wind,
44 write,
45)
# Exported operators & functions to use elsewhere.
# The list holds the operator submodules imported above plus the two public
# functions defined in this module (get_operator and execute_recipe).
__all__ = [
    "ageofair",
    "aggregate",
    "aviation",
    "collapse",
    "constraints",
    "convection",
    "ensembles",
    "execute_recipe",
    "filters",
    "get_operator",
    "mesoscale",
    "misc",
    "plot",
    "read",
    "regrid",
    "transect",
    "wind",
    "write",
]

# NOTE: the assignments below run at import time and change iris' global
# FUTURE flags for the whole process, not just for this package.
# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
# Accept microsecond precision in iris times.
FUTURE.date_microseconds = True
def get_operator(name: str):
    """Look up an operator from its dotted name.

    The name is resolved attribute by attribute, starting from the
    ``CSET.operators`` package, so ``"read.read_cubes"`` yields the
    ``read_cubes`` attribute of ``CSET.operators.read``.

    Parameters
    ----------
    name: str
        Dotted name of the desired operator, relative to ``CSET.operators``.

    Returns
    -------
    function
        The callable the name resolves to.

    Raises
    ------
    ValueError
        If the name cannot be resolved, or resolves to something that is not
        callable.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        target = CSET.operators
        for attribute in name.split("."):
            target = getattr(target, attribute)
        if not callable(target):
            # Non-callables (e.g. a submodule) are reported the same way as
            # names that do not exist at all.
            raise AttributeError
        return target
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
def _write_metadata(recipe: dict):
    """Dump the recipe's metadata to ``meta.json`` in the current directory.

    The passed-in recipe is not modified; a filtered shallow copy is what
    gets written.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. Every top-level entry except ``steps`` is written.
    """
    # Steps are left out: they are not needed in the metadata and might
    # contain non-serialisable types.
    metadata = {key: value for key, value in recipe.items() if key != "steps"}
    # Shorten titles by stripping long variable-name suffixes.
    if "title" in metadata:
        title = metadata["title"]
        for suffix in (
            "_for_climate_averaging",
            "_radiative_timestep",
            "_maximum_random_overlap",
        ):
            title = title.replace(suffix, "")
        metadata["title"] = title
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp, indent=2)
def _step_parser(step: dict, step_input: object) -> object:
    """Execute a recipe step, recursively executing any sub-steps.

    Every key of the step other than ``operator`` becomes a keyword argument
    to the operator. A value that is itself a dict containing an ``operator``
    key is treated as a sub-step: it is executed first, with the same
    ``step_input``, and its result is used as the argument value.

    Parameters
    ----------
    step: dict
        A single recipe step. Must contain an ``operator`` key naming the
        operator to run.
    step_input: object
        Output of the previous step. It is passed as the operator's first
        positional argument, unless the step explicitly supplies a value for
        that argument by name.

    Returns
    -------
    object
        Whatever the operator returns.

    Raises
    ------
    ValueError
        If the step has no ``operator`` key, or the named operator is
        unknown.
    """
    logging.debug("Executing step: %s", step)
    # Fail early with a clear error; otherwise the operator variable would
    # never be bound and a confusing UnboundLocalError would surface later.
    if "operator" not in step:
        raise ValueError(f"Recipe step has no operator: {step}")
    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            operator = get_operator(value)
            logging.info("operator: %s", value)
        elif isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    # The None default covers operators without parameters, so that the call
    # below raises an informative TypeError instead of a bare StopIteration.
    first_arg = next(iter(inspect.signature(operator).parameters), None)
    logging.debug("first_arg: %s", first_arg)
    if first_arg not in kwargs:
        logging.debug("first_arg not in kwargs, using step_input.")
        return operator(step_input, **kwargs)
    logging.debug("first_arg in kwargs.")
    return operator(**kwargs)
def create_diagnostic_archive():
    """Zip up the current working directory for easy download.

    Everything found recursively under the current working directory is
    written, with paths relative to that directory, into a deflate-compressed
    ``diagnostic.zip`` placed in the same directory.
    """
    base_directory: Path = Path.cwd()
    zip_path = base_directory / "diagnostic.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as bundle:
        for entry in base_directory.rglob("*"):
            # The archive lives inside the directory being walked, so skip it
            # to avoid the archive adding itself.
            if entry.samefile(zip_path):
                continue
            bundle.write(entry, arcname=entry.relative_to(base_directory))
def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Parse and execute the steps from a recipe.

    The steps run with the output directory as the current working
    directory; the original working directory is restored afterwards, even
    on failure. A ``CSET.log`` file, a ``meta.json`` metadata file and a
    ``diagnostic.zip`` archive are written into the output directory.

    Note that ``recipe`` is modified in place: any of ``style_file``,
    ``plot_resolution`` and ``skip_write`` that are truthy are stored in it
    (as ``style_file_path``, ``plot_resolution`` and ``skip_write``) before
    the metadata is written.

    Parameters
    ----------
    recipe: dict
        Parsed recipe.
    output_directory: Path
        Pathlike indicating desired location of output.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileExistsError
        The output directory is actually a file.
    NotADirectoryError
        A parent of the output directory is actually a file.
    KeyError
        The recipe has no ``steps`` entry.
    ValueError
        The recipe is not well formed, e.g. a step names an unknown
        operator.

    Notes
    -----
    Exceptions raised by the operators themselves (such as a
    FileNotFoundError for a missing input file) are not caught here and
    propagate to the caller.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    # Stays None until the handler exists, so the cleanup below is safe even
    # if changing directory or opening the log file fails.
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe, feeding each step's output into the next step.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        # Detach and close the per-run log handler. Otherwise each call would
        # leak an open file and keep writing later runs' messages into this
        # run's CSET.log.
        if diagnostic_log is not None:
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
        os.chdir(original_working_directory)