Coverage for src / CSET / operators / aggregate.py: 61%
120 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 15:02 +0000
1# © Crown copyright, Met Office (2022-2024) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to aggregate across either 1 or 2 dimensions."""
17import logging
19import iris
20import iris.analysis
21import iris.coord_categorisation
22import iris.cube
23import iris.exceptions
24import iris.util
25import isodate
26import numpy as np
28from CSET._common import iter_maybe
29from CSET.operators._utils import (
30 identify_unique_times,
31 is_time_aggregatable,
32 is_time_aux_coord,
33 remove_cell_method,
34 remove_duplicates,
35)
def _add_nref(cube: iris.cube.Cube):
    """Record how many forecast_reference_time inputs contributed to a cube.

    Stores the count as a ``number_reference_times`` attribute on the time
    coordinate, so the case count survives later collapse operations that
    would otherwise discard the forecast_reference_time coordinate.
    """
    reference_points = cube.coord("forecast_reference_time").points
    cube.coord("time").attributes["number_reference_times"] = np.size(reference_points)
    return cube
def time_aggregate(
    cube: iris.cube.Cube,
    method: str,
    interval_iso: str,
    **kwargs,
) -> iris.cube.Cube:
    """Aggregate cube by its time coordinate.

    Aggregates similar (stash) fields in a cube over its time coordinate
    using the supplied method, grouping time points into fixed-length
    intervals. An example use is generating hourly or 6-hourly precipitation
    accumulations from a finer-resolution time series.

    The isodate library converts the ISO 8601 duration string into a time
    interval, and a lambda passed to the iris categorisation machinery bins
    each time point into its interval, allowing users to define their own
    sub-daily intervals for the new time coordinate.

    Arguments
    ---------
    cube: iris.cube.Cube
        Cube to aggregate, iterated over its time dimension.
    method: str
        Name of an iris.analysis aggregator, e.g. 'SUM'; resolved via
        getattr to iris.analysis.SUM, etc.
    interval_iso: str
        ISO 8601 duration, e.g. PT6H (6 hours) or PT30M (30 mins), giving
        the interval to aggregate over.

    Returns
    -------
    cube: iris.cube.Cube
        Single variable aggregated over the requested intervals.

    Raises
    ------
    ValueError
        If the constraint doesn't produce a single cube containing a field.
    """
    # Parse the ISO 8601 duration and express it as whole hours.
    duration = isodate.parse_duration(interval_iso)
    hours = int(duration.total_seconds() / 3600)

    # Attach a temporary categorisation coordinate that floors each time
    # point to the start of its interval.
    # https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html
    iris.coord_categorisation.add_categorised_coord(
        cube, "interval", "time", lambda coord, cell: cell // hours * hours
    )

    # Aggregate within each interval, then drop the helper coordinate.
    aggregator = getattr(iris.analysis, method)
    result = cube.aggregated_by("interval", aggregator)
    result.remove_coord("interval")
    return result
