Coverage for src/CSET/operators/collapse.py: 97% (136 statements)
coverage.py v7.10.6, created at 2025-09-09 12:53 +0000
# © Crown copyright, Met Office (2022-2025) and CSET contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""Operators to perform various kind of collapse on either 1 or 2 dimensions."""

import datetime
import logging
import warnings

import iris
import iris.analysis
import iris.coord_categorisation
import iris.coords
import iris.cube
import iris.exceptions
import iris.util
import numpy as np

from CSET._common import iter_maybe
from CSET.operators.aggregate import add_hour_coordinate


def collapse(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    coordinate: str | list[str],
    method: str,
    additional_percent: float | None = None,
    condition: str | None = None,
    threshold: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
43 """Collapse coordinate(s) of a single cube or of every cube in a cube list.
45 Collapses similar fields in each cube into a cube collapsing around the
46 specified coordinate(s) and method. This could be a (weighted) mean or
47 percentile.
49 Arguments
50 ---------
51 cubes: iris.cube.Cube | iris.cube.CubeList
52 Cube or CubeList to collapse and iterate over one dimension
53 coordinate: str | list[str]
54 Coordinate(s) to collapse over e.g. 'time', 'longitude', 'latitude',
55 'model_level_number', 'realization'. A list of multiple coordinates can
56 be given.
57 method: str
58 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
59 'PERCENTILE' getattr creates iris.analysis.MEAN, etc. For PERCENTILE YAML
60 file requires i.e. method: 'PERCENTILE' additional_percent: 90. For
61 PROPORTION YAML file requires i.e. method: 'PROPORTION', condition: lt, threshold: 273.15.
62 additional_percent: float, optional
63 Required for the PERCENTILE method. This is a number between 0 and 100.
64 condition: str, optional
65 Required for the PROPORTION method. Expected arguments are eq, ne, lt, gt, le, ge.
66 The letters correspond to the following conditions
67 eq: equal to;
68 ne: not equal to;
69 lt: less than;
70 gt: greater than;
71 le: less than or equal to;
72 ge: greater than or equal to.
73 threshold: float, optional
74 Required for the PROPORTION method.
76 Returns
77 -------
78 collapsed_cubes: iris.cube.Cube | iris.cube.CubeList
79 Single variable but several methods of aggregation
81 Raises
82 ------
83 ValueError
84 If additional_percent wasn't supplied while using PERCENTILE method.
85 If condition wasn't supplied while using PROPORTION method.
86 If threshold wasn't supplied while using PROPORTION method.
87 """
    # coverage: this early-exit branch was never taken during the test run.
    if method == "SEQ" or method == "" or method is None:
        return cubes
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")
    if method == "PROPORTION" and condition is None:
        raise ValueError("Must specify a condition for the probability")
    if method == "PROPORTION" and threshold is None:
        raise ValueError("Must specify a threshold for the probability")

    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", "Cannot check if coordinate is contiguous", UserWarning
        )
        warnings.filterwarnings(
            "ignore", "Collapsing spatial coordinate.+without weighting", UserWarning
        )
        for cube in iter_maybe(cubes):
            if method == "PERCENTILE":
                collapsed_cubes.append(
                    cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        percent=additional_percent,
                    )
                )
            # coverage: the RANGE branch was never taken during the test run.
            elif method == "RANGE":
                cube_max = cube.collapsed(coordinate, iris.analysis.MAX)
                cube_min = cube.collapsed(coordinate, iris.analysis.MIN)
                collapsed_cubes.append(cube_max - cube_min)
            elif method == "PROPORTION":
                match condition:
                    case "eq":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values == threshold,
                        )
                    case "ne":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values != threshold,
                        )
                    case "gt":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values > threshold,
                        )
                    case "ge":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values >= threshold,
                        )
                    case "lt":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values < threshold,
                        )
                    case "le":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values <= threshold,
                        )
                    case _:
                        raise ValueError(
                            f"Unexpected value for condition. Expected eq, ne, gt, ge, lt, le. Got {condition}."
                        )
                new_cube.rename(f"probability_of_{cube.name()}_{condition}_{threshold}")
                new_cube.units = "1"
                collapsed_cubes.append(new_cube)
            else:
                collapsed_cubes.append(
                    cube.collapsed(coordinate, getattr(iris.analysis, method))
                )
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes
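

# Illustrative usage of collapse() (a sketch, not part of the module; the
# input file and variable names are hypothetical):
#
#     import iris
#     from CSET.operators.collapse import collapse
#
#     cube = iris.load_cube("forecast.nc", "air_temperature")
#     # Ensemble mean over the realization coordinate.
#     ensemble_mean = collapse(cube, "realization", "MEAN")
#     # 90th percentile over time.
#     p90 = collapse(cube, "time", "PERCENTILE", additional_percent=90)
#     # Fraction of the period spent below freezing at each grid point.
#     frost = collapse(cube, "time", "PROPORTION", condition="lt", threshold=273.15)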


def collapse_by_hour_of_day(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
193 """Collapse a cube by hour of the day.
195 Collapses a cube by hour of the day in the time coordinates provided by the
196 model. It is useful for creating diurnal cycle plots. It aggregates all 00
197 UTC together regardless of lead time.
199 Arguments
200 ---------
201 cubes: iris.cube.Cube | iris.cube.CubeList
202 Cube to collapse and iterate over one dimension or CubeList to convert
203 to a cube and then collapse prior to aggregating by hour. If a CubeList
204 is provided each cube is handled separately.
205 method: str
206 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
207 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified.
209 Returns
210 -------
211 cube: iris.cube.Cube
212 Single variable but several methods of aggregation.
214 Raises
215 ------
216 ValueError
217 If additional_percent wasn't supplied while using PERCENTILE method.
219 Notes
220 -----
221 Collapsing of the cube is around the 'time' coordinate. The coordinates are
222 first grouped by the hour of day, and then aggregated by the hour of day to
223 create a diurnal cycle. This operator is applicable for both single
224 forecasts and for multiple forecasts. The hour used is based on the units of
225 the time coordinate. If the time coordinate is in UTC, hour will be in UTC.
227 To apply this operator successfully there must only be one time dimension.
228 Should a MultiDim exception be raised the user first needs to apply the
229 collapse operator to reduce the time dimensions before applying this
230 operator. A cube containing the two time dimensions
231 'forecast_reference_time' and 'forecast_period' will be automatically
232 collapsed by lead time before being being collapsed by hour of day.
233 """
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Ensure hour coordinate in each input is sorted, and data adjusted if needed.
        sorted_cube = iris.cube.CubeList()
        for fcst_slice in cube.slices_over(["forecast_reference_time"]):
            # Categorise the time coordinate by hour of the day.
            fcst_slice = add_hour_coordinate(fcst_slice)
            if method == "PERCENTILE":
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method), percent=additional_percent
                )
            else:
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method)
                )
            # Compute if data needs sorting to lie in increasing order [0..23].
            # Note multiple forecasts can sit in same cube spanning different
            # initialisation times and data ranges.
            time_points = by_hour.coord("hour").points
            time_points_sorted = np.sort(by_hour.coord("hour").points)
            # coverage: this branch was taken on every pass in the test run.
            if time_points[0] != time_points_sorted[0]:
                # Integer shift so np.roll receives a whole number of places.
                nroll = int(time_points[0] // (time_points[1] - time_points[0]))
                # Shift hour coordinate and data cube to be in time of day order.
                by_hour.coord("hour").points = np.roll(time_points, nroll, 0)
                by_hour.data = np.roll(by_hour.data, nroll, axis=0)

            # Remove unnecessary time coordinate.
            # "hour" and "forecast_period" remain as AuxCoord.
            by_hour.remove_coord("time")

            sorted_cube.append(by_hour)

        # Recombine cube slices.
        cube = sorted_cube.merge_cube()

        if cube.coords("forecast_reference_time", dim_coords=True):
            # Collapse by forecast reference time to get a single cube.
            cube = collapse(
                cube,
                "forecast_reference_time",
                method,
                additional_percent=additional_percent,
            )
        else:
            # Or remove forecast reference time if a single case, as collapse
            # will have effectively done this.
            cube.remove_coord("forecast_reference_time")

        # Promote "hour" to dim_coord.
        iris.util.promote_aux_coord_to_dim_coord(cube, "hour")
        collapsed_cubes.append(cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes
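

# Illustrative usage of collapse_by_hour_of_day() (a sketch; assumes a cube
# with "time", "forecast_reference_time", and "forecast_period" coordinates,
# loaded elsewhere):
#
#     diurnal_mean = collapse_by_hour_of_day(cube, "MEAN")
#     # The result gains a 24-point "hour" dimension suited to diurnal
#     # cycle plots.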


def collapse_by_validity_time(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
315 """Collapse a cube around validity time for multiple cases.
317 First checks if the data can be aggregated easily. Then creates a new cube
318 by slicing over the time dimensions, removing the time dimensions,
319 re-merging the data, and creating a new time coordinate. It then collapses
320 by the new time coordinate for a specified method using the collapse
321 function.
323 Arguments
324 ---------
325 cubes: iris.cube.Cube | iris.cube.CubeList
326 Cube to collapse by validity time or CubeList that will be converted
327 to a cube before collapsing by validity time.
328 method: str
329 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
330 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified.
332 Returns
333 -------
334 cube: iris.cube.Cube | iris.cube.CubeList
335 Single variable collapsed by lead time based on chosen method.
337 Raises
338 ------
339 ValueError
340 If additional_percent wasn't supplied while using PERCENTILE method.
341 """
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    collapsed_cubes = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Slice over cube by both time dimensions to create a CubeList.
        new_cubelist = iris.cube.CubeList(
            cube.slices_over(["forecast_period", "forecast_reference_time"])
        )
        for sub_cube in new_cubelist:
            # Reconstruct the time coordinate if it is missing.
            if "time" not in [coord.name() for coord in sub_cube.coords()]:
                ref_time_coord = sub_cube.coord("forecast_reference_time")
                ref_units = ref_time_coord.units
                ref_time = ref_units.num2date(ref_time_coord.points)
                period_coord = sub_cube.coord("forecast_period")
                period_units = period_coord.units
                # Given how we are slicing there will only be one point.
                period_seconds = period_units.convert(period_coord.points[0], "seconds")
                period_duration = datetime.timedelta(seconds=period_seconds)
                time = ref_time + period_duration
                time_points = ref_units.date2num(time)
                time_coord = iris.coords.AuxCoord(
                    points=time_points, standard_name="time", units=ref_units
                )
                sub_cube.add_aux_coord(time_coord)
            # Remove forecast_period and forecast_reference_time coordinates.
            sub_cube.remove_coord("forecast_period")
            sub_cube.remove_coord("forecast_reference_time")
        # Create new CubeList by merging with unique=False to produce a
        # validity time cube.
        merged_list_1 = new_cubelist.merge(unique=False)
        # Create a new "fake" coordinate and apply to each remaining cube to
        # allow final merging to take place into a single cube.
        equalised_validity_time = iris.coords.AuxCoord(
            points=0, long_name="equalised_validity_time", units="1"
        )
        for sub_cube, eq_valid_time in zip(
            merged_list_1, range(len(merged_list_1)), strict=True
        ):
            sub_cube.add_aux_coord(equalised_validity_time.copy(points=eq_valid_time))

        # Merge CubeList to create final cube.
        final_cube = merged_list_1.merge_cube()
        logging.debug("Pre-collapse validity time cube:\n%s", final_cube)

        # Collapse over equalised_validity_time as a proxy for equal validity
        # time.
        try:
            collapsed_cube = collapse(
                final_cube,
                "equalised_validity_time",
                method,
                additional_percent=additional_percent,
            )
        except iris.exceptions.CoordinateCollapseError as err:
            raise ValueError(
                "Cubes do not overlap therefore cannot collapse across validity time."
            ) from err
        collapsed_cube.remove_coord("equalised_validity_time")
        collapsed_cubes.append(collapsed_cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes
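

# Illustrative usage of collapse_by_validity_time() (a sketch; assumes a
# CubeList of forecasts from several initialisation times that share
# validity times):
#
#     case_aggregate = collapse_by_validity_time(cubes, "MEAN")
#     # Raises ValueError if the cubes share no validity times.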


# TODO
# Collapse function that calculates means, medians, etc. across members of an
# ensemble or stratified groups. Needs to allow collapse over the realisation
# dimension for a fixed time, hence will require reading in a CubeList.