Coverage for src/CSET/operators/misc.py: 99%
158 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-19 11:18 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-19 11:18 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Miscellaneous operators."""
17import itertools
18import logging
19from collections.abc import Iterable
20from functools import reduce
22import iris
23import iris.analysis.calculus
24import numpy as np
25from iris.cube import Cube, CubeList
27from CSET._common import is_increasing, iter_maybe
28from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname
29from CSET.operators.regrid import regrid_onto_cube
32def noop(x, **kwargs):
33 """Return its input without doing anything to it.
35 Useful for constructing diagnostic chains.
37 Arguments
38 ---------
39 x: Any
40 Input to return.
42 Returns
43 -------
44 x: Any
45 The input that was given.
46 """
47 return x
50def remove_attribute(
51 cubes: Cube | CubeList, attribute: str | Iterable, **kwargs
52) -> CubeList:
53 """Remove a cube attribute.
55 If the attribute is not on the cube, the cube is passed through unchanged.
57 Arguments
58 ---------
59 cubes: Cube | CubeList
60 One or more cubes to remove the attribute from.
61 attribute: str | Iterable
62 Name of attribute (or Iterable of names) to remove.
64 Returns
65 -------
66 cubes: CubeList
67 CubeList of cube(s) with the attribute removed.
68 """
69 # Ensure cubes is a CubeList.
70 if not isinstance(cubes, CubeList):
71 cubes = CubeList(iter_maybe(cubes))
73 for cube in cubes:
74 for attr in iter_maybe(attribute):
75 cube.attributes.pop(attr, None)
77 # Combine things that can be merged due to remove removing the
78 # attributes.
79 cubes = cubes.merge()
80 # combine items that can be merged after removing unwanted attributes
81 cubes = cubes.concatenate()
82 return cubes
85def addition(addend_1, addend_2):
86 """Addition of two fields.
88 Parameters
89 ----------
90 addend_1: Cube
91 Any field to have another field added to it.
92 addend_2: Cube
93 Any field to be added to another field.
95 Returns
96 -------
97 Cube
99 Raises
100 ------
101 ValueError, iris.exceptions.NotYetImplementedError
102 When the cubes are not compatible.
104 Notes
105 -----
106 This is a simple operator designed for combination of diagnostics or
107 creating new diagnostics by using recipes.
109 Examples
110 --------
111 >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v)
113 """
114 return addend_1 + addend_2
117def subtraction(
118 minuend: Cube | CubeList, subtrahend: Cube | CubeList
119) -> Cube | CubeList:
120 """Subtraction of two fields.
122 Parameters
123 ----------
124 minuend: Cube | CubeList
125 Any field to have another field subtracted from it.
126 subtrahend: Cube | CubeList
127 Any field to be subtracted from to another field.
129 Returns
130 -------
131 Cube | CubeList
132 The result of minuend - subtrahend
134 Raises
135 ------
136 ValueError, iris.exceptions.NotYetImplementedError
137 When the cubes are not compatible.
139 Notes
140 -----
141 This is a simple operator designed for combination of diagnostics or
142 creating new diagnostics by using recipes. It can be used for model
143 differences to allow for comparisons between the same field in different
144 models or model configurations.
146 Examples
147 --------
148 >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B)
150 """
152 def subtract_preserve_attributes(a, b):
153 result = a - b
154 result.attributes.update(a.attributes)
155 return result
157 # Case where both inputs are single cubes
158 if isinstance(minuend, iris.cube.Cube) and isinstance(subtrahend, iris.cube.Cube):
159 return subtract_preserve_attributes(minuend, subtrahend)
161 # Check if minuend is iterable
162 cubes_a = iter_maybe(minuend)
164 # Case: subtrahend also iterable
165 if isinstance(subtrahend, iris.cube.CubeList):
166 cubes_b = iter_maybe(subtrahend)
167 result = iris.cube.CubeList(
168 [
169 subtract_preserve_attributes(a, b)
170 for a, b in zip(cubes_a, cubes_b, strict=True)
171 ]
172 )
173 else:
174 # Case: subtract single cube from each minuend
175 result = iris.cube.CubeList(
176 [subtract_preserve_attributes(a, subtrahend) for a in cubes_a]
177 )
179 # Return single cube if only one result, else return CubeList
180 return result[0] if len(result) == 1 else result
183def division(numerator, denominator):
184 """Division of two fields.
186 Parameters
187 ----------
188 numerator: Cube
189 Any field to have the ratio taken with respect to another field.
190 denominator: Cube
191 Any field used to divide another field or provide the reference
192 value in a ratio.
194 Returns
195 -------
196 Cube
198 Raises
199 ------
200 ValueError
201 When the cubes are not compatible.
203 Notes
204 -----
205 This is a simple operator designed for combination of diagnostics or
206 creating new diagnostics by using recipes.
208 Examples
209 --------
210 >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux)
212 """
213 return numerator / denominator
216def multiplication(
217 multiplicand: Cube | CubeList, multiplier: Cube | CubeList
218) -> Cube | CubeList:
219 """Multiplication of two fields.
221 Parameters
222 ----------
223 multiplicand: Cube | CubeList
224 Any field to be multiplied by another field.
225 multiplier: Cube | CubeList
226 Any field to be multiplied to another field.
228 Returns
229 -------
230 Cube | CubeList
231 The result of multiplicand x multiplier.
233 Raises
234 ------
235 ValueError
236 When the cubes are not compatible.
238 Notes
239 -----
240 This is a simple operator designed for combination of diagnostics or
241 creating new diagnostics by using recipes. CubeLists are multiplied
242 on a strict ordering (e.g. first cube with first cube).
244 Examples
245 --------
246 >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties)
248 """
249 new_cubelist = iris.cube.CubeList([])
250 for cube_a, cube_b in zip(
251 iter_maybe(multiplicand), iter_maybe(multiplier), strict=True
252 ):
253 multiplied_cube = cube_a * cube_b
254 multiplied_cube.rename(f"{cube_a.name()}_x_{cube_b.name()}")
255 new_cubelist.append(multiplied_cube)
256 if len(new_cubelist) == 1:
257 return new_cubelist[0]
258 else:
259 return new_cubelist
262def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList:
263 """Operator that combines multiple cubes or CubeLists into one.
265 Arguments
266 ---------
267 first: Cube | CubeList
268 First cube or CubeList to merge into CubeList.
269 second: Cube | CubeList
270 Second cube or CubeList to merge into CubeList. This must be a named
271 argument.
272 third: Cube | CubeList
273 There can be any number of additional arguments, they just need unique
274 names.
275 ...
277 Returns
278 -------
279 combined_cubelist: CubeList
280 Combined CubeList containing all cubes/CubeLists.
282 Raises
283 ------
284 TypeError:
285 If the provided arguments are not either a Cube or CubeList.
286 """
287 # Create empty CubeList to store cubes/CubeList.
288 all_cubes = CubeList()
289 # Combine all CubeLists into a single flat iterable.
290 for item in itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values())):
291 # Check each item is a Cube, erroring if not.
292 if isinstance(item, Cube):
293 # Add cube to CubeList.
294 all_cubes.append(item)
295 else:
296 raise TypeError("Not a Cube or CubeList!", item)
297 return all_cubes
300def difference(cubes: CubeList):
301 """Difference of two fields.
303 Parameters
304 ----------
305 cubes: CubeList
306 A list of exactly two cubes. One must have the cset_comparison_base
307 attribute set to 1, and will be used as the base of the comparison.
309 Returns
310 -------
311 Cube
313 Raises
314 ------
315 ValueError
316 When the cubes are not compatible.
318 Notes
319 -----
320 This is a simple operator designed for combination of diagnostics or
321 creating new diagnostics by using recipes. It can be used for model
322 differences to allow for comparisons between the same field in different
323 models or model configurations.
325 Examples
326 --------
327 >>> model_diff = misc.difference(temperature_model_A, temperature_model_B)
329 """
330 if len(cubes) != 2:
331 raise ValueError("cubes should contain exactly 2 cubes.")
332 base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1))
333 other: Cube = cubes.extract_cube(
334 iris.Constraint(
335 cube_func=lambda cube: "cset_comparison_base" not in cube.attributes
336 )
337 )
339 # If cubes contain a pressure coordinate, ensure it is increasing.
340 for cube in cubes:
341 try:
342 if len(cube.coord("pressure").points) > 2: 342 ↛ 340line 342 didn't jump to line 340 because the condition on line 342 was always true
343 if not is_increasing(cube.coord("pressure").points):
344 cube.data = np.flip(cube.data, axis=cube.coord_dims("pressure")[0])
346 except iris.exceptions.CoordinateNotFoundError:
347 pass
349 # Get spatial coord names.
350 base_lat_name, base_lon_name = get_cube_yxcoordname(base)
351 other_lat_name, other_lon_name = get_cube_yxcoordname(other)
353 # Ensure cubes to compare are on common differencing grid.
354 # This is triggered if either
355 # i) latitude and longitude shapes are not the same. Note grid points
356 # are not compared directly as these can differ through rounding
357 # errors.
358 # ii) or variables are known to often sit on different grid staggering
359 # in different models (e.g. cell center vs cell edge), as is the case
360 # for UM and LFRic comparisons.
361 # In future greater choice of regridding method might be applied depending
362 # on variable type. Linear regridding can in general be appropriate for smooth
363 # variables. Care should be taken with interpretation of differences
364 # given this dependency on regridding.
365 if (
366 base.coord(base_lat_name).shape != other.coord(other_lat_name).shape
367 or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape
368 ) or (
369 base.long_name
370 in [
371 "eastward_wind_at_10m",
372 "northward_wind_at_10m",
373 "northward_wind_at_cell_centres",
374 "eastward_wind_at_cell_centres",
375 "zonal_wind_at_pressure_levels",
376 "meridional_wind_at_pressure_levels",
377 "potential_vorticity_at_pressure_levels",
378 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging",
379 ]
380 ):
381 logging.debug(
382 "Linear regridding base cube to other grid to compute differences"
383 )
384 base = regrid_onto_cube(base, other, method="Linear")
386 # Figure out if we are comparing between UM and LFRic; flip array if so.
387 base_lat_direction = is_increasing(base.coord(base_lat_name).points)
388 other_lat_direction = is_increasing(other.coord(other_lat_name).points)
389 if base_lat_direction != other_lat_direction:
390 other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other))
392 # Extract just common time points.
393 base, other = _extract_common_time_points(base, other)
395 # Equalise attributes so we can merge.
396 fully_equalise_attributes([base, other])
397 logging.debug("Base: %s\nOther: %s", base, other)
399 # This currently relies on the cubes having the same underlying data layout.
400 difference = base.copy()
402 # Differences don't have a standard name; long name gets a suffix. We are
403 # assuming we can rely on cubes having a long name, so we don't check for
404 # its presents.
405 difference.standard_name = None
406 difference.long_name = (
407 base.long_name if base.long_name else base.name()
408 ) + "_difference"
409 if base.var_name:
410 difference.var_name = base.var_name + "_difference"
411 elif base.standard_name:
412 difference.var_name = base.standard_name + "_difference"
414 difference.data = other.data - base.data
415 return difference
418def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]:
419 """Extract common time points from cubes to allow comparison."""
420 # Get the name of the first non-scalar time coordinate.
421 time_coord = next(
422 map(
423 lambda coord: coord.name(),
424 filter(
425 lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"],
426 base.coords(),
427 ),
428 ),
429 None,
430 )
431 if not time_coord:
432 logging.debug("No time coord, skipping equalisation.")
433 return (base, other)
434 base_time_coord = base.coord(time_coord)
435 other_time_coord = other.coord(time_coord)
436 logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord)
437 if time_coord == "hour":
438 # We directly compare points when comparing coordinates with
439 # non-absolute units, such as hour. We can't just check the units are
440 # equal as iris automatically converts to datetime objects in the
441 # comparison for certain coordinate names.
442 base_times = base_time_coord.points
443 other_times = other_time_coord.points
444 shared_times = set.intersection(set(base_times), set(other_times))
445 else:
446 # Units don't match, so converting to datetimes for comparison.
447 base_times = base_time_coord.units.num2date(base_time_coord.points)
448 other_times = other_time_coord.units.num2date(other_time_coord.points)
449 shared_times = set.intersection(set(base_times), set(other_times))
450 logging.debug("Shared times: %s", shared_times)
451 time_constraint = iris.Constraint(
452 coord_values={
453 time_coord: lambda cell, shared_times=shared_times: (
454 cell.point in shared_times
455 )
456 }
457 )
458 # Extract points matching the shared times.
459 base = base.extract(time_constraint)
460 other = other.extract(time_constraint)
461 if base is None or other is None:
462 raise ValueError("No common time points found!")
463 return (base, other)
466def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str):
467 """Convert the units of a cube.
469 Arguments
470 ---------
471 cubes: iris.cube.Cube | iris.cube.CubeList
472 A Cube or CubeList of a field for its units to be converted.
474 units: str
475 The unit that the original field is to be converted to. It takes
476 CF compliant units.
478 Returns
479 -------
480 iris.cube.Cube | iris.cube.CubeList
481 The field converted into the specified units.
483 Examples
484 --------
485 >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit")
487 """
488 new_cubelist = iris.cube.CubeList([])
489 for cube in iter_maybe(cubes):
490 # Copy cube to keep original data.
491 cube_a = cube.copy()
492 # Convert cube units.
493 cube_a.convert_units(units)
494 new_cubelist.append(cube_a)
495 if len(new_cubelist) == 1:
496 return new_cubelist[0]
497 else:
498 return new_cubelist
501def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str):
502 """Rename a cube.
504 Arguments
505 ---------
506 cubes: iris.cube.Cube | iris.cube.CubeList
507 A Cube or CubeList of a field to be renamed.
509 name: str
510 The new name of the cube. It should be CF compliant.
512 Returns
513 -------
514 iris.cube.Cube | iris.cube.CubeList
515 The renamed field.
517 Notes
518 -----
519 This operator is designed to be used when the output field name does not
520 match expectations or needs to be different to defaults in standard_name, var_name or
521 long_name. For example, if combining masks
522 to create light rain you would like the field to be named "mask_for_light_rain"
523 rather than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0".
525 Examples
526 --------
527 >>> light_rain_mask = misc.rename_cube(light_rain_mask,"mask_for_light_rainfall"
528 """
529 new_cubelist = iris.cube.CubeList([])
530 for cube in iter_maybe(cubes):
531 cube.rename(name)
532 new_cubelist.append(cube)
533 if len(new_cubelist) == 1:
534 return new_cubelist[0]
535 else:
536 return new_cubelist
539def _slice_cube_on_levels(cube: iris.cube.Cube, coord_name: str, levels: list):
540 """
541 Extract levels from a cube for a given coordinate.
543 Arguments
544 ---------
545 cube: iris.cube.Cube
546 A Cube to be sliced.
548 coord_name: str
549 The coordinate name to be sliced
551 levels: list
552 A list containing points to be extracted from the cube.
554 Returns
555 -------
556 iris.cube.Cube
557 The sliced cube.
558 """
559 coord = cube.coord(coord_name)
560 (dim_index,) = cube.coord_dims(coord)
562 mask = np.isin(coord.points, levels)
564 slicer = [slice(None)] * cube.ndim
565 slicer[dim_index] = mask
567 return cube[tuple(slicer)]
570def extract_common_points(cubes: iris.cube.CubeList, coordinate: str):
571 """
572 Extract common points for a given coordinate between cubes in a CubeList.
574 Parameters
575 ----------
576 cubes: iris.cube.CubeList
577 CubeList containing cubes for which to extract common points.
579 coordinate: str
580 The coordinate name to be checked for common points.
582 Returns
583 -------
584 iris.cube.CubeList
585 CubeList containing the two cubes sliced to common points
586 for the given coordinate.
587 """
588 # Check type of input
589 if type(cubes) is not iris.cube.CubeList:
590 raise TypeError(f"Not a CubeList, got type {type(cubes)}")
592 # Extract coordinate
593 try:
594 points_list = []
595 for cube in cubes:
596 points_list.append(cube.coord(coordinate).points)
597 except iris.exceptions.CoordinateNotFoundError as err:
598 raise ValueError(f"Both cubes must have an {coordinate} coordinate") from err
600 # Find common points
601 common_points = reduce(np.intersect1d, points_list)
603 # Check that common points is more than zero.
604 if common_points.size == 0:
605 raise ValueError("No common levels found")
607 # Extract common points
608 common_cubes = iris.cube.CubeList()
609 for cube in cubes:
610 common_cubes.append(_slice_cube_on_levels(cube, coordinate, common_points))
612 return common_cubes
615def differentiate(
616 cubes: iris.cube.Cube | iris.cube.CubeList, coordinate: str, **kwargs
617) -> iris.cube.Cube | iris.cube.CubeList:
618 """Differentiate a cube on a specified coordinate.
620 Arguments
621 ---------
622 cubes: iris.cube.Cube | iris.cube.CubeList
623 A Cube or CubeList of a field that is to be differentiated.
625 coordinate: str
626 The coordinate that is to be differentiated over.
628 Returns
629 -------
630 iris.cube.Cube | iris.cube.CubeList
631 The differential of the cube along the specified coordinate.
633 Notes
634 -----
635 The differential is calculated based on a carteisan grid. This calculation
636 is then suitable for vertical and temporal derivatives. It is not sensible
637 for horizontal derivatives if they are based on spherical coordinates (e.g.
638 latitude and longitude). In essence this operator is a CSET wrapper around
639 `iris.analysis.calculus.differentiate <https://scitools-iris.readthedocs.io/en/stable/generated/api/iris.analysis.calculus.html#iris.analysis.calculus.differentiate>`_.
641 Examples
642 --------
643 >>> dT_dz = misc.differentiate(temperature, "altitude")
644 """
645 new_cubelist = iris.cube.CubeList([])
646 for cube in iter_maybe(cubes):
647 dcube = iris.analysis.calculus.differentiate(cube, coordinate)
648 new_cubelist.append(dcube)
649 if len(new_cubelist) == 1:
650 return new_cubelist[0]
651 else:
652 return new_cubelist