Coverage for src / CSET / operators / misc.py: 90%
164 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 15:01 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 15:01 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Miscellaneous operators."""
17import itertools
18import logging
19from collections.abc import Iterable
21import iris
22import iris.analysis.calculus
23import numpy as np
24from cf_units import Unit
25from iris.cube import Cube, CubeList
27from CSET._common import is_increasing, iter_maybe
28from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname
29from CSET.operators.regrid import regrid_onto_cube
32def noop(x, **kwargs):
33 """Return its input without doing anything to it.
35 Useful for constructing diagnostic chains.
37 Arguments
38 ---------
39 x: Any
40 Input to return.
42 Returns
43 -------
44 x: Any
45 The input that was given.
46 """
47 return x
50def remove_attribute(
51 cubes: Cube | CubeList, attribute: str | Iterable, **kwargs
52) -> CubeList:
53 """Remove a cube attribute.
55 If the attribute is not on the cube, the cube is passed through unchanged.
57 Arguments
58 ---------
59 cubes: Cube | CubeList
60 One or more cubes to remove the attribute from.
61 attribute: str | Iterable
62 Name of attribute (or Iterable of names) to remove.
64 Returns
65 -------
66 cubes: CubeList
67 CubeList of cube(s) with the attribute removed.
68 """
69 # Ensure cubes is a CubeList.
70 if not isinstance(cubes, CubeList):
71 cubes = CubeList(iter_maybe(cubes))
73 for cube in cubes:
74 for attr in iter_maybe(attribute):
75 cube.attributes.pop(attr, None)
77 # Combine things that can be merged due to remove removing the
78 # attributes.
79 cubes = cubes.merge()
80 # combine items that can be merged after removing unwanted attributes
81 cubes = cubes.concatenate()
82 return cubes
85def addition(addend_1, addend_2):
86 """Addition of two fields.
88 Parameters
89 ----------
90 addend_1: Cube
91 Any field to have another field added to it.
92 addend_2: Cube
93 Any field to be added to another field.
95 Returns
96 -------
97 Cube
99 Raises
100 ------
101 ValueError, iris.exceptions.NotYetImplementedError
102 When the cubes are not compatible.
104 Notes
105 -----
106 This is a simple operator designed for combination of diagnostics or
107 creating new diagnostics by using recipes.
109 Examples
110 --------
111 >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v)
113 """
114 return addend_1 + addend_2
117def subtraction(minuend, subtrahend):
118 """Subtraction of two fields.
120 Parameters
121 ----------
122 minuend: Cube
123 Any field to have another field subtracted from it.
124 subtrahend: Cube
125 Any field to be subtracted from to another field.
127 Returns
128 -------
129 Cube
131 Raises
132 ------
133 ValueError, iris.exceptions.NotYetImplementedError
134 When the cubes are not compatible.
136 Notes
137 -----
138 This is a simple operator designed for combination of diagnostics or
139 creating new diagnostics by using recipes. It can be used for model
140 differences to allow for comparisons between the same field in different
141 models or model configurations.
143 Examples
144 --------
145 >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B)
147 """
148 return minuend - subtrahend
151def division(numerator, denominator):
152 """Division of two fields.
154 Parameters
155 ----------
156 numerator: Cube
157 Any field to have the ratio taken with respect to another field.
158 denominator: Cube
159 Any field used to divide another field or provide the reference
160 value in a ratio.
162 Returns
163 -------
164 Cube
166 Raises
167 ------
168 ValueError
169 When the cubes are not compatible.
171 Notes
172 -----
173 This is a simple operator designed for combination of diagnostics or
174 creating new diagnostics by using recipes.
176 Examples
177 --------
178 >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux)
180 """
181 return numerator / denominator
184def multiplication(
185 multiplicand: Cube | CubeList, multiplier: Cube | CubeList
186) -> Cube | CubeList:
187 """Multiplication of two fields.
189 Parameters
190 ----------
191 multiplicand: Cube | CubeList
192 Any field to be multiplied by another field.
193 multiplier: Cube | CubeList
194 Any field to be multiplied to another field.
196 Returns
197 -------
198 Cube | CubeList
199 The result of multiplicand x multiplier.
201 Raises
202 ------
203 ValueError
204 When the cubes are not compatible.
206 Notes
207 -----
208 This is a simple operator designed for combination of diagnostics or
209 creating new diagnostics by using recipes. CubeLists are multiplied
210 on a strict ordering (e.g. first cube with first cube).
212 Examples
213 --------
214 >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties)
216 """
217 new_cubelist = iris.cube.CubeList([])
218 for cube_a, cube_b in zip(
219 iter_maybe(multiplicand), iter_maybe(multiplier), strict=True
220 ):
221 multiplied_cube = cube_a * cube_b
222 multiplied_cube.rename(f"{cube_a.name()}_x_{cube_b.name()}")
223 new_cubelist.append(multiplied_cube)
224 if len(new_cubelist) == 1:
225 return new_cubelist[0]
226 else:
227 return new_cubelist
230def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList:
231 """Operator that combines multiple cubes or CubeLists into one.
233 Arguments
234 ---------
235 first: Cube | CubeList
236 First cube or CubeList to merge into CubeList.
237 second: Cube | CubeList
238 Second cube or CubeList to merge into CubeList. This must be a named
239 argument.
240 third: Cube | CubeList
241 There can be any number of additional arguments, they just need unique
242 names.
243 ...
245 Returns
246 -------
247 combined_cubelist: CubeList
248 Combined CubeList containing all cubes/CubeLists.
250 Raises
251 ------
252 TypeError:
253 If the provided arguments are not either a Cube or CubeList.
254 """
255 # Create empty CubeList to store cubes/CubeList.
256 all_cubes = CubeList()
257 # Combine all CubeLists into a single flat iterable.
258 for item in itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values())):
259 # Check each item is a Cube, erroring if not.
260 if isinstance(item, Cube):
261 # Add cube to CubeList.
262 all_cubes.append(item)
263 else:
264 raise TypeError("Not a Cube or CubeList!", item)
265 return all_cubes
268def difference(cubes: CubeList):
269 """Difference of two fields.
271 Parameters
272 ----------
273 cubes: CubeList
274 A list of exactly two cubes. One must have the cset_comparison_base
275 attribute set to 1, and will be used as the base of the comparison.
277 Returns
278 -------
279 Cube
281 Raises
282 ------
283 ValueError
284 When the cubes are not compatible.
286 Notes
287 -----
288 This is a simple operator designed for combination of diagnostics or
289 creating new diagnostics by using recipes. It can be used for model
290 differences to allow for comparisons between the same field in different
291 models or model configurations.
293 Examples
294 --------
295 >>> model_diff = misc.difference(temperature_model_A, temperature_model_B)
297 """
298 if len(cubes) != 2:
299 raise ValueError("cubes should contain exactly 2 cubes.")
300 base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1))
301 other: Cube = cubes.extract_cube(
302 iris.Constraint(
303 cube_func=lambda cube: "cset_comparison_base" not in cube.attributes
304 )
305 )
307 # If cubes contain a pressure coordinate, ensure it is increasing.
308 for cube in cubes:
309 try:
310 if len(cube.coord("pressure").points) > 2: 310 ↛ 308line 310 didn't jump to line 308 because the condition on line 310 was always true
311 if not is_increasing(cube.coord("pressure").points):
312 cube.data = np.flip(cube.data, axis=cube.coord_dims("pressure")[0])
314 except iris.exceptions.CoordinateNotFoundError:
315 pass
317 # Get spatial coord names.
318 base_lat_name, base_lon_name = get_cube_yxcoordname(base)
319 other_lat_name, other_lon_name = get_cube_yxcoordname(other)
321 # Ensure cubes to compare are on common differencing grid.
322 # This is triggered if either
323 # i) latitude and longitude shapes are not the same. Note grid points
324 # are not compared directly as these can differ through rounding
325 # errors.
326 # ii) or variables are known to often sit on different grid staggering
327 # in different models (e.g. cell center vs cell edge), as is the case
328 # for UM and LFRic comparisons.
329 # In future greater choice of regridding method might be applied depending
330 # on variable type. Linear regridding can in general be appropriate for smooth
331 # variables. Care should be taken with interpretation of differences
332 # given this dependency on regridding.
333 if (
334 base.coord(base_lat_name).shape != other.coord(other_lat_name).shape
335 or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape
336 ) or (
337 base.long_name
338 in [
339 "eastward_wind_at_10m",
340 "northward_wind_at_10m",
341 "northward_wind_at_cell_centres",
342 "eastward_wind_at_cell_centres",
343 "zonal_wind_at_pressure_levels",
344 "meridional_wind_at_pressure_levels",
345 "potential_vorticity_at_pressure_levels",
346 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging",
347 ]
348 ):
349 logging.debug(
350 "Linear regridding base cube to other grid to compute differences"
351 )
352 base = regrid_onto_cube(base, other, method="Linear")
354 # Figure out if we are comparing between UM and LFRic; flip array if so.
355 base_lat_direction = is_increasing(base.coord(base_lat_name).points)
356 other_lat_direction = is_increasing(other.coord(other_lat_name).points)
357 if base_lat_direction != other_lat_direction:
358 other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other))
360 # Extract just common time points.
361 base, other = _extract_common_time_points(base, other)
363 # Equalise attributes so we can merge.
364 fully_equalise_attributes([base, other])
365 logging.debug("Base: %s\nOther: %s", base, other)
367 # This currently relies on the cubes having the same underlying data layout.
368 difference = base.copy()
370 # Differences don't have a standard name; long name gets a suffix. We are
371 # assuming we can rely on cubes having a long name, so we don't check for
372 # its presents.
373 difference.standard_name = None
374 difference.long_name = (
375 base.long_name if base.long_name else base.name()
376 ) + "_difference"
377 if base.var_name:
378 difference.var_name = base.var_name + "_difference"
379 elif base.standard_name:
380 difference.var_name = base.standard_name + "_difference"
382 difference.data = other.data - base.data
383 return difference
386def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]:
387 """Extract common time points from cubes to allow comparison."""
388 # Get the name of the first non-scalar time coordinate.
389 time_coord = next(
390 map(
391 lambda coord: coord.name(),
392 filter(
393 lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"],
394 base.coords(),
395 ),
396 ),
397 None,
398 )
399 if not time_coord:
400 logging.debug("No time coord, skipping equalisation.")
401 return (base, other)
402 base_time_coord = base.coord(time_coord)
403 other_time_coord = other.coord(time_coord)
404 logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord)
405 if time_coord == "hour":
406 # We directly compare points when comparing coordinates with
407 # non-absolute units, such as hour. We can't just check the units are
408 # equal as iris automatically converts to datetime objects in the
409 # comparison for certain coordinate names.
410 base_times = base_time_coord.points
411 other_times = other_time_coord.points
412 shared_times = set.intersection(set(base_times), set(other_times))
413 else:
414 # Units don't match, so converting to datetimes for comparison.
415 base_times = base_time_coord.units.num2date(base_time_coord.points)
416 other_times = other_time_coord.units.num2date(other_time_coord.points)
417 shared_times = set.intersection(set(base_times), set(other_times))
418 logging.debug("Shared times: %s", shared_times)
419 time_constraint = iris.Constraint(
420 coord_values={time_coord: lambda cell: cell.point in shared_times}
421 )
422 # Extract points matching the shared times.
423 base = base.extract(time_constraint)
424 other = other.extract(time_constraint)
425 if base is None or other is None:
426 raise ValueError("No common time points found!")
427 return (base, other)
430def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str):
431 """Convert the units of a cube.
433 Arguments
434 ---------
435 cubes: iris.cube.Cube | iris.cube.CubeList
436 A Cube or CubeList of a field for its units to be converted.
438 units: str
439 The unit that the original field is to be converted to. It takes
440 CF compliant units.
442 Returns
443 -------
444 iris.cube.Cube | iris.cube.CubeList
445 The field converted into the specified units.
447 Examples
448 --------
449 >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit")
451 """
452 new_cubelist = iris.cube.CubeList([])
453 for cube in iter_maybe(cubes):
454 # Copy cube to keep original data.
455 cube_a = cube.copy()
456 # Convert cube units.
457 cube_a.convert_units(units)
458 new_cubelist.append(cube_a)
459 if len(new_cubelist) == 1:
460 return new_cubelist[0]
461 else:
462 return new_cubelist
465def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str):
466 """Rename a cube.
468 Arguments
469 ---------
470 cubes: iris.cube.Cube | iris.cube.CubeList
471 A Cube or CubeList of a field to be renamed.
473 name: str
474 The new name of the cube. It should be CF compliant.
476 Returns
477 -------
478 iris.cube.Cube | iris.cube.CubeList
479 The renamed field.
481 Notes
482 -----
483 This operator is designed to be used when the output field name does not
484 match expectations or needs to be different to defaults in standard_name, var_name or
485 long_name. For example, if combining masks
486 to create light rain you would like the field to be named "mask_for_light_rain"
487 rather than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0".
489 Examples
490 --------
491 >>> light_rain_mask = misc.rename_cube(light_rain_mask,"mask_for_light_rainfall"
492 """
493 new_cubelist = iris.cube.CubeList([])
494 for cube in iter_maybe(cubes):
495 cube.rename(name)
496 new_cubelist.append(cube)
497 if len(new_cubelist) == 1:
498 return new_cubelist[0]
499 else:
500 return new_cubelist
503def _slice_cube_on_levels(cube: iris.cube.Cube, coord_name: str, levels: list):
504 """
505 Extract levels from a cube for a given coordinate.
507 Arguments
508 ---------
509 cube: iris.cube.Cube
510 A Cube to be sliced.
512 coord_name: str
513 The coordinate name to be sliced
515 levels: list
516 A list containing points to be extracted from the cube.
518 Returns
519 -------
520 iris.cube.Cube
521 The sliced cube.
522 """
523 coord = cube.coord(coord_name)
524 (dim_index,) = cube.coord_dims(coord)
526 mask = np.isin(coord.points, levels)
528 slicer = [slice(None)] * cube.ndim
529 slicer[dim_index] = mask
531 return cube[tuple(slicer)]
534def extract_common_points(cubes: iris.cube.CubeList, coordinate: str):
535 """
536 Extract common points for a given coordinate between two cubes.
538 Parameters
539 ----------
540 cubes: iris.cube.CubeList
541 CubeList containing exactly two cubes.
543 coordinate: str
544 The coordinate name to be checked for common levels.
546 Returns
547 -------
548 iris.cube.CubeList
549 CubeList containing the two cubes sliced to common levels
550 for the given coordinate.
551 """
552 # Check type of input
553 if type(cubes) is not iris.cube.CubeList:
554 raise TypeError(f"Not a CubeList, got type {type(cubes)}")
556 # Check that only two cubes are passed into function.
557 if len(cubes) != 2:
558 raise ValueError(f"Maximum of two cubes allowed, received {len(cubes)}")
560 # Extract coordinate
561 try:
562 p1 = cubes[0].coord(coordinate)
563 p2 = cubes[1].coord(coordinate)
564 except iris.exceptions.CoordinateNotFoundError as err:
565 raise ValueError(f"Both cubes must have an {coordinate} coordinate") from err
567 # Find common points
568 common_points = np.intersect1d(p1.points, p2.points)
570 # Check that common points is more than zero.
571 if common_points.size == 0:
572 raise ValueError("No common levels found")
574 # Extract common points
575 cube0_common = _slice_cube_on_levels(cubes[0], coordinate, common_points)
576 cube1_common = _slice_cube_on_levels(cubes[1], coordinate, common_points)
578 return iris.cube.CubeList([cube0_common, cube1_common])
581def differentiate(
582 cubes: iris.cube.Cube | iris.cube.CubeList, coordinate: str, **kwargs
583) -> iris.cube.Cube | iris.cube.CubeList:
584 """Differentiate a cube on a specified coordinate.
586 Arguments
587 ---------
588 cubes: iris.cube.Cube | iris.cube.CubeList
589 A Cube or CubeList of a field that is to be differentiated.
591 coordinate: str
592 The coordinate that is to be differentiated over.
594 Returns
595 -------
596 iris.cube.Cube | iris.cube.CubeList
597 The differential of the cube along the specified coordinate.
599 Notes
600 -----
601 The differential is calculated based on a carteisan grid. This calculation
602 is then suitable for vertical and temporal derivatives. It is not sensible
603 for horizontal derivatives if they are based on spherical coordinates (e.g.
604 latitude and longitude). In essence this operator is a CSET wrapper around
605 `iris.analysis.calculus.differentiate <https://scitools-iris.readthedocs.io/en/stable/generated/api/iris.analysis.calculus.html#iris.analysis.calculus.differentiate>`_.
607 Examples
608 --------
609 >>> dT_dz = misc.differentiate(temperature, "altitude")
610 """
611 new_cubelist = iris.cube.CubeList([])
612 for cube in iter_maybe(cubes):
613 dcube = iris.analysis.calculus.differentiate(cube, coordinate)
614 new_cubelist.append(dcube)
615 if len(new_cubelist) == 1:
616 return new_cubelist[0]
617 else:
618 return new_cubelist
621def latent_heat_units(
622 cubes: Cube | CubeList,
623 **kwargs,
624) -> Cube | CubeList:
625 """
626 Convert w'q' covariance (e.g. from Cardington surface site netCDF files) to latent heat flux (W m-2).
628 Note
629 ----
630 Using fixed value of latent heat of vapourisation for now; varies by about 5% between -20 and +40degC.
631 Possible future improvement.
632 """
633 REQUIRED_UNITS = Unit("kg m-2 s-1")
634 OUTPUT_UNITS = Unit("W m-2")
636 Lc = 2.45e6 # J kg-1
638 out = iris.cube.CubeList()
640 for cube in iter_maybe(cubes):
641 # ---- ONLY ACT ON MASS FLUXES ----
642 if cube.units is None or cube.units.is_unknown():
643 out.append(cube)
644 continue
645 if not cube.units.is_convertible(REQUIRED_UNITS):
646 # ✅ This is UM LE or some other diagnostic — leave untouched
647 out.append(cube)
648 continue
650 cube_a = cube.copy()
651 cube_a = cube_a * Lc
652 cube_a.units = OUTPUT_UNITS
654 out.append(cube_a)
656 return out[0] if len(out) == 1 else out