Coverage for src / CSET / operators / collapse.py: 91%
154 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 14:08 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 14:08 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to perform various kind of collapse on either 1 or 2 dimensions."""
17import datetime
18import logging
19import warnings
21import iris
22import iris.analysis
23import iris.coord_categorisation
24import iris.coords
25import iris.cube
26import iris.exceptions
27import iris.util
28import numpy as np
30from CSET._common import iter_maybe
31from CSET.operators.aggregate import add_hour_coordinate
def collapse(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    coordinate: str | list[str],
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Collapse coordinate(s) of a single cube or of every cube in a cube list.

    Collapses similar fields in each cube into a cube collapsing around the
    specified coordinate(s) and method. This could be a (weighted) mean or
    percentile.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube or CubeList to collapse and iterate over one dimension
    coordinate: str | list[str]
        Coordinate(s) to collapse over e.g. 'time', 'longitude', 'latitude',
        'model_level_number', 'realization'. A list of multiple coordinates can
        be given.
    method: str
        Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
        'PERCENTILE' getattr creates iris.analysis.MEAN, etc. For PERCENTILE YAML
        file requires i.e. method: 'PERCENTILE' additional_percent: 90.
    additional_percent: float, optional
        Required for the PERCENTILE method. This is a number between 0 and 100.

    Returns
    -------
    collapsed_cubes: iris.cube.Cube | iris.cube.CubeList
        Single variable but several methods of aggregation

    Raises
    ------
    ValueError
        If additional_percent wasn't supplied while using PERCENTILE method.
    """
    # "SEQ", empty, or unset method means no collapse is wanted; pass the
    # input through unchanged.
    if method == "SEQ" or method == "" or method is None:
        return cubes
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            # Bounds can differ between models and would prevent the
            # overlapping extraction from matching points.
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    with warnings.catch_warnings():
        # These warnings are expected for spatial collapses and would only
        # add noise to the logs.
        warnings.filterwarnings(
            "ignore", "Cannot check if coordinate is contiguous", UserWarning
        )
        warnings.filterwarnings(
            "ignore", "Collapsing spatial coordinate.+without weighting", UserWarning
        )
        for cube in iter_maybe(cubes):
            # Apply a mask to check for invalid data, this will allow NaNs to
            # be ignored.
            cube.data = np.ma.masked_invalid(cube.data)
            if method == "PERCENTILE":
                collapsed_cubes.append(
                    cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        percent=additional_percent,
                    )
                )
            elif method == "RANGE":
                # iris has no RANGE aggregator, so compute it as MAX - MIN.
                cube_max = cube.collapsed(coordinate, iris.analysis.MAX)
                cube_min = cube.collapsed(coordinate, iris.analysis.MIN)
                collapsed_cubes.append(cube_max - cube_min)
            else:
                collapsed_cubes.append(
                    cube.collapsed(coordinate, getattr(iris.analysis, method))
                )
    # Unwrap a single cube for convenience of callers.
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes
def collapse_by_hour_of_day(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Collapse a cube by hour of the day.

    Collapses a cube by hour of the day in the time coordinates provided by the
    model. It is useful for creating diurnal cycle plots. It aggregates all 00
    UTC together regardless of lead time.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube to collapse and iterate over one dimension or CubeList to convert
        to a cube and then collapse prior to aggregating by hour. If a CubeList
        is provided each cube is handled separately.
    method: str
        Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
        'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified.
    additional_percent: float, optional
        Required for the PERCENTILE method. This is a number between 0 and 100.

    Returns
    -------
    cube: iris.cube.Cube
        Single variable but several methods of aggregation.

    Raises
    ------
    ValueError
        If additional_percent wasn't supplied while using PERCENTILE method.

    Notes
    -----
    Collapsing of the cube is around the 'time' coordinate. The coordinates are
    first grouped by the hour of day, and then aggregated by the hour of day to
    create a diurnal cycle. This operator is applicable for both single
    forecasts and for multiple forecasts. The hour used is based on the units of
    the time coordinate. If the time coordinate is in UTC, hour will be in UTC.

    To apply this operator successfully there must only be one time dimension.
    Should a MultiDim exception be raised the user first needs to apply the
    collapse operator to reduce the time dimensions before applying this
    operator. A cube containing the two time dimensions
    'forecast_reference_time' and 'forecast_period' will be automatically
    collapsed by lead time before being being collapsed by hour of day.
    """
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            # Bounds can differ between models and would prevent the
            # overlapping extraction from matching points.
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Ensure hour coordinate in each input is sorted, and data adjusted if needed.
        sorted_cube = iris.cube.CubeList()
        for fcst_slice in cube.slices_over(["forecast_reference_time"]):
            # Categorise the time coordinate by hour of the day.
            fcst_slice = add_hour_coordinate(fcst_slice)
            if method == "PERCENTILE":
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method), percent=additional_percent
                )
            else:
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method)
                )
            # Compute if data needs sorting to lie in increasing order [0..23].
            # Note multiple forecasts can sit in same cube spanning different
            # initialisation times and data ranges.
            time_points = by_hour.coord("hour").points
            time_points_sorted = np.sort(time_points)
            if time_points[0] != time_points_sorted[0]:
                # Number of positions to roll so the smallest hour comes
                # first. Assumes evenly spaced hour points — TODO confirm.
                # np.roll requires an integer shift, so use explicit integer
                # floor division rather than true division.
                nroll = int(time_points[0] // (time_points[1] - time_points[0]))
                # Shift hour coordinate and data cube to be in time of day order.
                by_hour.coord("hour").points = np.roll(time_points, nroll, 0)
                by_hour.data = np.roll(by_hour.data, nroll, axis=0)

            # Remove unnecessary time coordinate.
            # "hour" and "forecast_period" remain as AuxCoord.
            by_hour.remove_coord("time")
            sorted_cube.append(by_hour)

        # Recombine cube slices.
        cube = sorted_cube.merge_cube()

        # Apply a mask to check for invalid data, this will allow NaNs to
        # be ignored.
        cube.data = np.ma.masked_invalid(cube.data)

        if cube.coords("forecast_reference_time", dim_coords=True):
            # Collapse by forecast reference time to get a single cube.
            cube = collapse(
                cube,
                "forecast_reference_time",
                method,
                additional_percent=additional_percent,
            )
        else:
            # Or remove forecast reference time if a single case, as collapse
            # will have effectively done this.
            cube.remove_coord("forecast_reference_time")

        # Promote "hour" to dim_coord.
        iris.util.promote_aux_coord_to_dim_coord(cube, "hour")
        collapsed_cubes.append(cube)

    # Unwrap a single cube for convenience of callers.
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes
def collapse_by_validity_time(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Collapse a cube around validity time for multiple cases.

    First checks if the data can be aggregated easily. Then creates a new cube
    by slicing over the time dimensions, removing the time dimensions,
    re-merging the data, and creating a new time coordinate. It then collapses
    by the new time coordinate for a specified method using the collapse
    function.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube to collapse by validity time or CubeList that will be converted
        to a cube before collapsing by validity time.
    method: str
        Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
        'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified.
    additional_percent: float, optional
        Required for the PERCENTILE method. This is a number between 0 and 100.

    Returns
    -------
    cube: iris.cube.Cube | iris.cube.CubeList
        Single variable collapsed by lead time based on chosen method.

    Raises
    ------
    ValueError
        If additional_percent wasn't supplied while using PERCENTILE method,
        or if the input cubes do not overlap in validity time.
    """
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    collapsed_cubes = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Slice over cube by both time dimensions to create a CubeList.
        new_cubelist = iris.cube.CubeList(
            cube.slices_over(["forecast_period", "forecast_reference_time"])
        )
        for sub_cube in new_cubelist:
            # Reconstruct the time coordinate if it is missing.
            # time = forecast_reference_time + forecast_period, expressed in
            # the reference time's units.
            if "time" not in [coord.name() for coord in sub_cube.coords()]:
                ref_time_coord = sub_cube.coord("forecast_reference_time")
                ref_units = ref_time_coord.units
                ref_time = ref_units.num2date(ref_time_coord.points)
                period_coord = sub_cube.coord("forecast_period")
                period_units = period_coord.units
                # Given how we are slicing there will only be one point.
                period_seconds = period_units.convert(period_coord.points[0], "seconds")
                period_duration = datetime.timedelta(seconds=period_seconds)
                time = ref_time + period_duration
                time_points = ref_units.date2num(time)
                time_coord = iris.coords.AuxCoord(
                    points=time_points, standard_name="time", units=ref_units
                )
                sub_cube.add_aux_coord(time_coord)
            # Remove forecast_period and forecast_reference_time coordinates,
            # so slices at the same validity time can merge together.
            sub_cube.remove_coord("forecast_period")
            sub_cube.remove_coord("forecast_reference_time")
        # Create new CubeList by merging with unique = False to produce a validity
        # time cube.
        merged_list_1 = new_cubelist.merge(unique=False)
        # Create a new "fake" coordinate and apply to each remaining cube to allow
        # final merging to take place into a single cube. Each cube gets a
        # distinct point value so the merge gains a new dimension.
        equalised_validity_time = iris.coords.AuxCoord(
            points=0, long_name="equalised_validity_time", units="1"
        )
        for sub_cube, eq_valid_time in zip(
            merged_list_1, range(len(merged_list_1)), strict=True
        ):
            sub_cube.add_aux_coord(equalised_validity_time.copy(points=eq_valid_time))

        # Merge CubeList to create final cube.
        final_cube = merged_list_1.merge_cube()
        logging.debug("Pre-collapse validity time cube:\n%s", final_cube)

        # Apply a mask to check for invalid data, this will allow NaNs to
        # be ignored.
        final_cube.data = np.ma.masked_invalid(final_cube.data)

        # Collapse over equalised_validity_time as a proxy for equal validity
        # time.
        try:
            collapsed_cube = collapse(
                final_cube,
                "equalised_validity_time",
                method,
                additional_percent=additional_percent,
            )
        except iris.exceptions.CoordinateCollapseError as err:
            # Re-raise with a message explaining the likely cause.
            raise ValueError(
                "Cubes do not overlap therefore cannot collapse across validity time."
            ) from err
        # The helper coordinate has served its purpose; drop it from output.
        collapsed_cube.remove_coord("equalised_validity_time")
        collapsed_cubes.append(collapsed_cube)

    # Unwrap a single cube for convenience of callers.
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes
def proportion(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    coordinate: str | list[str],
    condition: str,
    threshold: float,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Find the proportion of an event for all cubes.

    Find the proportion of points at a specified threshold in each cube into a
    cube collapsing around the specified coordinate(s).

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube or CubeList to collapse and iterate over one dimension
    coordinate: str | list[str]
        Coordinate(s) to collapse over e.g. 'time', 'longitude', 'latitude',
        'model_level_number', 'realization'. A list of multiple coordinates can
        be given.
    condition: str
        The condition for the event. Expected arguments are eq, ne, lt, gt, le, ge.
        The letters correspond to the following conditions
        eq: equal to;
        ne: not equal to;
        lt: less than;
        gt: greater than;
        le: less than or equal to;
        ge: greater than or equal to.
    threshold: float
        The value for the event.

    Returns
    -------
    collapsed_cubes: iris.cube.Cube | iris.cube.CubeList
        The proportion of the event.

    Raises
    ------
    ValueError
        If condition is not one of eq, ne, gt, ge, lt, le.
    """
    # Set method
    method = "PROPORTION"
    # Map each supported condition onto its comparison against the threshold.
    comparisons = {
        "eq": lambda values: values == threshold,
        "ne": lambda values: values != threshold,
        "gt": lambda values: values > threshold,
        "ge": lambda values: values >= threshold,
        "lt": lambda values: values < threshold,
        "le": lambda values: values <= threshold,
    }
    # Validate up front so a bad condition fails fast, before any collapsing.
    if condition not in comparisons:
        raise ValueError(
            f"Unexpected value for condition. Expected eq, ne, gt, ge, lt, le. Got {condition}."
        )

    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            # Bounds can differ between models and would prevent the
            # overlapping extraction from matching points.
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    with warnings.catch_warnings():
        # These warnings are expected for spatial collapses and would only
        # add noise to the logs.
        warnings.filterwarnings(
            "ignore", "Cannot check if coordinate is contiguous", UserWarning
        )
        warnings.filterwarnings(
            "ignore", "Collapsing spatial coordinate.+without weighting", UserWarning
        )
        for cube in iter_maybe(cubes):
            # Apply a mask to check for invalid data, this will allow NaNs to
            # be ignored.
            cube.data = np.ma.masked_invalid(cube.data)
            new_cube = cube.collapsed(
                coordinate,
                getattr(iris.analysis, method),
                function=comparisons[condition],
            )
            # Rename to describe the event, e.g.
            # "probability_of_air_temperature_gt_273.15".
            name = cube.long_name if cube.long_name else cube.name()
            new_cube.rename(f"probability_of_{name}_{condition}_{threshold}")
            # Proportions are dimensionless.
            new_cube.units = "1"
            collapsed_cubes.append(new_cube)

    # Unwrap a single cube for convenience of callers.
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes
473# TODO
474# Collapse function that calculates means, medians etc across members of an
475# ensemble or stratified groups. Need to allow collapse over realisation
476# dimension for fixed time. Hence will require reading in of CubeList