Coverage for src / CSET / operators / __init__.py: 100%
89 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 16:33 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 16:33 +0000
1# © Crown copyright, Met Office (2022-2024) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Subpackage contains all of CSET's operators."""
17import inspect
18import json
19import logging
20import os
21import zipfile
22from pathlib import Path
24from iris import FUTURE
26# Import operators here so they are exported for use by recipes.
27import CSET.operators
28from CSET.operators import (
29 ageofair,
30 aggregate,
31 aviation,
32 collapse,
33 constraints,
34 convection,
35 ensembles,
36 filters,
37 imageprocessing,
38 mesoscale,
39 misc,
40 plot,
41 read,
42 regrid,
43 transect,
44 wind,
45 write,
46)
# Exported operators & functions to use elsewhere. These are the names
# provided by ``from CSET.operators import *``.
# NOTE(review): create_diagnostic_archive is defined in this module without a
# leading underscore but is not listed here - confirm that is intentional.
__all__ = [
    "ageofair",
    "aggregate",
    "aviation",
    "collapse",
    "constraints",
    "convection",
    "ensembles",
    "execute_recipe",
    "filters",
    "get_operator",
    "imageprocessing",
    "mesoscale",
    "misc",
    "plot",
    "read",
    "regrid",
    "transect",
    "wind",
    "write",
]

# The iris FUTURE flags below are set at import time, so they apply to the
# whole process as soon as this package is imported.
# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
# Accept microsecond precision in iris times.
FUTURE.date_microseconds = True
def get_operator(name: str):
    """Look up an operator from its dotted name.

    The name is split on ``.`` and each part is resolved with ``getattr``,
    starting from the ``CSET.operators`` package. Any callable reachable that
    way is accepted.

    Parameters
    ----------
    name: str
        Dotted name of the desired operator, e.g. ``"read.read_cubes"``.

    Returns
    -------
    function
        The callable the name resolves to.

    Raises
    ------
    ValueError
        If the name cannot be resolved, resolves to something that is not
        callable, or is not a string.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        parts = name.split(".")
        target = CSET.operators
        for part in parts:
            target = getattr(target, part)
        if not callable(target):
            # Funnel non-callables into the same "unknown operator" error.
            raise AttributeError
        return target
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
def _write_metadata(recipe: dict):
    """Write the recipe's metadata to meta.json in the current directory.

    A shallow copy of the recipe is written, so the caller's dictionary is
    left untouched. The "steps" entry is dropped and certain long variable
    name suffixes are stripped from the title before serialising.
    """
    meta = recipe.copy()
    # Steps are not needed, and might contain non-serialisable types.
    meta.pop("steps", None)
    # Strip long variable name suffixes from the title.
    if "title" in meta:
        title = meta["title"]
        for suffix in (
            "_for_climate_averaging",
            "_radiative_timestep",
            "_maximum_random_overlap",
        ):
            title = title.replace(suffix, "")
        meta["title"] = title
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(meta, fp, indent=2)
def _step_parser(step: dict, step_input: object) -> object:
    """Execute a recipe step, recursively executing any sub-steps.

    Parameters
    ----------
    step: dict
        The step to run. It must contain an "operator" key naming the
        operator; every other key is passed to the operator as a keyword
        argument. A value that is itself a dict with an "operator" key is
        treated as a sub-step and is executed first, with the same
        step_input, and its result is used as the argument.
    step_input: object
        Output of the previous step. Passed as the first positional argument
        unless the operator's first parameter is given explicitly in step.

    Returns
    -------
    object
        Whatever the operator returns.

    Raises
    ------
    ValueError
        If the step has no "operator" key, or the operator is unknown.
    """
    logging.debug("Executing step: %s", step)
    # Fail clearly on a malformed step, rather than with an unbound local
    # variable further down.
    if "operator" not in step:
        raise ValueError(f"Step is missing an operator: {step}")
    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            operator = get_operator(value)
            logging.info("operator: %s", value)
        elif isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    # The None default covers operators that take no parameters, which would
    # otherwise surface as a bare StopIteration here.
    first_arg = next(iter(inspect.signature(operator).parameters), None)
    logging.debug("first_arg: %s", first_arg)
    if first_arg not in kwargs:
        logging.debug("first_arg not in kwargs, using step_input.")
        return operator(step_input, **kwargs)
    logging.debug("first_arg in kwargs.")
    return operator(**kwargs)
def create_diagnostic_archive():
    """Bundle the current working directory into diagnostic.zip.

    Everything found recursively under the current working directory is added
    to a deflate-compressed zip file written to that same directory, using
    paths relative to it. The archive itself is left out.
    """
    base_dir: Path = Path.cwd()
    zip_path = base_dir / "diagnostic.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as bundle:
        for entry in base_dir.rglob("*"):
            # The archive lives in the directory being walked; don't let it
            # add itself.
            if entry.samefile(zip_path):
                continue
            bundle.write(entry, arcname=entry.relative_to(base_dir))
def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Execute the steps of a parsed recipe inside the output directory.

    The output directory is created if needed and becomes the working
    directory while the recipe runs. A CSET.log file and a meta.json file are
    written there, the steps are run in order with each step receiving the
    previous step's output, and finally everything in the directory is
    bundled into diagnostic.zip. The original working directory is restored
    afterwards, even if a step fails.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. It is modified in place: truthy values of style_file,
        plot_resolution and skip_write are stored under the
        "style_file_path", "plot_resolution" and "skip_write" keys so that
        they end up in meta.json.
    output_directory: Path
        Pathlike indicating desired location of output.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileExistsError
        The output directory is actually a file.
    NotADirectoryError
        A parent of the output directory is actually a file.
    KeyError
        The recipe has no "steps" entry.
    ValueError
        The recipe is not well formed, e.g. it names an unknown operator.

    Notes
    -----
    Exceptions raised by the operators themselves are not caught here and
    propagate to the caller.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    # Stays None until the handler is created, so the cleanup below knows
    # whether there is anything to detach.
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        # Detach and close the per-recipe log handler. Otherwise each call
        # leaks an open file, and later recipes would also keep writing into
        # this recipe's CSET.log.
        if diagnostic_log is not None:
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
        os.chdir(original_working_directory)