Coverage for src/CSET/operators/_utils.py: 94%
183 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-18 14:09 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-18 14:09 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common operator functionality used across CSET.
18Functions below should only be added if it is not suitable as a standalone
19operator, and will be used across multiple operators.
20"""
22import logging
23import re
24from datetime import timedelta
26import iris
27import iris.coords
28import iris.cube
29import iris.exceptions
30import iris.util
31from iris.time import PartialDateTime
33from CSET._common import iter_maybe
36def pdt_fromisoformat(
37 datestring,
38) -> tuple[iris.time.PartialDateTime, timedelta | None]:
39 """Generate PartialDateTime object.
41 Function that takes an ISO 8601 date string and returns a PartialDateTime object.
43 Arguments
44 ---------
45 datestring: str
46 ISO 8601 date.
48 Returns
49 -------
50 time_object: iris.time.PartialDateTime
51 """
53 def make_offset(sign, value) -> timedelta:
54 if len(value) not in [2, 4, 5]: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true
55 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}')
57 hours = int(value[:2])
58 minutes = 0
59 if len(value) in [4, 5]:
60 minutes = int(value[-2:])
61 return timedelta(hours=sign * hours, minutes=sign * minutes)
63 # Remove the microseconds coord due to no support in PartialDateTime
64 datestring = re.sub(r"\.\d+", "", datestring)
66 datetime_split = datestring.split("T")
67 date = datetime_split[0]
68 if len(datetime_split) == 1:
69 time = ""
70 elif len(datetime_split) == 2: 70 ↛ 73line 70 didn't jump to line 73 because the condition on line 70 was always true
71 time = datetime_split[1]
72 else:
73 raise ValueError("datesting in an unexpected format")
75 offset = None
76 time_split = time.split("+")
77 if len(time_split) == 2:
78 time = time_split[0]
79 offset = make_offset(1, time_split[1])
80 else:
81 time_split = time.split("-")
82 if len(time_split) == 2: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true
83 time = time_split[0]
84 offset = make_offset(-1, time_split[1])
85 else:
86 offset = None
88 if re.fullmatch(r"\d{8}", date):
89 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}"
90 elif re.fullmatch(r"\d{6}", date):
91 date = f"{date[0:4]}-{date[4:6]}"
93 if len(date) < 7: 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true
94 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM")
96 # Returning a PartialDateTime for the special case of string form "YYYY-MM"
97 if re.fullmatch(r"\d{4}-\d{2}", date):
98 pdt = PartialDateTime(
99 year=int(date[0:4]),
100 month=int(date[5:7]),
101 day=None,
102 hour=None,
103 minute=None,
104 second=None,
105 )
106 return pdt, offset
108 year = int(date[0:4])
109 month = int(date[5:7])
110 day = int(date[8:10])
112 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0)
114 # Normalise the time parts into standard format
115 if re.fullmatch(r"\d{4}", time): 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true
116 time = f"{time[0:2]}:{time[2:4]}"
117 if re.fullmatch(r"\d{6}", time):
118 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}"
120 if len(time) >= 2:
121 kwargs["hour"] = int(time[0:2])
122 if len(time) >= 5:
123 kwargs["minute"] = int(time[3:5])
124 if len(time) >= 8:
125 kwargs["second"] = int(time[6:8])
127 pdt = PartialDateTime(**kwargs)
129 return pdt, offset
132def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]:
133 """
134 Return horizontal dimension coordinate name(s) from a given cube.
136 Arguments
137 ---------
139 cube: iris.cube.Cube
140 An iris cube which will be checked to see if it contains coordinate
141 names that match a pre-defined list of acceptable horizontal
142 dimension coordinate names.
144 Returns
145 -------
146 (y_coord, x_coord)
147 A tuple containing the horizontal coordinate name for latitude and longitude respectively
148 found within the cube.
150 Raises
151 ------
152 ValueError
153 If a unique y/x horizontal coordinate cannot be found.
154 """
155 # Acceptable horizontal coordinate names.
156 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"]
157 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"]
159 # Get a list of dimension coordinate names for the cube
160 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
161 coord_names = [coord.name() for coord in cube.coords()]
163 # Check which x-coordinate we have, if any
164 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES]
165 if len(x_coords) != 1:
166 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES]
167 if len(x_coords) != 1:
168 raise ValueError("Could not identify a unique x-coordinate in cube")
170 # Check which y-coordinate we have, if any
171 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES]
172 if len(y_coords) != 1:
173 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES]
174 if len(y_coords) != 1:
175 raise ValueError("Could not identify a unique y-coordinate in cube")
177 return (y_coords[0], x_coords[0])
180def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int:
181 """
182 Return coordinate dimension for a named coordinate from a given cube.
184 Arguments
185 ---------
187 cube: iris.cube.Cube
188 An iris cube which will be checked to see if it contains coordinate
189 names that match a pre-defined list of acceptable horizontal
190 coordinate names.
192 coord_name: str
193 A cube dimension name
195 Returns
196 -------
197 coord_index
198 An integer specifying where in the cube dimension list a specified coordinate name is found.
200 Raises
201 ------
202 ValueError
203 If a specified dimension coordinate cannot be found.
204 """
205 # Get a list of dimension coordinate names for the cube
206 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
208 # Check if requested dimension is found in cube and get index
209 if coord_name in coord_names:
210 coord_index = cube.coord_dims(coord_name)[0]
211 else:
212 raise ValueError("Could not find requested dimension %s", coord_name)
214 return coord_index
217def is_spatialdim(cube: iris.cube.Cube) -> bool:
218 """Determine whether a cube is has two spatial dimension coordinates.
220 If cube has both spatial dims, it will contain two unique coordinates
221 that explain space (latitude and longitude). The coordinates have to
222 be iterable/contain usable dimension data, as cubes may contain these
223 coordinates as scalar dimensions after being collapsed.
225 Arguments
226 ---------
227 cube: iris.cube.Cube
228 An iris cube which will be checked to see if it contains coordinate
229 names that match a pre-defined list of acceptable coordinate names.
231 Returns
232 -------
233 bool
234 If true, then the cube has a spatial projection and thus can be plotted
235 as a map.
236 """
237 # Acceptable horizontal coordinate names.
238 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"]
239 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"]
241 # Get a list of coordinate names for the cube
242 coord_names = [coord.name() for coord in cube.dim_coords]
243 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES]
244 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES]
246 # If there is one coordinate for both x and y direction return True.
247 if len(x_coords) == 1 and len(y_coords) == 1:
248 return True
249 else:
250 return False
253def is_coorddim(cube: iris.cube.Cube, coord_name) -> bool:
254 """Determine whether a cube has specified dimension coordinates.
256 Arguments
257 ---------
258 cube: iris.cube.Cube
259 An iris cube which will be checked to see if it contains coordinate
260 names that match a pre-defined list of acceptable coordinate names.
262 coord_name: str
263 A cube dimension name
265 Returns
266 -------
267 bool
268 If true, then the cube has a spatial projection and thus can be plotted
269 as a map.
270 """
271 # Get a list of dimension coordinate names for the cube
272 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
274 # Check if requested dimension is found in cube and get index
275 if coord_name in coord_names:
276 return True
277 else:
278 return False
281def is_transect(cube: iris.cube.Cube) -> bool:
282 """Determine whether a cube is a transect.
284 If cube is a transect, it will contain only one spatial (map) coordinate,
285 and one vertical coordinate (either pressure or model level).
287 Arguments
288 ---------
289 cube: iris.cube.Cube
290 An iris cube which will be checked to see if it contains coordinate
291 names that match a pre-defined list of acceptable coordinate names.
293 Returns
294 -------
295 bool
296 If true, then the cube is a transect that contains one spatial (map)
297 coordinate and one vertical coordinate.
298 """
299 # Acceptable spatial (map) coordinate names.
300 SPATIAL_MAP_COORD_NAMES = [
301 "longitude",
302 "grid_longitude",
303 "projection_x_coordinate",
304 "x",
305 "latitude",
306 "grid_latitude",
307 "projection_y_coordinate",
308 "y",
309 "distance",
310 ]
312 # Acceptable vertical coordinate names
313 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"]
315 # Get a list of coordinate names for the cube
316 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
318 # Check which spatial coordinates we have.
319 spatial_coords = [
320 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES
321 ]
322 if len(spatial_coords) != 1:
323 return False
325 # Check which vertical coordinates we have.
326 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES]
327 if len(vertical_coords) != 1:
328 return False
330 # Passed criteria so return True
331 return True
334def check_stamp_coordinate(cube: iris.cube.Cube) -> str:
335 """
336 Return stamp dimension coordinate name from a given cube, if exists.
338 If cube contains a valid stamp coordinate as a dimension coordinate,
339 function will return name of the stamp coordinate.
341 Arguments
342 ---------
343 cube: iris.cube.Cube
344 An iris cube which will be checked to see if it contains coordinate
345 names that match a pre-defined list of acceptable coordinate names.
347 Returns
348 -------
349 str
350 If available, then return name of stamp coordinate.
351 Defaults to "realization" if alternative stamp coordinate not found.
352 """
353 # Acceptable stamp coordinate names
354 STAMP_COORD_NAMES = ["realization", "member", "sample", "pseudo_level"]
356 # Check which dimension coordinates we have.
357 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
359 # Check if any acceptable stamp coordinates are cube dimensions.
360 stamp_coords = [coord for coord in dim_coord_names if coord in STAMP_COORD_NAMES]
361 if len(stamp_coords) == 1:
362 stamp_coordinate = stamp_coords[0]
363 else:
364 stamp_coordinate = "realization"
366 return stamp_coordinate
369def fully_equalise_attributes(cubes: iris.cube.CubeList):
370 """Remove any unique attributes between cubes or coordinates in place."""
371 # Equalise cube attributes.
372 removed = iris.util.equalise_attributes(cubes)
373 logging.debug("Removed attributes from cube: %s", removed)
375 # Equalise coordinate attributes.
376 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes]
378 all_coords = set.union(*coord_sets)
379 coords_to_equalise = set.intersection(*coord_sets)
380 coords_to_remove = set.difference(all_coords, coords_to_equalise)
382 logging.debug("All coordinates: %s", all_coords)
383 logging.debug("Coordinates to remove: %s", coords_to_remove)
384 logging.debug("Coordinates to equalise: %s", coords_to_equalise)
386 for coord in coords_to_remove:
387 for cube in cubes:
388 try:
389 cube.remove_coord(coord)
390 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name())
391 except iris.exceptions.CoordinateNotFoundError:
392 pass
394 for coord in coords_to_equalise:
395 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes])
396 logging.debug("Removed attributes from coordinate %s: %s", coord, removed)
398 return cubes
401def slice_over_maybe(cube: iris.cube.Cube, coord_name, index):
402 """Test slicing over cube if exists.
404 Return None if not existing.
406 Arguments
407 ---------
408 cube: iris.cube.Cube
409 An iris cube which will be checked to see if it can be sliced over
410 given coordinate.
411 coord_name: coord
412 An iris coordinate over which to slice cube.
413 index:
414 Coordinate index value to extract
416 Returns
417 -------
418 cube_slice: iris.cube.Cube
419 A slice of iris cube, if available to slice.
420 """
421 if cube is None:
422 return None
424 # Check if coord exists as dimension coordinate
425 if not is_coorddim(cube, coord_name): 425 ↛ 426line 425 didn't jump to line 426 because the condition on line 425 was never true
426 return cube
428 # Use iris to find which axis the dimension coordinate corresponds to
429 dim = cube.coord_dims(coord_name)[0]
431 # Create list of slices for each dimension
432 slices = [slice(None)] * cube.ndim
434 # Only replace the slice for the dim to be extracted
435 slices[dim] = index
437 return cube[tuple(slices)]
440def is_time_aggregatable(cube: iris.cube.Cube) -> bool:
441 """Determine whether a cube can be aggregated in time.
443 If a cube is aggregatable it will contain both a 'forecast_reference_time'
444 and 'forecast_period' coordinate as dimension or scalar coordinates.
446 Arguments
447 ---------
448 cube: iris.cube.Cube
449 An iris cube which will be checked to see if it is aggregatable based
450 on a set of pre-defined dimensional time coordinates:
451 'forecast_period' and 'forecast_reference_time'.
453 Returns
454 -------
455 bool
456 If true, then the cube is aggregatable and contains dimensional
457 coordinates including both 'forecast_reference_time' and
458 'forecast_period'.
459 """
460 # Acceptable time coordinate names for aggregatable cube.
461 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"]
463 def strictly_monotonic(coord: iris.coords.Coord) -> bool:
464 """Return whether a coord is strictly monotonic, catching errors."""
465 try:
466 return coord.is_monotonic()
467 except iris.exceptions.CoordinateMultiDimError:
468 return False
470 # Strictly monotonic coordinate names for the cube.
471 coord_names = [coord.name() for coord in cube.coords() if strictly_monotonic(coord)]
473 # Check which temporal coordinates we have.
474 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES]
475 # Return whether both coordinates are in the temporal coordinates.
476 return len(temporal_coords) == 2
479def check_single_cube(cube: iris.cube.Cube | iris.cube.CubeList) -> iris.cube.Cube:
480 """Ensure a single cube is given.
482 If a CubeList of length one is given that the contained cube is returned,
483 otherwise an error is raised.
485 Parameters
486 ----------
487 cube: Cube | CubeList
488 The cube to check.
490 Returns
491 -------
492 cube: Cube
493 The checked cube.
495 Raises
496 ------
497 TypeError
498 If the input cube is not a Cube or CubeList of a single Cube.
499 """
500 if isinstance(cube, iris.cube.Cube):
501 return cube
502 if isinstance(cube, iris.cube.CubeList):
503 if len(cube) == 1:
504 return cube[0]
505 else:
506 raise ValueError("CubeList did not contain a single cube.", cube)
507 raise TypeError(
508 "check_single_cube requires a Cube or CubeList of a single cube.", cube
509 )
512def check_sequence_coordinate(cubes, sequence_coordinate):
513 # If several histograms are plotted with time as sequence_coordinate for the
514 # time slider option.
515 for cube in iter_maybe(cubes):
516 try:
517 cube.coord(sequence_coordinate)
518 except iris.exceptions.CoordinateNotFoundError as err:
519 raise ValueError(
520 f"Cube must have a {sequence_coordinate} coordinate."
521 ) from err
524def get_num_models(cube: iris.cube.Cube | iris.cube.CubeList) -> int:
525 """Return number of models based on cube attributes."""
526 model_names = {cb.attributes.get("model_name") for cb in iter_maybe(cube)}
528 if not model_names: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true
529 logging.debug("Missing model names. Will assume single model.")
530 return 1
531 else:
532 return len(model_names)
535def validate_cube_shape(
536 cube: iris.cube.Cube | iris.cube.CubeList, num_models: int
537) -> None:
538 """Check all cubes have a model name."""
539 if isinstance(cube, iris.cube.CubeList) and len(cube) != num_models:
540 raise ValueError(
541 f"The number of model names ({num_models}) should equal the number "
542 f"of cubes ({len(cube)})."
543 )
546def validate_cubes_coords(
547 cubes: iris.cube.CubeList, coords: list[iris.coords.Coord]
548) -> None:
549 """Check same number of cubes as sequence coordinate for zip functions."""
550 if len(cubes) != len(coords):
551 raise ValueError(
552 f"The number of CubeList entries ({len(cubes)}) should equal the number "
553 f"of sequence coordinates ({len(coords)})."
554 f"Check that number of time entries in input data are consistent if "
555 f"performing time-averaging steps prior to plotting outputs."
556 )