Coverage for src / CSET / operators / misc.py: 99%
146 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:46 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Miscellaneous operators."""
17import itertools
18import logging
19from collections.abc import Iterable
21import iris
22import iris.analysis.calculus
23import numpy as np
24from iris.cube import Cube, CubeList
26from CSET._common import is_increasing, iter_maybe
27from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname
28from CSET.operators.regrid import regrid_onto_cube
def noop(x, **kwargs):
    """Pass the input straight through, untouched.

    Handy as a placeholder step when constructing diagnostic chains.

    Arguments
    ---------
    x: Any
        Value to pass through.

    Returns
    -------
    x: Any
        Exactly the value that was supplied.
    """
    # Intentionally a no-op; extra keyword arguments are accepted and ignored.
    return x
def remove_attribute(
    cubes: Cube | CubeList, attribute: str | Iterable, **kwargs
) -> CubeList:
    """Remove a cube attribute.

    If the attribute is not on the cube, the cube is passed through unchanged.

    Arguments
    ---------
    cubes: Cube | CubeList
        One or more cubes to remove the attribute from.
    attribute: str | Iterable
        Name of attribute (or Iterable of names) to remove.

    Returns
    -------
    cubes: CubeList
        CubeList of cube(s) with the attribute removed.
    """
    # Normalise the input so we always work with a CubeList.
    if not isinstance(cubes, CubeList):
        cubes = CubeList(iter_maybe(cubes))
    # Strip each requested attribute from every cube; pop with a default
    # means missing attributes are silently skipped.
    for cube in cubes:
        for attr in iter_maybe(attribute):
            cube.attributes.pop(attr, None)
    # Removing attributes may make previously distinct cubes compatible, so
    # merge and then concatenate to combine whatever now fits together.
    cubes = cubes.merge()
    cubes = cubes.concatenate()
    return cubes
def addition(addend_1, addend_2):
    """Add two fields together.

    Parameters
    ----------
    addend_1: Cube
        Field that the other field is added to.
    addend_2: Cube
        Field that is added to the other field.

    Returns
    -------
    Cube

    Raises
    ------
    ValueError, iris.exceptions.NotYetImplementedError
        When the cubes are not compatible.

    Notes
    -----
    This is a simple operator designed for combination of diagnostics or
    creating new diagnostics by using recipes.

    Examples
    --------
    >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v)

    """
    # Delegates entirely to the + operator, so anything that supports
    # addition (cubes, arrays, numbers) works here.
    result = addend_1 + addend_2
    return result
def subtraction(minuend, subtrahend):
    """Subtract one field from another.

    Parameters
    ----------
    minuend: Cube
        Field that the other field is subtracted from.
    subtrahend: Cube
        Field that is subtracted from the other field.

    Returns
    -------
    Cube

    Raises
    ------
    ValueError, iris.exceptions.NotYetImplementedError
        When the cubes are not compatible.

    Notes
    -----
    This is a simple operator designed for combination of diagnostics or
    creating new diagnostics by using recipes. It can be used for model
    differences to allow for comparisons between the same field in different
    models or model configurations.

    Examples
    --------
    >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B)

    """
    # Delegates entirely to the - operator.
    result = minuend - subtrahend
    return result
def division(numerator, denominator):
    """Divide one field by another.

    Parameters
    ----------
    numerator: Cube
        Field to take the ratio of, with respect to another field.
    denominator: Cube
        Field used to divide another field, i.e. the reference value in a
        ratio.

    Returns
    -------
    Cube

    Raises
    ------
    ValueError
        When the cubes are not compatible.

    Notes
    -----
    This is a simple operator designed for combination of diagnostics or
    creating new diagnostics by using recipes.

    Examples
    --------
    >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux)

    """
    # Delegates entirely to the / operator (true division).
    result = numerator / denominator
    return result
def multiplication(
    multiplicand: Cube | CubeList, multiplier: Cube | CubeList
) -> Cube | CubeList:
    """Multiply two fields together.

    Parameters
    ----------
    multiplicand: Cube | CubeList
        Any field to be multiplied by another field.
    multiplier: Cube | CubeList
        Any field to be multiplied to another field.

    Returns
    -------
    Cube | CubeList
        The result of multiplicand x multiplier.

    Raises
    ------
    ValueError
        When the cubes are not compatible.

    Notes
    -----
    This is a simple operator designed for combination of diagnostics or
    creating new diagnostics by using recipes. CubeLists are multiplied
    on a strict ordering (e.g. first cube with first cube).

    Examples
    --------
    >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties)

    """
    products = iris.cube.CubeList([])
    # strict=True raises if the two inputs have different lengths, enforcing
    # the pairwise (first-with-first) multiplication contract.
    for left, right in zip(iter_maybe(multiplicand), iter_maybe(multiplier), strict=True):
        product = left * right
        # Record the provenance of the product in its name.
        product.rename(f"{left.name()}_x_{right.name()}")
        products.append(product)
    # Unwrap a single result back to a bare Cube.
    return products[0] if len(products) == 1 else products
def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList:
    """Operator that combines multiple cubes or CubeLists into one.

    Arguments
    ---------
    first: Cube | CubeList
        First cube or CubeList to merge into CubeList.
    second: Cube | CubeList
        Second cube or CubeList to merge into CubeList. This must be a named
        argument.
    third: Cube | CubeList
        There can be any number of additional arguments, they just need unique
        names.
    ...

    Returns
    -------
    combined_cubelist: CubeList
        Combined CubeList containing all cubes/CubeLists.

    Raises
    ------
    TypeError:
        If the provided arguments are not either a Cube or CubeList.
    """
    combined = CubeList()
    # Flatten all positional/keyword inputs into one stream of candidates.
    candidates = itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values()))
    for candidate in candidates:
        # Reject anything that is not a Cube (guard clause).
        if not isinstance(candidate, Cube):
            raise TypeError("Not a Cube or CubeList!", candidate)
        combined.append(candidate)
    return combined
def difference(cubes: CubeList):
    """Difference of two fields.

    Parameters
    ----------
    cubes: CubeList
        A list of exactly two cubes. One must have the cset_comparison_base
        attribute set to 1, and will be used as the base of the comparison.

    Returns
    -------
    Cube
        The base cube minus the other cube, renamed with a "_difference"
        suffix on its long_name/var_name.

    Raises
    ------
    ValueError
        When the cubes are not compatible, when cubes does not contain
        exactly two cubes, or when they share no common time points.

    Notes
    -----
    This is a simple operator designed for combination of diagnostics or
    creating new diagnostics by using recipes. It can be used for model
    differences to allow for comparisons between the same field in different
    models or model configurations.

    Examples
    --------
    >>> model_diff = misc.difference(
    ...     iris.cube.CubeList([temperature_model_A, temperature_model_B])
    ... )

    """
    if len(cubes) != 2:
        raise ValueError("cubes should contain exactly 2 cubes.")
    # The comparison base is marked by the cset_comparison_base attribute;
    # the other cube is the one without that attribute.
    base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1))
    other: Cube = cubes.extract_cube(
        iris.Constraint(
            cube_func=lambda cube: "cset_comparison_base" not in cube.attributes
        )
    )

    # If cubes contain a pressure coordinate, ensure it is increasing.
    for cube in cubes:
        try:
            # NOTE(review): pressure coords with 2 or fewer points are left
            # untouched — confirm the > 2 threshold is intended (vs > 1).
            if len(cube.coord("pressure").points) > 2:
                if not is_increasing(cube.coord("pressure").points):
                    # Flip the data along the pressure dimension; coordinate
                    # values themselves are not reordered here.
                    cube.data = np.flip(cube.data, axis=cube.coord_dims("pressure")[0])
        except iris.exceptions.CoordinateNotFoundError:
            pass

    # Get spatial coord names.
    base_lat_name, base_lon_name = get_cube_yxcoordname(base)
    other_lat_name, other_lon_name = get_cube_yxcoordname(other)

    # Ensure cubes to compare are on common differencing grid.
    # This is triggered if either
    # i) latitude and longitude shapes are not the same. Note grid points
    #    are not compared directly as these can differ through rounding
    #    errors.
    # ii) or variables are known to often sit on different grid staggering
    #    in different models (e.g. cell center vs cell edge), as is the case
    #    for UM and LFRic comparisons.
    # In future greater choice of regridding method might be applied depending
    # on variable type. Linear regridding can in general be appropriate for smooth
    # variables. Care should be taken with interpretation of differences
    # given this dependency on regridding.
    if (
        base.coord(base_lat_name).shape != other.coord(other_lat_name).shape
        or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape
    ) or (
        base.long_name
        in [
            "eastward_wind_at_10m",
            "northward_wind_at_10m",
            "northward_wind_at_cell_centres",
            "eastward_wind_at_cell_centres",
            "zonal_wind_at_pressure_levels",
            "meridional_wind_at_pressure_levels",
            "potential_vorticity_at_pressure_levels",
            "vapour_specific_humidity_at_pressure_levels_for_climate_averaging",
        ]
    ):
        logging.debug(
            "Linear regridding base cube to other grid to compute differences"
        )
        base = regrid_onto_cube(base, other, method="Linear")

    # Figure out if we are comparing between UM and LFRic; flip array if so.
    base_lat_direction = is_increasing(base.coord(base_lat_name).points)
    other_lat_direction = is_increasing(other.coord(other_lat_name).points)
    if base_lat_direction != other_lat_direction:
        other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other))

    # Extract just common time points.
    base, other = _extract_common_time_points(base, other)

    # Equalise attributes so we can merge.
    fully_equalise_attributes([base, other])
    logging.debug("Base: %s\nOther: %s", base, other)

    # This currently relies on the cubes having the same underlying data layout.
    difference = base.copy()

    # Differences don't have a standard name; long name gets a suffix. We are
    # assuming we can rely on cubes having a long name, so we don't check for
    # its presence.
    difference.standard_name = None
    difference.long_name = (
        base.long_name if base.long_name else base.name()
    ) + "_difference"
    if base.var_name:
        difference.var_name = base.var_name + "_difference"
    elif base.standard_name:
        difference.var_name = base.standard_name + "_difference"

    difference.data = base.data - other.data
    return difference
def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]:
    """Extract common time points from cubes to allow comparison.

    Both cubes are subset to the time/hour points they share. If neither has
    a multi-point time coordinate, the cubes are returned unchanged. Raises
    ValueError when no common time points exist.
    """
    # Get the name of the first non-scalar time coordinate.
    # NOTE: `coord.shape > (1,)` is a tuple comparison, selecting coords
    # whose first dimension has more than one point.
    time_coord = next(
        map(
            lambda coord: coord.name(),
            filter(
                lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"],
                base.coords(),
            ),
        ),
        None,
    )
    if not time_coord:
        logging.debug("No time coord, skipping equalisation.")
        return (base, other)
    base_time_coord = base.coord(time_coord)
    other_time_coord = other.coord(time_coord)
    logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord)
    if time_coord == "hour":
        # We directly compare points when comparing coordinates with
        # non-absolute units, such as hour. We can't just check the units are
        # equal as iris automatically converts to datetime objects in the
        # comparison for certain coordinate names.
        base_times = base_time_coord.points
        other_times = other_time_coord.points
        shared_times = set.intersection(set(base_times), set(other_times))
    else:
        # Absolute time coordinate: convert points to datetimes so values
        # expressed in different units can still be compared.
        base_times = base_time_coord.units.num2date(base_time_coord.points)
        other_times = other_time_coord.units.num2date(other_time_coord.points)
        shared_times = set.intersection(set(base_times), set(other_times))
    logging.debug("Shared times: %s", shared_times)
    time_constraint = iris.Constraint(
        coord_values={time_coord: lambda cell: cell.point in shared_times}
    )
    # Extract points matching the shared times.
    base = base.extract(time_constraint)
    other = other.extract(time_constraint)
    # extract returns None when nothing matches the constraint.
    if base is None or other is None:
        raise ValueError("No common time points found!")
    return (base, other)
def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str):
    """Convert the units of a cube.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        A Cube or CubeList of a field for its units to be converted.

    units: str
        The unit that the original field is to be converted to. It takes
        CF compliant units.

    Returns
    -------
    iris.cube.Cube | iris.cube.CubeList
        The field converted into the specified units.

    Examples
    --------
    >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit")

    """
    converted = iris.cube.CubeList([])
    for original in iter_maybe(cubes):
        # Work on a copy so the input cube keeps its original units.
        duplicate = original.copy()
        duplicate.convert_units(units)
        converted.append(duplicate)
    # Unwrap a single result back to a bare Cube.
    return converted[0] if len(converted) == 1 else converted
def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str):
    """Rename a cube.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        A Cube or CubeList of a field to be renamed.

    name: str
        The new name of the cube. It should be CF compliant.

    Returns
    -------
    iris.cube.Cube | iris.cube.CubeList
        The renamed field.

    Notes
    -----
    This operator is designed to be used when the output field name does not
    match expectations or needs to be different to defaults in standard_name,
    var_name or long_name. For example, if combining masks to create light
    rain you would like the field to be named "mask_for_light_rain" rather
    than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0".

    Examples
    --------
    >>> light_rain_mask = misc.rename_cube(light_rain_mask, "mask_for_light_rainfall")
    """
    renamed = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Renames in place; the input cube object itself is updated.
        cube.rename(name)
        renamed.append(cube)
    # Unwrap a single result back to a bare Cube.
    if len(renamed) == 1:
        return renamed[0]
    return renamed
def _slice_cube_on_levels(cube: iris.cube.Cube, coord_name: str, levels: list):
    """
    Extract levels from a cube for a given coordinate.

    Arguments
    ---------
    cube: iris.cube.Cube
        A Cube to be sliced.

    coord_name: str
        The coordinate name to be sliced

    levels: list
        A list containing points to be extracted from the cube.

    Returns
    -------
    iris.cube.Cube
        The sliced cube.
    """
    coord = cube.coord(coord_name)
    # Unpack assumes the coordinate spans exactly one data dimension.
    (axis,) = cube.coord_dims(coord)
    # Boolean mask of which points along the coordinate should be kept.
    keep = np.isin(coord.points, levels)
    # Build a full-cube indexer, selecting on only the target dimension.
    indexer = [slice(None)] * cube.ndim
    indexer[axis] = keep
    return cube[tuple(indexer)]
def extract_common_points(cubes: iris.cube.CubeList, coordinate: str):
    """
    Extract common points for a given coordinate between two cubes.

    Parameters
    ----------
    cubes: iris.cube.CubeList
        CubeList containing exactly two cubes.

    coordinate: str
        The coordinate name to be checked for common levels.

    Returns
    -------
    iris.cube.CubeList
        CubeList containing the two cubes sliced to common levels
        for the given coordinate.

    Raises
    ------
    TypeError
        If cubes is not a CubeList.
    ValueError
        If cubes does not contain exactly two cubes, a cube lacks the
        coordinate, or no common points exist.
    """
    # Check type of input. isinstance also accepts CubeList subclasses.
    if not isinstance(cubes, iris.cube.CubeList):
        raise TypeError(f"Not a CubeList, got type {type(cubes)}")

    # Check that exactly two cubes are passed into the function. The previous
    # message said "Maximum of two", which was misleading when fewer than two
    # cubes were supplied.
    if len(cubes) != 2:
        raise ValueError(f"Exactly two cubes required, received {len(cubes)}")

    # Extract coordinate from both cubes.
    try:
        p1 = cubes[0].coord(coordinate)
        p2 = cubes[1].coord(coordinate)
    except iris.exceptions.CoordinateNotFoundError as err:
        raise ValueError(f"Both cubes must have a '{coordinate}' coordinate") from err

    # Find points present on both coordinates.
    common_points = np.intersect1d(p1.points, p2.points)

    # Check that at least one common point exists.
    if common_points.size == 0:
        raise ValueError("No common levels found")

    # Slice both cubes down to the shared points.
    cube0_common = _slice_cube_on_levels(cubes[0], coordinate, common_points)
    cube1_common = _slice_cube_on_levels(cubes[1], coordinate, common_points)

    return iris.cube.CubeList([cube0_common, cube1_common])
def differentiate(
    cubes: iris.cube.Cube | iris.cube.CubeList, coordinate: str, **kwargs
) -> iris.cube.Cube | iris.cube.CubeList:
    """Differentiate a cube on a specified coordinate.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        A Cube or CubeList of a field that is to be differentiated.

    coordinate: str
        The coordinate that is to be differentiated over.

    Returns
    -------
    iris.cube.Cube | iris.cube.CubeList
        The differential of the cube along the specified coordinate.

    Notes
    -----
    The differential is calculated based on a cartesian grid. This calculation
    is then suitable for vertical and temporal derivatives. It is not sensible
    for horizontal derivatives if they are based on spherical coordinates (e.g.
    latitude and longitude). In essence this operator is a CSET wrapper around
    `iris.analysis.calculus.differentiate <https://scitools-iris.readthedocs.io/en/stable/generated/api/iris.analysis.calculus.html#iris.analysis.calculus.differentiate>`_.

    Examples
    --------
    >>> dT_dz = misc.differentiate(temperature, "altitude")
    """
    # Differentiate each input cube along the requested coordinate.
    derivatives = iris.cube.CubeList(
        iris.analysis.calculus.differentiate(cube, coordinate)
        for cube in iter_maybe(cubes)
    )
    # Unwrap a single result back to a bare Cube.
    if len(derivatives) == 1:
        return derivatives[0]
    return derivatives