# © Crown copyright, Met Office (2022-2025) and CSET contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15"""Operators to perform various kind of collapse on either 1 or 2 dimensions."""

import datetime
import logging
import warnings

import iris
import iris.analysis
import iris.coord_categorisation
import iris.coords
import iris.cube
import iris.exceptions
import iris.util
import numpy as np

from CSET._common import iter_maybe
from CSET.operators.aggregate import add_hour_coordinate


def collapse(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    coordinate: str | list[str],
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
41 """Collapse coordinate(s) of a single cube or of every cube in a cube list.
43 Collapses similar fields in each cube into a cube collapsing around the
44 specified coordinate(s) and method. This could be a (weighted) mean or
45 percentile.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube to collapse, or CubeList in which each cube is collapsed
        separately over the given dimension(s).
    coordinate: str | list[str]
        Coordinate(s) to collapse over, e.g. 'time', 'longitude', 'latitude',
        'model_level_number', 'realization'. A list of multiple coordinates
        can be given.
    method: str
        Type of collapse, e.g. 'MEAN', 'MAX', 'MIN', 'MEDIAN', or
        'PERCENTILE'; getattr maps the name onto the corresponding
        iris.analysis aggregator, e.g. iris.analysis.MEAN. For 'PERCENTILE'
        the recipe YAML must also supply additional_percent, i.e. method:
        'PERCENTILE', additional_percent: 90.
    additional_percent: float, optional
        Required for the PERCENTILE method. This is a number between 0 and 100.

    Returns
    -------
    collapsed_cubes: iris.cube.Cube | iris.cube.CubeList
        Collapsed cube(s) of the single variable aggregated by the chosen
        method.

    Raises
    ------
    ValueError
        If additional_percent is not supplied when using the PERCENTILE
        method.
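
    Examples
    --------
    A minimal usage sketch, assuming ``cube`` is an iris Cube with "time" and
    "realization" coordinates (illustrative names, not part of this module)::

        time_mean = collapse(cube, "time", "MEAN")
        p90 = collapse(
            cube, ["time", "realization"], "PERCENTILE", additional_percent=90
        )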
71 """
72 if method == "SEQ" or method == "" or method is None: 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true
73 return cubes
74 if method == "PERCENTILE" and additional_percent is None:
75 raise ValueError("Must specify additional_percent")
77 # Retain only common time points between different models if multiple model inputs.
78 if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
79 logging.debug(
80 "Extracting common time points as multiple model inputs detected."
81 )
82 for cube in cubes:
83 cube.coord("forecast_reference_time").bounds = None
84 cube.coord("forecast_period").bounds = None
85 cubes = cubes.extract_overlapping(
86 ["forecast_reference_time", "forecast_period"]
87 )
88 if len(cubes) == 0:
89 raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", "Cannot check if coordinate is contiguous", UserWarning
        )
        warnings.filterwarnings(
            "ignore", "Collapsing spatial coordinate.+without weighting", UserWarning
        )
        for cube in iter_maybe(cubes):
            if method == "PERCENTILE":
                collapsed_cubes.append(
                    cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        percent=additional_percent,
                    )
                )
            elif method == "RANGE":
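                # RANGE is not an iris.analysis aggregator, so derive it as
                # the difference between the MAX and MIN collapses.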
                cube_max = cube.collapsed(coordinate, iris.analysis.MAX)
                cube_min = cube.collapsed(coordinate, iris.analysis.MIN)
                collapsed_cubes.append(cube_max - cube_min)
            else:
                collapsed_cubes.append(
                    cube.collapsed(coordinate, getattr(iris.analysis, method))
                )
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


def collapse_by_hour_of_day(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
128 """Collapse a cube by hour of the day.
130 Collapses a cube by hour of the day in the time coordinates provided by the
131 model. It is useful for creating diurnal cycle plots. It aggregates all 00
132 UTC together regardless of lead time.
134 Arguments
135 ---------
136 cubes: iris.cube.Cube | iris.cube.CubeList
137 Cube to collapse and iterate over one dimension or CubeList to convert
138 to a cube and then collapse prior to aggregating by hour. If a CubeList
139 is provided each cube is handled separately.
140 method: str
141 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
142 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified.
144 Returns
145 -------
146 cube: iris.cube.Cube
147 Single variable but several methods of aggregation.
149 Raises
150 ------
151 ValueError
152 If additional_percent wasn't supplied while using PERCENTILE method.

    Notes
    -----
    Collapsing of the cube is around the 'time' coordinate. The coordinates
    are first grouped by the hour of day, and then aggregated by the hour of
    day to create a diurnal cycle. This operator is applicable to both single
    forecasts and multiple forecasts. The hour used is based on the units of
    the time coordinate: if the time coordinate is in UTC, hour will be in
    UTC.

    To apply this operator successfully there must only be one time
    dimension. Should a MultiDim exception be raised, the user first needs to
    apply the collapse operator to reduce the time dimensions before applying
    this operator. A cube containing the two time dimensions
    'forecast_reference_time' and 'forecast_period' will be automatically
    collapsed by lead time before being collapsed by hour of day.
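
    Examples
    --------
    A minimal usage sketch, assuming ``cube`` is an iris Cube with hourly
    output and a single time dimension (illustrative, not part of this
    module)::

        diurnal_mean = collapse_by_hour_of_day(cube, "MEAN")
        diurnal_p95 = collapse_by_hour_of_day(
            cube, "PERCENTILE", additional_percent=95
        )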
168 """
169 if method == "PERCENTILE" and additional_percent is None:
170 raise ValueError("Must specify additional_percent")
172 # Retain only common time points between different models if multiple model inputs.
173 if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
174 logging.debug(
175 "Extracting common time points as multiple model inputs detected."
176 )
177 for cube in cubes:
178 cube.coord("forecast_reference_time").bounds = None
179 cube.coord("forecast_period").bounds = None
180 cubes = cubes.extract_overlapping(
181 ["forecast_reference_time", "forecast_period"]
182 )
183 if len(cubes) == 0:
184 raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Ensure hour coordinate in each input is sorted, and data adjusted if needed.
        sorted_cube = iris.cube.CubeList()
        for fcst_slice in cube.slices_over(["forecast_reference_time"]):
            # Categorise the time coordinate by hour of the day.
            fcst_slice = add_hour_coordinate(fcst_slice)
            if method == "PERCENTILE":
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method), percent=additional_percent
                )
            else:
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method)
                )
            # Compute if data needs sorting to lie in increasing order [0..23].
            # Note multiple forecasts can sit in same cube spanning different
            # initialisation times and data ranges.
            time_points = by_hour.coord("hour").points
            time_points_sorted = np.sort(by_hour.coord("hour").points)
            if time_points[0] != time_points_sorted[0]:
                nroll = int(time_points[0] / (time_points[1] - time_points[0]))
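                # np.roll requires an integer shift; e.g. 3-hourly points
                # starting at 06 UTC, [6, 9, ..., 0, 3], give nroll = 2,
                # moving the trailing [0, 3] back to the front.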
                # Shift hour coordinate and data cube to be in time of day order.
                by_hour.coord("hour").points = np.roll(time_points, nroll, 0)
                by_hour.data = np.roll(by_hour.data, nroll, axis=0)

            # Remove unnecessary time coordinate.
            # "hour" and "forecast_period" remain as AuxCoord.
            by_hour.remove_coord("time")

            sorted_cube.append(by_hour)

        # Recombine cube slices.
        cube = sorted_cube.merge_cube()

        if cube.coords("forecast_reference_time", dim_coords=True):
            # Collapse by forecast reference time to get a single cube.
            cube = collapse(
                cube,
                "forecast_reference_time",
                method,
                additional_percent=additional_percent,
            )
        else:
            # Or remove forecast reference time if a single case, as collapse
            # will have effectively done this.
            cube.remove_coord("forecast_reference_time")

        # Promote "hour" to dim_coord.
        iris.util.promote_aux_coord_to_dim_coord(cube, "hour")
        collapsed_cubes.append(cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


def collapse_by_validity_time(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
250 """Collapse a cube around validity time for multiple cases.
252 First checks if the data can be aggregated easily. Then creates a new cube
253 by slicing over the time dimensions, removing the time dimensions,
254 re-merging the data, and creating a new time coordinate. It then collapses
255 by the new time coordinate for a specified method using the collapse
256 function.
258 Arguments
259 ---------
260 cubes: iris.cube.Cube | iris.cube.CubeList
261 Cube to collapse by validity time or CubeList that will be converted
262 to a cube before collapsing by validity time.
263 method: str
264 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN',
265 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified.
267 Returns
268 -------
269 cube: iris.cube.Cube | iris.cube.CubeList
270 Single variable collapsed by lead time based on chosen method.
272 Raises
273 ------
274 ValueError
275 If additional_percent wasn't supplied while using PERCENTILE method.
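
    Examples
    --------
    A minimal usage sketch, assuming ``cubes`` is a CubeList of cases with
    overlapping validity times (illustrative, not part of this module)::

        validity_mean = collapse_by_validity_time(cubes, "MEAN")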
276 """
277 if method == "PERCENTILE" and additional_percent is None:
278 raise ValueError("Must specify additional_percent")
280 collapsed_cubes = iris.cube.CubeList([])
281 for cube in iter_maybe(cubes):
282 # Slice over cube by both time dimensions to create a CubeList.
283 new_cubelist = iris.cube.CubeList(
284 cube.slices_over(["forecast_period", "forecast_reference_time"])
285 )
        for sub_cube in new_cubelist:
            # Reconstruct the time coordinate if it is missing.
            if "time" not in [coord.name() for coord in sub_cube.coords()]:
                ref_time_coord = sub_cube.coord("forecast_reference_time")
                ref_units = ref_time_coord.units
                ref_time = ref_units.num2date(ref_time_coord.points)
                period_coord = sub_cube.coord("forecast_period")
                period_units = period_coord.units
                # Given how we are slicing there will only be one point.
                period_seconds = period_units.convert(period_coord.points[0], "seconds")
                period_duration = datetime.timedelta(seconds=period_seconds)
                time = ref_time + period_duration
                time_points = ref_units.date2num(time)
                time_coord = iris.coords.AuxCoord(
                    points=time_points, standard_name="time", units=ref_units
                )
                sub_cube.add_aux_coord(time_coord)
            # Remove forecast_period and forecast_reference_time coordinates.
            sub_cube.remove_coord("forecast_period")
            sub_cube.remove_coord("forecast_reference_time")
        # Create new CubeList by merging with unique = False to produce a
        # validity time cube.
        merged_list_1 = new_cubelist.merge(unique=False)
        # Create a new "fake" coordinate and apply to each remaining cube to
        # allow final merging to take place into a single cube.
        equalised_validity_time = iris.coords.AuxCoord(
            points=0, long_name="equalised_validity_time", units="1"
        )
        for sub_cube, eq_valid_time in zip(
            merged_list_1, range(len(merged_list_1)), strict=True
        ):
            sub_cube.add_aux_coord(equalised_validity_time.copy(points=eq_valid_time))
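        # For example, three partially merged cubes get tagged 0, 1 and 2,
        # giving the final merge a coordinate to combine along even though
        # their validity times differ.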

        # Merge CubeList to create final cube.
        final_cube = merged_list_1.merge_cube()
        logging.debug("Pre-collapse validity time cube:\n%s", final_cube)

        # Collapse over equalised_validity_time as a proxy for equal validity
        # time.
        try:
            collapsed_cube = collapse(
                final_cube,
                "equalised_validity_time",
                method,
                additional_percent=additional_percent,
            )
        except iris.exceptions.CoordinateCollapseError as err:
            raise ValueError(
                "Cubes do not overlap therefore cannot collapse across validity time."
            ) from err
        collapsed_cube.remove_coord("equalised_validity_time")
        collapsed_cubes.append(collapsed_cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


# TODO
# Collapse function that calculates means, medians, etc. across members of an
# ensemble or stratified groups. Needs to allow collapse over the realization
# dimension for a fixed time, and hence will require reading in a CubeList.
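# A possible shape for such an operator (an illustrative sketch only, not an
# implemented API; it reuses the existing collapse function over the
# "realization" coordinate):
#
#     def collapse_over_realization(cubes, method, additional_percent=None):
#         return collapse(
#             cubes, "realization", method, additional_percent=additional_percent
#         )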