Coverage for src/CSET/operators/_utils.py: 94%
128 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 15:17 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 15:17 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16Common operator functionality used across CSET.
18Functions below should only be added if it is not suitable as a standalone
19operator, and will be used across multiple operators.
20"""
22import logging
23import re
24from datetime import timedelta
26import iris
27import iris.cube
28import iris.exceptions
29import iris.util
30from iris.time import PartialDateTime
33def pdt_fromisoformat(
34 datestring,
35) -> tuple[iris.time.PartialDateTime, timedelta | None]:
36 """Generate PartialDateTime object.
38 Function that takes an ISO 8601 date string and returns a PartialDateTime object.
40 Arguments
41 ---------
42 datestring: str
43 ISO 8601 date.
45 Returns
46 -------
47 time_object: iris.time.PartialDateTime
48 """
50 def make_offset(sign, value) -> timedelta:
51 if len(value) not in [2, 4, 5]: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true
52 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}')
54 hours = int(value[:2])
55 minutes = 0
56 if len(value) in [4, 5]:
57 minutes = int(value[-2:])
58 return timedelta(hours=sign * hours, minutes=sign * minutes)
60 # Remove the microseconds coord due to no support in PartialDateTime
61 datestring = re.sub(r"\.\d+", "", datestring)
63 datetime_split = datestring.split("T")
64 date = datetime_split[0]
65 if len(datetime_split) == 1:
66 time = ""
67 elif len(datetime_split) == 2: 67 ↛ 70line 67 didn't jump to line 70 because the condition on line 67 was always true
68 time = datetime_split[1]
69 else:
70 raise ValueError("datesting in an unexpected format")
72 offset = None
73 time_split = time.split("+")
74 if len(time_split) == 2:
75 time = time_split[0]
76 offset = make_offset(1, time_split[1])
77 else:
78 time_split = time.split("-")
79 if len(time_split) == 2: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 time = time_split[0]
81 offset = make_offset(-1, time_split[1])
82 else:
83 offset = None
85 if re.fullmatch(r"\d{8}", date):
86 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}"
87 elif re.fullmatch(r"\d{6}", date):
88 date = f"{date[0:4]}-{date[4:6]}"
90 if len(date) < 7: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true
91 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM")
93 # Returning a PartialDateTime for the special case of string form "YYYY-MM"
94 if re.fullmatch(r"\d{4}-\d{2}", date):
95 pdt = PartialDateTime(
96 year=int(date[0:4]),
97 month=int(date[5:7]),
98 day=None,
99 hour=None,
100 minute=None,
101 second=None,
102 )
103 return pdt, offset
105 year = int(date[0:4])
106 month = int(date[5:7])
107 day = int(date[8:10])
109 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0)
111 # Normalise the time parts into standard format
112 if re.fullmatch(r"\d{4}", time): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true
113 time = f"{time[0:2]}:{time[2:4]}"
114 if re.fullmatch(r"\d{6}", time):
115 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}"
117 if len(time) >= 2:
118 kwargs["hour"] = int(time[0:2])
119 if len(time) >= 5:
120 kwargs["minute"] = int(time[3:5])
121 if len(time) >= 8:
122 kwargs["second"] = int(time[6:8])
124 pdt = PartialDateTime(**kwargs)
126 return pdt, offset
129def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]:
130 """
131 Return horizontal dimension coordinate name(s) from a given cube.
133 Arguments
134 ---------
136 cube: iris.cube.Cube
137 An iris cube which will be checked to see if it contains coordinate
138 names that match a pre-defined list of acceptable horizontal
139 dimension coordinate names.
141 Returns
142 -------
143 (y_coord, x_coord)
144 A tuple containing the horizontal coordinate name for latitude and longitude respectively
145 found within the cube.
147 Raises
148 ------
149 ValueError
150 If a unique y/x horizontal coordinate cannot be found.
151 """
152 # Acceptable horizontal coordinate names.
153 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"]
154 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"]
156 # Get a list of dimension coordinate names for the cube
157 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
158 coord_names = [coord.name() for coord in cube.coords()]
160 # Check which x-coordinate we have, if any
161 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES]
162 if len(x_coords) != 1:
163 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES]
164 if len(x_coords) != 1:
165 raise ValueError("Could not identify a unique x-coordinate in cube")
167 # Check which y-coordinate we have, if any
168 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES]
169 if len(y_coords) != 1:
170 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES]
171 if len(y_coords) != 1:
172 raise ValueError("Could not identify a unique y-coordinate in cube")
174 return (y_coords[0], x_coords[0])
177def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int:
178 """
179 Return coordinate dimension for a named coordinate from a given cube.
181 Arguments
182 ---------
184 cube: iris.cube.Cube
185 An iris cube which will be checked to see if it contains coordinate
186 names that match a pre-defined list of acceptable horizontal
187 coordinate names.
189 coord_name: str
190 A cube dimension name
192 Returns
193 -------
194 coord_index
195 An integer specifying where in the cube dimension list a specified coordinate name is found.
197 Raises
198 ------
199 ValueError
200 If a specified dimension coordinate cannot be found.
201 """
202 # Get a list of dimension coordinate names for the cube
203 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
205 # Check if requested dimension is found in cube and get index
206 if coord_name in coord_names:
207 coord_index = cube.coord_dims(coord_name)[0]
208 else:
209 raise ValueError("Could not find requested dimension %s", coord_name)
211 return coord_index
214def is_spatialdim(cube: iris.cube.Cube) -> bool:
215 """Determine whether a cube is has two spatial dimension coordinates.
217 If cube has both spatial dims, it will contain two unique coordinates
218 that explain space (latitude and longitude). The coordinates have to
219 be iterable/contain usable dimension data, as cubes may contain these
220 coordinates as scalar dimensions after being collapsed.
222 Arguments
223 ---------
224 cube: iris.cube.Cube
225 An iris cube which will be checked to see if it contains coordinate
226 names that match a pre-defined list of acceptable coordinate names.
228 Returns
229 -------
230 bool
231 If true, then the cube has a spatial projection and thus can be plotted
232 as a map.
233 """
234 # Acceptable horizontal coordinate names.
235 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"]
236 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"]
238 # Get a list of coordinate names for the cube
239 coord_names = [coord.name() for coord in cube.dim_coords]
240 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES]
241 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES]
243 # If there is one coordinate for both x and y direction return True.
244 if len(x_coords) == 1 and len(y_coords) == 1:
245 return True
246 else:
247 return False
250def is_transect(cube: iris.cube.Cube) -> bool:
251 """Determine whether a cube is a transect.
253 If cube is a transect, it will contain only one spatial (map) coordinate,
254 and one vertical coordinate (either pressure or model level).
256 Arguments
257 ---------
258 cube: iris.cube.Cube
259 An iris cube which will be checked to see if it contains coordinate
260 names that match a pre-defined list of acceptable coordinate names.
262 Returns
263 -------
264 bool
265 If true, then the cube is a transect that contains one spatial (map)
266 coordinate and one vertical coordinate.
267 """
268 # Acceptable spatial (map) coordinate names.
269 SPATIAL_MAP_COORD_NAMES = [
270 "longitude",
271 "grid_longitude",
272 "projection_x_coordinate",
273 "x",
274 "latitude",
275 "grid_latitude",
276 "projection_y_coordinate",
277 "y",
278 "distance",
279 ]
281 # Acceptable vertical coordinate names
282 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"]
284 # Get a list of coordinate names for the cube
285 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
287 # Check which spatial coordinates we have.
288 spatial_coords = [
289 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES
290 ]
291 if len(spatial_coords) != 1:
292 return False
294 # Check which vertical coordinates we have.
295 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES]
296 if len(vertical_coords) != 1:
297 return False
299 # Passed criteria so return True
300 return True
303def fully_equalise_attributes(cubes: iris.cube.CubeList):
304 """Remove any unique attributes between cubes or coordinates in place."""
305 # Equalise cube attributes.
306 removed = iris.util.equalise_attributes(cubes)
307 logging.debug("Removed attributes from cube: %s", removed)
309 # Equalise coordinate attributes.
310 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes]
312 all_coords = set.union(*coord_sets)
313 coords_to_equalise = set.intersection(*coord_sets)
314 coords_to_remove = set.difference(all_coords, coords_to_equalise)
316 logging.debug("All coordinates: %s", all_coords)
317 logging.debug("Coordinates to remove: %s", coords_to_remove)
318 logging.debug("Coordinates to equalise: %s", coords_to_equalise)
320 for coord in coords_to_remove:
321 for cube in cubes:
322 try:
323 cube.remove_coord(coord)
324 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name())
325 except iris.exceptions.CoordinateNotFoundError:
326 pass
328 for coord in coords_to_equalise:
329 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes])
330 logging.debug("Removed attributes from coordinate %s: %s", coord, removed)
332 return cubes
335def is_time_aggregatable(cube: iris.cube.Cube) -> bool:
336 """Determine whether a cube can be aggregated in time.
338 If a cube is aggregatable it will contain both a 'forecast_reference_time'
339 and 'forecast_period' coordinate as dimensional coordinates.
341 Arguments
342 ---------
343 cube: iris.cube.Cube
344 An iris cube which will be checked to see if it is aggregatable based
345 on a set of pre-defined dimensional time coordinates:
346 'forecast_period' and 'forecast_reference_time'.
348 Returns
349 -------
350 bool
351 If true, then the cube is aggregatable and contains dimensional
352 coordinates including both 'forecast_reference_time' and
353 'forecast_period'.
354 """
355 # Acceptable time coordinate names for aggregatable cube.
356 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"]
358 # Coordinate names for the cube.
359 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)]
361 # Check which temporal coordinates we have.
362 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES]
363 # Return whether both coordinates are in the temporal coordinates.
364 return len(temporal_coords) == 2