Coverage for src / CSET / operators / misc.py: 99%
147 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 17:20 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 17:20 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Miscellaneous operators."""
17import itertools
18import logging
19from collections.abc import Iterable
20from functools import reduce
22import iris
23import iris.analysis.calculus
24import numpy as np
25from iris.cube import Cube, CubeList
27from CSET._common import is_increasing, iter_maybe
28from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname
29from CSET.operators.regrid import regrid_onto_cube
32def noop(x, **kwargs):
33 """Return its input without doing anything to it.
35 Useful for constructing diagnostic chains.
37 Arguments
38 ---------
39 x: Any
40 Input to return.
42 Returns
43 -------
44 x: Any
45 The input that was given.
46 """
47 return x
50def remove_attribute(
51 cubes: Cube | CubeList, attribute: str | Iterable, **kwargs
52) -> CubeList:
53 """Remove a cube attribute.
55 If the attribute is not on the cube, the cube is passed through unchanged.
57 Arguments
58 ---------
59 cubes: Cube | CubeList
60 One or more cubes to remove the attribute from.
61 attribute: str | Iterable
62 Name of attribute (or Iterable of names) to remove.
64 Returns
65 -------
66 cubes: CubeList
67 CubeList of cube(s) with the attribute removed.
68 """
69 # Ensure cubes is a CubeList.
70 if not isinstance(cubes, CubeList):
71 cubes = CubeList(iter_maybe(cubes))
73 for cube in cubes:
74 for attr in iter_maybe(attribute):
75 cube.attributes.pop(attr, None)
77 # Combine things that can be merged due to remove removing the
78 # attributes.
79 cubes = cubes.merge()
80 # combine items that can be merged after removing unwanted attributes
81 cubes = cubes.concatenate()
82 return cubes
85def addition(addend_1, addend_2):
86 """Addition of two fields.
88 Parameters
89 ----------
90 addend_1: Cube
91 Any field to have another field added to it.
92 addend_2: Cube
93 Any field to be added to another field.
95 Returns
96 -------
97 Cube
99 Raises
100 ------
101 ValueError, iris.exceptions.NotYetImplementedError
102 When the cubes are not compatible.
104 Notes
105 -----
106 This is a simple operator designed for combination of diagnostics or
107 creating new diagnostics by using recipes.
109 Examples
110 --------
111 >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v)
113 """
114 return addend_1 + addend_2
117def subtraction(minuend, subtrahend):
118 """Subtraction of two fields.
120 Parameters
121 ----------
122 minuend: Cube
123 Any field to have another field subtracted from it.
124 subtrahend: Cube
125 Any field to be subtracted from to another field.
127 Returns
128 -------
129 Cube
131 Raises
132 ------
133 ValueError, iris.exceptions.NotYetImplementedError
134 When the cubes are not compatible.
136 Notes
137 -----
138 This is a simple operator designed for combination of diagnostics or
139 creating new diagnostics by using recipes. It can be used for model
140 differences to allow for comparisons between the same field in different
141 models or model configurations.
143 Examples
144 --------
145 >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B)
147 """
148 return minuend - subtrahend
151def division(numerator, denominator):
152 """Division of two fields.
154 Parameters
155 ----------
156 numerator: Cube
157 Any field to have the ratio taken with respect to another field.
158 denominator: Cube
159 Any field used to divide another field or provide the reference
160 value in a ratio.
162 Returns
163 -------
164 Cube
166 Raises
167 ------
168 ValueError
169 When the cubes are not compatible.
171 Notes
172 -----
173 This is a simple operator designed for combination of diagnostics or
174 creating new diagnostics by using recipes.
176 Examples
177 --------
178 >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux)
180 """
181 return numerator / denominator
184def multiplication(
185 multiplicand: Cube | CubeList, multiplier: Cube | CubeList
186) -> Cube | CubeList:
187 """Multiplication of two fields.
189 Parameters
190 ----------
191 multiplicand: Cube | CubeList
192 Any field to be multiplied by another field.
193 multiplier: Cube | CubeList
194 Any field to be multiplied to another field.
196 Returns
197 -------
198 Cube | CubeList
199 The result of multiplicand x multiplier.
201 Raises
202 ------
203 ValueError
204 When the cubes are not compatible.
206 Notes
207 -----
208 This is a simple operator designed for combination of diagnostics or
209 creating new diagnostics by using recipes. CubeLists are multiplied
210 on a strict ordering (e.g. first cube with first cube).
212 Examples
213 --------
214 >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties)
216 """
217 new_cubelist = iris.cube.CubeList([])
218 for cube_a, cube_b in zip(
219 iter_maybe(multiplicand), iter_maybe(multiplier), strict=True
220 ):
221 multiplied_cube = cube_a * cube_b
222 multiplied_cube.rename(f"{cube_a.name()}_x_{cube_b.name()}")
223 new_cubelist.append(multiplied_cube)
224 if len(new_cubelist) == 1:
225 return new_cubelist[0]
226 else:
227 return new_cubelist
230def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList:
231 """Operator that combines multiple cubes or CubeLists into one.
233 Arguments
234 ---------
235 first: Cube | CubeList
236 First cube or CubeList to merge into CubeList.
237 second: Cube | CubeList
238 Second cube or CubeList to merge into CubeList. This must be a named
239 argument.
240 third: Cube | CubeList
241 There can be any number of additional arguments, they just need unique
242 names.
243 ...
245 Returns
246 -------
247 combined_cubelist: CubeList
248 Combined CubeList containing all cubes/CubeLists.
250 Raises
251 ------
252 TypeError:
253 If the provided arguments are not either a Cube or CubeList.
254 """
255 # Create empty CubeList to store cubes/CubeList.
256 all_cubes = CubeList()
257 # Combine all CubeLists into a single flat iterable.
258 for item in itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values())):
259 # Check each item is a Cube, erroring if not.
260 if isinstance(item, Cube):
261 # Add cube to CubeList.
262 all_cubes.append(item)
263 else:
264 raise TypeError("Not a Cube or CubeList!", item)
265 return all_cubes
268def difference(cubes: CubeList):
269 """Difference of two fields.
271 Parameters
272 ----------
273 cubes: CubeList
274 A list of exactly two cubes. One must have the cset_comparison_base
275 attribute set to 1, and will be used as the base of the comparison.
277 Returns
278 -------
279 Cube
281 Raises
282 ------
283 ValueError
284 When the cubes are not compatible.
286 Notes
287 -----
288 This is a simple operator designed for combination of diagnostics or
289 creating new diagnostics by using recipes. It can be used for model
290 differences to allow for comparisons between the same field in different
291 models or model configurations.
293 Examples
294 --------
295 >>> model_diff = misc.difference(temperature_model_A, temperature_model_B)
297 """
298 if len(cubes) != 2:
299 raise ValueError("cubes should contain exactly 2 cubes.")
300 base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1))
301 other: Cube = cubes.extract_cube(
302 iris.Constraint(
303 cube_func=lambda cube: "cset_comparison_base" not in cube.attributes
304 )
305 )
307 # If cubes contain a pressure coordinate, ensure it is increasing.
308 for cube in cubes:
309 try:
310 if len(cube.coord("pressure").points) > 2: 310 ↛ 308line 310 didn't jump to line 308 because the condition on line 310 was always true
311 if not is_increasing(cube.coord("pressure").points):
312 cube.data = np.flip(cube.data, axis=cube.coord_dims("pressure")[0])
314 except iris.exceptions.CoordinateNotFoundError:
315 pass
317 # Get spatial coord names.
318 base_lat_name, base_lon_name = get_cube_yxcoordname(base)
319 other_lat_name, other_lon_name = get_cube_yxcoordname(other)
321 # Ensure cubes to compare are on common differencing grid.
322 # This is triggered if either
323 # i) latitude and longitude shapes are not the same. Note grid points
324 # are not compared directly as these can differ through rounding
325 # errors.
326 # ii) or variables are known to often sit on different grid staggering
327 # in different models (e.g. cell center vs cell edge), as is the case
328 # for UM and LFRic comparisons.
329 # In future greater choice of regridding method might be applied depending
330 # on variable type. Linear regridding can in general be appropriate for smooth
331 # variables. Care should be taken with interpretation of differences
332 # given this dependency on regridding.
333 if (
334 base.coord(base_lat_name).shape != other.coord(other_lat_name).shape
335 or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape
336 ) or (
337 base.long_name
338 in [
339 "eastward_wind_at_10m",
340 "northward_wind_at_10m",
341 "northward_wind_at_cell_centres",
342 "eastward_wind_at_cell_centres",
343 "zonal_wind_at_pressure_levels",
344 "meridional_wind_at_pressure_levels",
345 "potential_vorticity_at_pressure_levels",
346 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging",
347 ]
348 ):
349 logging.debug(
350 "Linear regridding base cube to other grid to compute differences"
351 )
352 base = regrid_onto_cube(base, other, method="Linear")
354 # Figure out if we are comparing between UM and LFRic; flip array if so.
355 base_lat_direction = is_increasing(base.coord(base_lat_name).points)
356 other_lat_direction = is_increasing(other.coord(other_lat_name).points)
357 if base_lat_direction != other_lat_direction:
358 other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other))
360 # Extract just common time points.
361 base, other = _extract_common_time_points(base, other)
363 # Equalise attributes so we can merge.
364 fully_equalise_attributes([base, other])
365 logging.debug("Base: %s\nOther: %s", base, other)
367 # This currently relies on the cubes having the same underlying data layout.
368 difference = base.copy()
370 # Differences don't have a standard name; long name gets a suffix. We are
371 # assuming we can rely on cubes having a long name, so we don't check for
372 # its presents.
373 difference.standard_name = None
374 difference.long_name = (
375 base.long_name if base.long_name else base.name()
376 ) + "_difference"
377 if base.var_name:
378 difference.var_name = base.var_name + "_difference"
379 elif base.standard_name:
380 difference.var_name = base.standard_name + "_difference"
382 difference.data = other.data - base.data
383 return difference
386def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]:
387 """Extract common time points from cubes to allow comparison."""
388 # Get the name of the first non-scalar time coordinate.
389 time_coord = next(
390 map(
391 lambda coord: coord.name(),
392 filter(
393 lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"],
394 base.coords(),
395 ),
396 ),
397 None,
398 )
399 if not time_coord:
400 logging.debug("No time coord, skipping equalisation.")
401 return (base, other)
402 base_time_coord = base.coord(time_coord)
403 other_time_coord = other.coord(time_coord)
404 logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord)
405 if time_coord == "hour":
406 # We directly compare points when comparing coordinates with
407 # non-absolute units, such as hour. We can't just check the units are
408 # equal as iris automatically converts to datetime objects in the
409 # comparison for certain coordinate names.
410 base_times = base_time_coord.points
411 other_times = other_time_coord.points
412 shared_times = set.intersection(set(base_times), set(other_times))
413 else:
414 # Units don't match, so converting to datetimes for comparison.
415 base_times = base_time_coord.units.num2date(base_time_coord.points)
416 other_times = other_time_coord.units.num2date(other_time_coord.points)
417 shared_times = set.intersection(set(base_times), set(other_times))
418 logging.debug("Shared times: %s", shared_times)
419 time_constraint = iris.Constraint(
420 coord_values={
421 time_coord: lambda cell, shared_times=shared_times: (
422 cell.point in shared_times
423 )
424 }
425 )
426 # Extract points matching the shared times.
427 base = base.extract(time_constraint)
428 other = other.extract(time_constraint)
429 if base is None or other is None:
430 raise ValueError("No common time points found!")
431 return (base, other)
434def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str):
435 """Convert the units of a cube.
437 Arguments
438 ---------
439 cubes: iris.cube.Cube | iris.cube.CubeList
440 A Cube or CubeList of a field for its units to be converted.
442 units: str
443 The unit that the original field is to be converted to. It takes
444 CF compliant units.
446 Returns
447 -------
448 iris.cube.Cube | iris.cube.CubeList
449 The field converted into the specified units.
451 Examples
452 --------
453 >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit")
455 """
456 new_cubelist = iris.cube.CubeList([])
457 for cube in iter_maybe(cubes):
458 # Copy cube to keep original data.
459 cube_a = cube.copy()
460 # Convert cube units.
461 cube_a.convert_units(units)
462 new_cubelist.append(cube_a)
463 if len(new_cubelist) == 1:
464 return new_cubelist[0]
465 else:
466 return new_cubelist
469def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str):
470 """Rename a cube.
472 Arguments
473 ---------
474 cubes: iris.cube.Cube | iris.cube.CubeList
475 A Cube or CubeList of a field to be renamed.
477 name: str
478 The new name of the cube. It should be CF compliant.
480 Returns
481 -------
482 iris.cube.Cube | iris.cube.CubeList
483 The renamed field.
485 Notes
486 -----
487 This operator is designed to be used when the output field name does not
488 match expectations or needs to be different to defaults in standard_name, var_name or
489 long_name. For example, if combining masks
490 to create light rain you would like the field to be named "mask_for_light_rain"
491 rather than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0".
493 Examples
494 --------
495 >>> light_rain_mask = misc.rename_cube(light_rain_mask,"mask_for_light_rainfall"
496 """
497 new_cubelist = iris.cube.CubeList([])
498 for cube in iter_maybe(cubes):
499 cube.rename(name)
500 new_cubelist.append(cube)
501 if len(new_cubelist) == 1:
502 return new_cubelist[0]
503 else:
504 return new_cubelist
507def _slice_cube_on_levels(cube: iris.cube.Cube, coord_name: str, levels: list):
508 """
509 Extract levels from a cube for a given coordinate.
511 Arguments
512 ---------
513 cube: iris.cube.Cube
514 A Cube to be sliced.
516 coord_name: str
517 The coordinate name to be sliced
519 levels: list
520 A list containing points to be extracted from the cube.
522 Returns
523 -------
524 iris.cube.Cube
525 The sliced cube.
526 """
527 coord = cube.coord(coord_name)
528 (dim_index,) = cube.coord_dims(coord)
530 mask = np.isin(coord.points, levels)
532 slicer = [slice(None)] * cube.ndim
533 slicer[dim_index] = mask
535 return cube[tuple(slicer)]
538def extract_common_points(cubes: iris.cube.CubeList, coordinate: str):
539 """
540 Extract common points for a given coordinate between cubes in a CubeList.
542 Parameters
543 ----------
544 cubes: iris.cube.CubeList
545 CubeList containing cubes for which to extract common points.
547 coordinate: str
548 The coordinate name to be checked for common points.
550 Returns
551 -------
552 iris.cube.CubeList
553 CubeList containing the two cubes sliced to common points
554 for the given coordinate.
555 """
556 # Check type of input
557 if type(cubes) is not iris.cube.CubeList:
558 raise TypeError(f"Not a CubeList, got type {type(cubes)}")
560 # Extract coordinate
561 try:
562 points_list = []
563 for cube in cubes:
564 points_list.append(cube.coord(coordinate).points)
565 except iris.exceptions.CoordinateNotFoundError as err:
566 raise ValueError(f"Both cubes must have an {coordinate} coordinate") from err
568 # Find common points
569 common_points = reduce(np.intersect1d, points_list)
571 # Check that common points is more than zero.
572 if common_points.size == 0:
573 raise ValueError("No common levels found")
575 # Extract common points
576 common_cubes = iris.cube.CubeList()
577 for cube in cubes:
578 common_cubes.append(_slice_cube_on_levels(cube, coordinate, common_points))
580 return common_cubes
583def differentiate(
584 cubes: iris.cube.Cube | iris.cube.CubeList, coordinate: str, **kwargs
585) -> iris.cube.Cube | iris.cube.CubeList:
586 """Differentiate a cube on a specified coordinate.
588 Arguments
589 ---------
590 cubes: iris.cube.Cube | iris.cube.CubeList
591 A Cube or CubeList of a field that is to be differentiated.
593 coordinate: str
594 The coordinate that is to be differentiated over.
596 Returns
597 -------
598 iris.cube.Cube | iris.cube.CubeList
599 The differential of the cube along the specified coordinate.
601 Notes
602 -----
603 The differential is calculated based on a carteisan grid. This calculation
604 is then suitable for vertical and temporal derivatives. It is not sensible
605 for horizontal derivatives if they are based on spherical coordinates (e.g.
606 latitude and longitude). In essence this operator is a CSET wrapper around
607 `iris.analysis.calculus.differentiate <https://scitools-iris.readthedocs.io/en/stable/generated/api/iris.analysis.calculus.html#iris.analysis.calculus.differentiate>`_.
609 Examples
610 --------
611 >>> dT_dz = misc.differentiate(temperature, "altitude")
612 """
613 new_cubelist = iris.cube.CubeList([])
614 for cube in iter_maybe(cubes):
615 dcube = iris.analysis.calculus.differentiate(cube, coordinate)
616 new_cubelist.append(dcube)
617 if len(new_cubelist) == 1:
618 return new_cubelist[0]
619 else:
620 return new_cubelist