def ensure_aggregatable_across_cases(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    time_coord_name: str,
) -> iris.cube.CubeList:
    """Ensure a Cube or CubeList can be aggregated across multiple cases.

    The cubes are grouped into buckets of compatible cubes. Each bucket is then
    processed to create aggregatable cubes. The function handles two types of
    time coordinates:

    - Dimension coordinates: Cubes with ``forecast_period`` and
      ``forecast_reference_time`` as dimension coordinates are sliced and merged.
    - Auxiliary coordinates: Cubes with time as an auxiliary coordinate are
      aggregated at each unique time point and then merged.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Each cube is checked to determine if it has the necessary time
        coordinates (either as dimension or auxiliary coordinates) to be
        aggregatable, being processed if needed.
    time_coord_name: str
        Name of the time coordinate to aggregate over, typically "time" or
        "forecast_period".

    Returns
    -------
    cubes: iris.cube.CubeList
        A CubeList of time aggregatable cubes. Each cube will have a
        ``number_reference_times`` attribute on its time coordinate indicating
        the number of forecast reference times aggregated.

    Raises
    ------
    ValueError
        If any of the provided cubes cannot be made aggregatable (i.e., missing
        required time coordinates).

    Notes
    -----
    This operator is designed to ensure that cubes can be aggregated across
    multiple cases (e.g., different model runs or forecast times). Its
    functionality is particularly useful for case study or trial aggregation
    when computing statistics such as percentiles, Q-Q plots, and histograms.

    For cubes to be aggregatable, they must have either:

    - ``forecast_period`` and ``forecast_reference_time`` as dimension
      or auxiliary coordinates
    """

    # Group compatible cubes, so incompatible cubes (e.g. different
    # variables) are processed and merged separately.
    class Buckets:
        def __init__(self):
            # Each bucket is a CubeList of mutually compatible cubes.
            self.buckets = []

        def add(self, cube: iris.cube.Cube):
            """Add a cube into a bucket.

            If the cube is compatible with an existing bucket it is added there.
            Otherwise it gets its own bucket.
            """
            for bucket in self.buckets:
                # Compatibility is judged against the bucket's first cube.
                if bucket[0].is_compatible(cube):
                    bucket.append(cube)
                    return
            self.buckets.append(iris.cube.CubeList([cube]))

        def get_buckets(self) -> list[iris.cube.CubeList]:
            """Return all buckets accumulated so far."""
            return self.buckets

    b = Buckets()
    for cube in iter_maybe(cubes):
        b.add(cube)
    buckets = b.get_buckets()

    logging.debug("Buckets:\n%s", "\n---\n".join(str(b) for b in buckets))

    # Ensure each bucket is a single aggregatable cube.
    aggregatable_cubes = iris.cube.CubeList()
    for bucket in buckets:
        # Single cubes that are already aggregatable won't need processing.
        if len(bucket) == 1 and is_time_aggregatable(bucket[0]):
            aggregatable_cube = bucket[0]
            aggregatable_cube = _add_nref(aggregatable_cube)
            aggregatable_cubes.append(aggregatable_cube)
            continue

        # Check if all cubes in bucket are time_aggregatable.
        if all(is_time_aggregatable(cube) for cube in bucket):
            # Slice each cube into single forecasts, then merge the slices
            # back into one cube spanning all cases in the bucket.
            to_merge = iris.cube.CubeList()
            for cube in bucket:
                try:
                    to_merge.extend(
                        cube.slices_over(["forecast_period", "forecast_reference_time"])
                    )
                except iris.exceptions.CoordinateNotFoundError as err:
                    raise ValueError(
                        "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
                        cube,
                    ) from err
            aggregatable_cube = to_merge.merge_cube()

            # Add attribute on number of forecast_reference_times
            aggregatable_cube = _add_nref(aggregatable_cube)

            aggregatable_cubes.append(aggregatable_cube)

        elif all(is_time_aux_coord(cube) for cube in bucket):
            # Time is only an auxiliary coordinate: aggregate (MEAN) the data
            # at each unique time point, then merge the per-time results.
            times = identify_unique_times(bucket, time_coord_name)

            aggregated_list = []
            for time in times:
                for cube in bucket:
                    aggregated_list.extend(
                        _aggregate_without_time_dimcoords(
                            cube, time, iris.analysis.MEAN, None
                        )
                    )

            cubes_to_merge = iris.cube.CubeList(aggregated_list)

            aggregatable_cube = cubes_to_merge.merge_cube()
            aggregatable_cube = _add_nref(aggregatable_cube)
            aggregatable_cubes.append(aggregatable_cube)

        else:
            raise ValueError(
                "Cube is missing required dimension or auxiliary time coordinates",
                bucket,
            )

    return aggregatable_cubes
