Coverage for src / CSET / operators / _utils.py: 94%
158 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:46 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common operator functionality used across CSET.
18Functions below should only be added if it is not suitable as a standalone
19operator, and will be used across multiple operators.
20"""
22import logging
23import re
24from datetime import timedelta
26import iris
27import iris.coords
28import iris.cube
29import iris.exceptions
30import iris.util
31from iris.time import PartialDateTime
34def pdt_fromisoformat(
35 datestring,
36) -> tuple[iris.time.PartialDateTime, timedelta | None]:
37 """Generate PartialDateTime object.
39 Function that takes an ISO 8601 date string and returns a PartialDateTime object.
41 Arguments
42 ---------
43 datestring: str
44 ISO 8601 date.
46 Returns
47 -------
48 time_object: iris.time.PartialDateTime
49 """
51 def make_offset(sign, value) -> timedelta:
52 if len(value) not in [2, 4, 5]: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true
53 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}')
55 hours = int(value[:2])
56 minutes = 0
57 if len(value) in [4, 5]:
58 minutes = int(value[-2:])
59 return timedelta(hours=sign * hours, minutes=sign * minutes)
61 # Remove the microseconds coord due to no support in PartialDateTime
62 datestring = re.sub(r"\.\d+", "", datestring)
64 datetime_split = datestring.split("T")
65 date = datetime_split[0]
66 if len(datetime_split) == 1:
67 time = ""
68 elif len(datetime_split) == 2: 68 ↛ 71line 68 didn't jump to line 71 because the condition on line 68 was always true
69 time = datetime_split[1]
70 else:
71 raise ValueError("datesting in an unexpected format")
73 offset = None
74 time_split = time.split("+")
75 if len(time_split) == 2:
76 time = time_split[0]
77 offset = make_offset(1, time_split[1])
78 else:
79 time_split = time.split("-")
80 if len(time_split) == 2: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true
81 time = time_split[0]
82 offset = make_offset(-1, time_split[1])
83 else:
84 offset = None
86 if re.fullmatch(r"\d{8}", date):
87 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}"
88 elif re.fullmatch(r"\d{6}", date):
89 date = f"{date[0:4]}-{date[4:6]}"
91 if len(date) < 7: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true
92 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM")
94 # Returning a PartialDateTime for the special case of string form "YYYY-MM"
95 if re.fullmatch(r"\d{4}-\d{2}", date):
96 pdt = PartialDateTime(
97 year=int(date[0:4]),
98 month=int(date[5:7]),
99 day=None,
100 hour=None,
101 minute=None,
102 second=None,
103 )
104 return pdt, offset
106 year = int(date[0:4])
107 month = int(date[5:7])
108 day = int(date[8:10])
110 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0)
112 # Normalise the time parts into standard format
113 if re.fullmatch(r"\d{4}", time): 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true
114 time = f"{time[0:2]}:{time[2:4]}"
115 if re.fullmatch(r"\d{6}", time):
116 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}"
118 if len(time) >= 2:
119 kwargs["hour"] = int(time[0:2])
120 if len(time) >= 5:
121 kwargs["minute"] = int(time[3:5])
122 if len(time) >= 8:
123 kwargs["second"] = int(time[6:8])
125 pdt = PartialDateTime(**kwargs)
127 return pdt, offset
130def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]:
131 """
132 Return horizontal dimension coordinate name(s) from a given cube.
134 Arguments
135 ---------
137 cube: iris.cube.Cube
138 An iris cube which will be checked to see if it contains coordinate
139 names that match a pre-defined list of acceptable horizontal
140 dimension coordinate names.
142 Returns
143 -------
144 (y_coord, x_coord)
145 A tuple containing the horizontal coordinate name for latitude and longitude respectively
146 found within the cube.
148 Raises
149 ------
150 ValueError
151 If a unique y/x horizontal coordinate cannot be found.
152 """
153 # Acceptable horizontal coordinate names.
154 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"]
155 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"]
157 # Get a list of dimension coordinate names for the cube
158 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
159 coord_names = [coord.name() for coord in cube.coords()]
161 # Check which x-coordinate we have, if any
162 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES]
163 if len(x_coords) != 1:
164 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES]
165 if len(x_coords) != 1:
166 raise ValueError("Could not identify a unique x-coordinate in cube")
168 # Check which y-coordinate we have, if any
169 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES]
170 if len(y_coords) != 1:
171 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES]
172 if len(y_coords) != 1:
173 raise ValueError("Could not identify a unique y-coordinate in cube")
175 return (y_coords[0], x_coords[0])
178def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int:
179 """
180 Return coordinate dimension for a named coordinate from a given cube.
182 Arguments
183 ---------
185 cube: iris.cube.Cube
186 An iris cube which will be checked to see if it contains coordinate
187 names that match a pre-defined list of acceptable horizontal
188 coordinate names.
190 coord_name: str
191 A cube dimension name
193 Returns
194 -------
195 coord_index
196 An integer specifying where in the cube dimension list a specified coordinate name is found.
198 Raises
199 ------
200 ValueError
201 If a specified dimension coordinate cannot be found.
202 """
203 # Get a list of dimension coordinate names for the cube
204 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
206 # Check if requested dimension is found in cube and get index
207 if coord_name in coord_names:
208 coord_index = cube.coord_dims(coord_name)[0]
209 else:
210 raise ValueError("Could not find requested dimension %s", coord_name)
212 return coord_index
215def is_spatialdim(cube: iris.cube.Cube) -> bool:
216 """Determine whether a cube is has two spatial dimension coordinates.
218 If cube has both spatial dims, it will contain two unique coordinates
219 that explain space (latitude and longitude). The coordinates have to
220 be iterable/contain usable dimension data, as cubes may contain these
221 coordinates as scalar dimensions after being collapsed.
223 Arguments
224 ---------
225 cube: iris.cube.Cube
226 An iris cube which will be checked to see if it contains coordinate
227 names that match a pre-defined list of acceptable coordinate names.
229 Returns
230 -------
231 bool
232 If true, then the cube has a spatial projection and thus can be plotted
233 as a map.
234 """
235 # Acceptable horizontal coordinate names.
236 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"]
237 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"]
239 # Get a list of coordinate names for the cube
240 coord_names = [coord.name() for coord in cube.dim_coords]
241 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES]
242 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES]
244 # If there is one coordinate for both x and y direction return True.
245 if len(x_coords) == 1 and len(y_coords) == 1:
246 return True
247 else:
248 return False
251def is_coorddim(cube: iris.cube.Cube, coord_name) -> bool:
252 """Determine whether a cube has specified dimension coordinates.
254 Arguments
255 ---------
256 cube: iris.cube.Cube
257 An iris cube which will be checked to see if it contains coordinate
258 names that match a pre-defined list of acceptable coordinate names.
260 coord_name: str
261 A cube dimension name
263 Returns
264 -------
265 bool
266 If true, then the cube has a spatial projection and thus can be plotted
267 as a map.
268 """
269 # Get a list of dimension coordinate names for the cube
270 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
272 # Check if requested dimension is found in cube and get index
273 if coord_name in coord_names:
274 return True
275 else:
276 return False
279def is_transect(cube: iris.cube.Cube) -> bool:
280 """Determine whether a cube is a transect.
282 If cube is a transect, it will contain only one spatial (map) coordinate,
283 and one vertical coordinate (either pressure or model level).
285 Arguments
286 ---------
287 cube: iris.cube.Cube
288 An iris cube which will be checked to see if it contains coordinate
289 names that match a pre-defined list of acceptable coordinate names.
291 Returns
292 -------
293 bool
294 If true, then the cube is a transect that contains one spatial (map)
295 coordinate and one vertical coordinate.
296 """
297 # Acceptable spatial (map) coordinate names.
298 SPATIAL_MAP_COORD_NAMES = [
299 "longitude",
300 "grid_longitude",
301 "projection_x_coordinate",
302 "x",
303 "latitude",
304 "grid_latitude",
305 "projection_y_coordinate",
306 "y",
307 "distance",
308 ]
310 # Acceptable vertical coordinate names
311 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"]
313 # Get a list of coordinate names for the cube
314 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
316 # Check which spatial coordinates we have.
317 spatial_coords = [
318 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES
319 ]
320 if len(spatial_coords) != 1:
321 return False
323 # Check which vertical coordinates we have.
324 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES]
325 if len(vertical_coords) != 1:
326 return False
328 # Passed criteria so return True
329 return True
332def check_stamp_coordinate(cube: iris.cube.Cube) -> str:
333 """
334 Return stamp dimension coordinate name from a given cube, if exists.
336 If cube contains a valid stamp coordinate as a dimension coordinate,
337 function will return name of the stamp coordinate.
339 Arguments
340 ---------
341 cube: iris.cube.Cube
342 An iris cube which will be checked to see if it contains coordinate
343 names that match a pre-defined list of acceptable coordinate names.
345 Returns
346 -------
347 str
348 If available, then return name of stamp coordinate.
349 Defaults to "realization" if alternative stamp coordinate not found.
350 """
351 # Acceptable stamp coordinate names
352 STAMP_COORD_NAMES = ["realization", "member", "pseudo_level"]
354 # Check which dimension coordinates we have.
355 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
357 # Check if any acceptable stamp coordinates are cube dimensions.
358 stamp_coords = [coord for coord in dim_coord_names if coord in STAMP_COORD_NAMES]
359 if len(stamp_coords) == 1:
360 stamp_coordinate = stamp_coords[0]
361 logging.debug("Retained valid stamp_coordinate from cube: %s", stamp_coordinate)
362 else:
363 stamp_coordinate = "realization"
364 logging.debug("Default stamp_coordinate assumed: %s", stamp_coordinate)
366 return stamp_coordinate
369def fully_equalise_attributes(cubes: iris.cube.CubeList):
370 """Remove any unique attributes between cubes or coordinates in place."""
371 # Equalise cube attributes.
372 removed = iris.util.equalise_attributes(cubes)
373 logging.debug("Removed attributes from cube: %s", removed)
375 # Equalise coordinate attributes.
376 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes]
378 all_coords = set.union(*coord_sets)
379 coords_to_equalise = set.intersection(*coord_sets)
380 coords_to_remove = set.difference(all_coords, coords_to_equalise)
382 logging.debug("All coordinates: %s", all_coords)
383 logging.debug("Coordinates to remove: %s", coords_to_remove)
384 logging.debug("Coordinates to equalise: %s", coords_to_equalise)
386 for coord in coords_to_remove:
387 for cube in cubes:
388 try:
389 cube.remove_coord(coord)
390 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name())
391 except iris.exceptions.CoordinateNotFoundError:
392 pass
394 for coord in coords_to_equalise:
395 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes])
396 logging.debug("Removed attributes from coordinate %s: %s", coord, removed)
398 return cubes
401def slice_over_maybe(cube: iris.cube.Cube, coord_name, index):
402 """Test slicing over cube if exists.
404 Return None if not existing.
406 Arguments
407 ---------
408 cube: iris.cube.Cube
409 An iris cube which will be checked to see if it can be sliced over
410 given coordinate.
411 coord_name: coord
412 An iris coordinate over which to slice cube.
413 index:
414 Coordinate index value to extract
416 Returns
417 -------
418 cube_slice: iris.cube.Cube
419 A slice of iris cube, if available to slice.
420 """
421 if cube is None:
422 return None
424 # Check if coord exists as dimension coordinate
425 if not is_coorddim(cube, coord_name): 425 ↛ 426line 425 didn't jump to line 426 because the condition on line 425 was never true
426 return cube
428 # Use iris to find which axis the dimension coordinate corresponds to
429 dim = cube.coord_dims(coord_name)[0]
431 # Create list of slices for each dimension
432 slices = [slice(None)] * cube.ndim
434 # Only replace the slice for the dim to be extracted
435 slices[dim] = index
437 return cube[tuple(slices)]
440def is_time_aggregatable(cube: iris.cube.Cube) -> bool:
441 """Determine whether a cube can be aggregated in time.
443 If a cube is aggregatable it will contain both a 'forecast_reference_time'
444 and 'forecast_period' coordinate as dimension or scalar coordinates.
446 Arguments
447 ---------
448 cube: iris.cube.Cube
449 An iris cube which will be checked to see if it is aggregatable based
450 on a set of pre-defined dimensional time coordinates:
451 'forecast_period' and 'forecast_reference_time'.
453 Returns
454 -------
455 bool
456 If true, then the cube is aggregatable and contains dimensional
457 coordinates including both 'forecast_reference_time' and
458 'forecast_period'.
459 """
460 # Acceptable time coordinate names for aggregatable cube.
461 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"]
463 def strictly_monotonic(coord: iris.coords.Coord) -> bool:
464 """Return whether a coord is strictly monotonic, catching errors."""
465 try:
466 return coord.is_monotonic()
467 except iris.exceptions.CoordinateMultiDimError:
468 return False
470 # Strictly monotonic coordinate names for the cube.
471 coord_names = [coord.name() for coord in cube.coords() if strictly_monotonic(coord)]
473 # Check which temporal coordinates we have.
474 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES]
475 # Return whether both coordinates are in the temporal coordinates.
476 return len(temporal_coords) == 2