Coverage for src / CSET / operators / misc.py: 100%
105 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 14:31 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 14:31 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Miscellaneous operators."""
17import itertools
18import logging
19from collections.abc import Iterable
21import iris
22import numpy as np
23from iris.cube import Cube, CubeList
25from CSET._common import is_increasing, iter_maybe
26from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname
27from CSET.operators.regrid import regrid_onto_cube
30def noop(x, **kwargs):
31 """Return its input without doing anything to it.
33 Useful for constructing diagnostic chains.
35 Arguments
36 ---------
37 x: Any
38 Input to return.
40 Returns
41 -------
42 x: Any
43 The input that was given.
44 """
45 return x
48def remove_attribute(
49 cubes: Cube | CubeList, attribute: str | Iterable, **kwargs
50) -> CubeList:
51 """Remove a cube attribute.
53 If the attribute is not on the cube, the cube is passed through unchanged.
55 Arguments
56 ---------
57 cubes: Cube | CubeList
58 One or more cubes to remove the attribute from.
59 attribute: str | Iterable
60 Name of attribute (or Iterable of names) to remove.
62 Returns
63 -------
64 cubes: CubeList
65 CubeList of cube(s) with the attribute removed.
66 """
67 # Ensure cubes is a CubeList.
68 if not isinstance(cubes, CubeList):
69 cubes = CubeList(iter_maybe(cubes))
71 for cube in cubes:
72 for attr in iter_maybe(attribute):
73 cube.attributes.pop(attr, None)
74 return cubes
77def addition(addend_1, addend_2):
78 """Addition of two fields.
80 Parameters
81 ----------
82 addend_1: Cube
83 Any field to have another field added to it.
84 addend_2: Cube
85 Any field to be added to another field.
87 Returns
88 -------
89 Cube
91 Raises
92 ------
93 ValueError, iris.exceptions.NotYetImplementedError
94 When the cubes are not compatible.
96 Notes
97 -----
98 This is a simple operator designed for combination of diagnostics or
99 creating new diagnostics by using recipes.
101 Examples
102 --------
103 >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v)
105 """
106 return addend_1 + addend_2
109def subtraction(minuend, subtrahend):
110 """Subtraction of two fields.
112 Parameters
113 ----------
114 minuend: Cube
115 Any field to have another field subtracted from it.
116 subtrahend: Cube
117 Any field to be subtracted from to another field.
119 Returns
120 -------
121 Cube
123 Raises
124 ------
125 ValueError, iris.exceptions.NotYetImplementedError
126 When the cubes are not compatible.
128 Notes
129 -----
130 This is a simple operator designed for combination of diagnostics or
131 creating new diagnostics by using recipes. It can be used for model
132 differences to allow for comparisons between the same field in different
133 models or model configurations.
135 Examples
136 --------
137 >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B)
139 """
140 return minuend - subtrahend
143def division(numerator, denominator):
144 """Division of two fields.
146 Parameters
147 ----------
148 numerator: Cube
149 Any field to have the ratio taken with respect to another field.
150 denominator: Cube
151 Any field used to divide another field or provide the reference
152 value in a ratio.
154 Returns
155 -------
156 Cube
158 Raises
159 ------
160 ValueError
161 When the cubes are not compatible.
163 Notes
164 -----
165 This is a simple operator designed for combination of diagnostics or
166 creating new diagnostics by using recipes.
168 Examples
169 --------
170 >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux)
172 """
173 return numerator / denominator
176def multiplication(
177 multiplicand: Cube | CubeList, multiplier: Cube | CubeList
178) -> Cube | CubeList:
179 """Multiplication of two fields.
181 Parameters
182 ----------
183 multiplicand: Cube | CubeList
184 Any field to be multiplied by another field.
185 multiplier: Cube | CubeList
186 Any field to be multiplied to another field.
188 Returns
189 -------
190 Cube | CubeList
191 The result of multiplicand x multiplier.
193 Raises
194 ------
195 ValueError
196 When the cubes are not compatible.
198 Notes
199 -----
200 This is a simple operator designed for combination of diagnostics or
201 creating new diagnostics by using recipes. CubeLists are multiplied
202 on a strict ordering (e.g. first cube with first cube).
204 Examples
205 --------
206 >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties)
208 """
209 new_cubelist = iris.cube.CubeList([])
210 for cube_a, cube_b in zip(
211 iter_maybe(multiplicand), iter_maybe(multiplier), strict=True
212 ):
213 multiplied_cube = cube_a * cube_b
214 multiplied_cube.rename(f"{cube_a.name()}_x_{cube_b.name()}")
215 new_cubelist.append(multiplied_cube)
216 if len(new_cubelist) == 1:
217 return new_cubelist[0]
218 else:
219 return new_cubelist
222def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList:
223 """Operator that combines multiple cubes or CubeLists into one.
225 Arguments
226 ---------
227 first: Cube | CubeList
228 First cube or CubeList to merge into CubeList.
229 second: Cube | CubeList
230 Second cube or CubeList to merge into CubeList. This must be a named
231 argument.
232 third: Cube | CubeList
233 There can be any number of additional arguments, they just need unique
234 names.
235 ...
237 Returns
238 -------
239 combined_cubelist: CubeList
240 Combined CubeList containing all cubes/CubeLists.
242 Raises
243 ------
244 TypeError:
245 If the provided arguments are not either a Cube or CubeList.
246 """
247 # Create empty CubeList to store cubes/CubeList.
248 all_cubes = CubeList()
249 # Combine all CubeLists into a single flat iterable.
250 for item in itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values())):
251 # Check each item is a Cube, erroring if not.
252 if isinstance(item, Cube):
253 # Add cube to CubeList.
254 all_cubes.append(item)
255 else:
256 raise TypeError("Not a Cube or CubeList!", item)
257 return all_cubes
260def difference(cubes: CubeList):
261 """Difference of two fields.
263 Parameters
264 ----------
265 cubes: CubeList
266 A list of exactly two cubes. One must have the cset_comparison_base
267 attribute set to 1, and will be used as the base of the comparison.
269 Returns
270 -------
271 Cube
273 Raises
274 ------
275 ValueError
276 When the cubes are not compatible.
278 Notes
279 -----
280 This is a simple operator designed for combination of diagnostics or
281 creating new diagnostics by using recipes. It can be used for model
282 differences to allow for comparisons between the same field in different
283 models or model configurations.
285 Examples
286 --------
287 >>> model_diff = misc.difference(temperature_model_A, temperature_model_B)
289 """
290 if len(cubes) != 2:
291 raise ValueError("cubes should contain exactly 2 cubes.")
292 base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1))
293 other: Cube = cubes.extract_cube(
294 iris.Constraint(
295 cube_func=lambda cube: "cset_comparison_base" not in cube.attributes
296 )
297 )
299 # Get spatial coord names.
300 base_lat_name, base_lon_name = get_cube_yxcoordname(base)
301 other_lat_name, other_lon_name = get_cube_yxcoordname(other)
303 # Ensure cubes to compare are on common differencing grid.
304 # This is triggered if either
305 # i) latitude and longitude shapes are not the same. Note grid points
306 # are not compared directly as these can differ through rounding
307 # errors.
308 # ii) or variables are known to often sit on different grid staggering
309 # in different models (e.g. cell center vs cell edge), as is the case
310 # for UM and LFRic comparisons.
311 # In future greater choice of regridding method might be applied depending
312 # on variable type. Linear regridding can in general be appropriate for smooth
313 # variables. Care should be taken with interpretation of differences
314 # given this dependency on regridding.
315 if (
316 base.coord(base_lat_name).shape != other.coord(other_lat_name).shape
317 or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape
318 ) or (
319 base.long_name
320 in [
321 "eastward_wind_at_10m",
322 "northward_wind_at_10m",
323 "northward_wind_at_cell_centres",
324 "eastward_wind_at_cell_centres",
325 "zonal_wind_at_pressure_levels",
326 "meridional_wind_at_pressure_levels",
327 "potential_vorticity_at_pressure_levels",
328 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging",
329 ]
330 ):
331 logging.debug(
332 "Linear regridding base cube to other grid to compute differences"
333 )
334 base = regrid_onto_cube(base, other, method="Linear")
336 # Figure out if we are comparing between UM and LFRic; flip array if so.
337 base_lat_direction = is_increasing(base.coord(base_lat_name).points)
338 other_lat_direction = is_increasing(other.coord(other_lat_name).points)
339 if base_lat_direction != other_lat_direction:
340 other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other))
342 # Extract just common time points.
343 base, other = _extract_common_time_points(base, other)
345 # Equalise attributes so we can merge.
346 fully_equalise_attributes([base, other])
347 logging.debug("Base: %s\nOther: %s", base, other)
349 # This currently relies on the cubes having the same underlying data layout.
350 difference = base.copy()
352 # Differences don't have a standard name; long name gets a suffix. We are
353 # assuming we can rely on cubes having a long name, so we don't check for
354 # its presents.
355 difference.standard_name = None
356 difference.long_name = (
357 base.long_name if base.long_name else base.name()
358 ) + "_difference"
359 if base.var_name:
360 difference.var_name = base.var_name + "_difference"
361 elif base.standard_name:
362 difference.var_name = base.standard_name + "_difference"
364 difference.data = base.data - other.data
365 return difference
368def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]:
369 """Extract common time points from cubes to allow comparison."""
370 # Get the name of the first non-scalar time coordinate.
371 time_coord = next(
372 map(
373 lambda coord: coord.name(),
374 filter(
375 lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"],
376 base.coords(),
377 ),
378 ),
379 None,
380 )
381 if not time_coord:
382 logging.debug("No time coord, skipping equalisation.")
383 return (base, other)
384 base_time_coord = base.coord(time_coord)
385 other_time_coord = other.coord(time_coord)
386 logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord)
387 if time_coord == "hour":
388 # We directly compare points when comparing coordinates with
389 # non-absolute units, such as hour. We can't just check the units are
390 # equal as iris automatically converts to datetime objects in the
391 # comparison for certain coordinate names.
392 base_times = base_time_coord.points
393 other_times = other_time_coord.points
394 shared_times = set.intersection(set(base_times), set(other_times))
395 else:
396 # Units don't match, so converting to datetimes for comparison.
397 base_times = base_time_coord.units.num2date(base_time_coord.points)
398 other_times = other_time_coord.units.num2date(other_time_coord.points)
399 shared_times = set.intersection(set(base_times), set(other_times))
400 logging.debug("Shared times: %s", shared_times)
401 time_constraint = iris.Constraint(
402 coord_values={time_coord: lambda cell: cell.point in shared_times}
403 )
404 # Extract points matching the shared times.
405 base = base.extract(time_constraint)
406 other = other.extract(time_constraint)
407 if base is None or other is None:
408 raise ValueError("No common time points found!")
409 return (base, other)
412def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str):
413 """Convert the units of a cube.
415 Arguments
416 ---------
417 cubes: iris.cube.Cube | iris.cube.CubeList
418 A Cube or CubeList of a field for its units to be converted.
420 units: str
421 The unit that the original field is to be converted to. It takes
422 CF compliant units.
424 Returns
425 -------
426 iris.cube.Cube | iris.cube.CubeList
427 The field converted into the specified units.
429 Examples
430 --------
431 >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit")
433 """
434 new_cubelist = iris.cube.CubeList([])
435 for cube in iter_maybe(cubes):
436 # Copy cube to keep original data.
437 cube_a = cube.copy()
438 # Convert cube units.
439 cube_a.convert_units(units)
440 new_cubelist.append(cube_a)
441 if len(new_cubelist) == 1:
442 return new_cubelist[0]
443 else:
444 return new_cubelist
447def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str):
448 """Rename a cube.
450 Arguments
451 ---------
452 cubes: iris.cube.Cube | iris.cube.CubeList
453 A Cube or CubeList of a field to be renamed.
455 name: str
456 The new name of the cube. It should be CF compliant.
458 Returns
459 -------
460 iris.cube.Cube | iris.cube.CubeList
461 The renamed field.
463 Notes
464 -----
465 This operator is designed to be used when the output field name does not
466 match expectations or needs to be different to defaults in standard_name, var_name or
467 long_name. For example, if combining masks
468 to create light rain you would like the field to be named "mask_for_light_rain"
469 rather than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0".
471 Examples
472 --------
473 >>> light_rain_mask = misc.rename_cube(light_rain_mask,"mask_for_light_rainfall"
474 """
475 new_cubelist = iris.cube.CubeList([])
476 for cube in iter_maybe(cubes):
477 cube.rename(name)
478 new_cubelist.append(cube)
479 if len(new_cubelist) == 1:
480 return new_cubelist[0]
481 else:
482 return new_cubelist