def add_hour_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of hour of day to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.
        Note input Cube or CubeList items should only have 1 time dimension.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of hour.

    Notes
    -----
    This is a simple operator designed to be used prior to case aggregation for
    histograms, Q-Q plots, and percentiles when aggregated by hour of day.
    """
    with_hour = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # Hour categorisation needs time as a dimension coordinate.
        iris.util.promote_aux_coord_to_dim_coord(cube, "time")
        # Derive the hour-of-day category coordinate from the time points.
        iris.coord_categorisation.add_hour(cube, "time", name="hour")
        cube.coord("hour").units = "hours"
        with_hour.append(cube)

    # Unwrap a lone cube; otherwise hand back the full list.
    return with_hour[0] if len(with_hour) == 1 else with_hour
def rolling_window_time_aggregation(
    cubes: iris.cube.Cube | iris.cube.CubeList, method: str, window: int
) -> iris.cube.Cube | iris.cube.CubeList:
    """Aggregate a cube along the time dimension using a rolling window.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube or Cubelist of any variable to be aggregated over a rolling window
        in time.
    method: str
        Name of an iris.analysis aggregator, e.g. 'MAX'; resolved via getattr
        to iris.analysis.MAX, etc.
    window: int
        The rolling window size.

    Returns
    -------
    cube: iris.cube.Cube | iris.cube.CubeList
        A Cube or Cubelist of the rolling window aggregate. The Cubes will have
        a time dimension that is reduced in size to the original cube by the
        window size.

    Notes
    -----
    This operator is designed to be used to help create daily maxima and minima
    for any variable.
    """
    # Resolve the aggregation method once, then apply the rolling window in
    # time to every input cube.
    aggregator = getattr(iris.analysis, method)
    windowed = iris.cube.CubeList(
        cube.rolling_window("time", aggregator, window) for cube in iter_maybe(cubes)
    )

    # Unwrap a lone cube; otherwise hand back the full list.
    return windowed[0] if len(windowed) == 1 else windowed
def _aggregate_without_time_dimcoords(cubes, time_coord, aggregator, percentile):
    """Aggregate cubes at a specific time without time dimension coordinates.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cubes to aggregate.
    time_coord: iris.coords.Coord
        Single time coordinate point to aggregate at.
    aggregator: iris.analysis.Aggregator
        Aggregation method (e.g., iris.analysis.MEAN).
    percentile: float, optional
        Percentile value if using PERCENTILE aggregator.

    Returns
    -------
    iris.cube.CubeList
        Aggregated cubes, one per mergeable group found at the given time.
        Each carries a scalar ``num_cases`` auxiliary coordinate holding the
        number of data points that were aggregated together.

    Raises
    ------
    ValueError
        If ``time_coord`` contains more than one time point.
    """
    # Handle single cube input
    if isinstance(cubes, iris.cube.Cube):
        cubes = iris.cube.CubeList([cubes])

    # Check the supplied time coordinate to make sure it corresponds to a
    # single time only
    if len(time_coord.points) != 1:
        raise ValueError("Time coordinate should specify a single time only")

    # Remove any duplicate cubes in the input
    cubes = remove_duplicates(cubes)

    time_coord_name = time_coord.name()

    # Keep only the data matching the requested time point.
    time_constraint = iris.Constraint(
        coord_values={time_coord_name: lambda cell: cell.point in time_coord.cells()}
    )
    cubes_at_time = cubes.extract(time_constraint)

    # Add a temporary "number" coordinate to uniquely label the different
    # data points at this time
    number = 0
    numbered_cubes = iris.cube.CubeList()
    for cube in cubes_at_time:
        for slc in cube.slices_over(time_coord_name):
            number_coord = iris.coords.AuxCoord(number, long_name="number")
            slc.add_aux_coord(number_coord)
            numbered_cubes.append(slc)
            number += 1
    cubes_at_time = numbered_cubes

    # Merging gathers the numbered slices into cubes with a "number" dimension.
    cubes_at_time = cubes_at_time.merge()

    aggregated_cubes = iris.cube.CubeList()
    for cube in cubes_at_time:
        # If there was only a single data point at this time, then "number"
        # will be a scalar coordinate. If so, make it a dimension coordinate
        # to allow collapsing
        if not cube.coord_dims("number"):
            cube = iris.util.new_axis(cube, scalar_coord="number")

        # Record how many cases contribute to this aggregate.
        num_cases = cube.coord("number").points.size
        num_cases_coord = iris.coords.AuxCoord(num_cases, long_name="num_cases")
        cube.add_aux_coord(num_cases_coord)

        # Do aggregation across the temporary "number" coordinate
        if isinstance(aggregator, type(iris.analysis.PERCENTILE)):
            # Percentile-style aggregators require the percent keyword.
            cube = cube.collapsed("number", aggregator, percent=percentile)
        else:
            cube = cube.collapsed("number", aggregator)

        # Now remove the "number" coordinate and its cell method, as they are
        # implementation details that should not leak to callers.
        cube.remove_coord("number")
        cell_method = iris.coords.CellMethod(aggregator.name(), coords="number")
        cube = remove_cell_method(cube, cell_method)

        aggregated_cubes.append(cube)

    return aggregated_cubes