Coverage for src / CSET / operators / aggregate.py: 98%
76 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:46 +0000
1# © Crown copyright, Met Office (2022-2024) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to aggregate across either 1 or 2 dimensions."""
17import logging
19import iris
20import iris.analysis
21import iris.coord_categorisation
22import iris.cube
23import iris.exceptions
24import iris.util
25import isodate
26import numpy as np
28from CSET._common import iter_maybe
29from CSET.operators._utils import is_time_aggregatable
32def _add_nref(cube: iris.cube.Cube):
33 """Retain information on number of forecast_reference_time inputs.
35 This preserves information on number of aggregated cases that can
36 otherwise be lost on subsequent calls to collapse functions.
37 """
38 nref = np.size(cube.coord("forecast_reference_time").points)
39 cube.coord("time").attributes["number_reference_times"] = nref
40 return cube
def time_aggregate(
    cube: iris.cube.Cube,
    method: str,
    interval_iso: str,
    **kwargs,
) -> iris.cube.Cube:
    """Aggregate cube by its time coordinate.

    Aggregates similar (stash) fields in a cube over its time coordinate
    using the method supplied. The aggregated cube will keep the coordinate
    and add a further coordinate with the aggregated end time points.

    Examples are: 1. Generating hourly or 6-hourly precipitation accumulations
    given an interval for the new time coordinate.

    We use the isodate class to convert ISO 8601 durations into time intervals
    for creating a new time coordinate for aggregation.

    We use the lambda function to pass coord and interval into the callable
    category function in add_categorised to allow users to define their own
    intervals for the new time coordinate.

    Arguments
    ---------
    cube: iris.cube.Cube
        Cube to aggregate and iterate over one dimension
    method: str
        Type of aggregate i.e. method: 'SUM', getattr creates
        iris.analysis.SUM, etc.
    interval_iso: str
        ISO 8601 duration, i.e. PT6H (6 hours), P1D (1 day).
        Interval to aggregate over. Must be at least one hour, as the
        categorisation below works in whole hours.

    Returns
    -------
    cube: iris.cube.Cube
        Single variable but several methods of aggregation

    Raises
    ------
    ValueError
        If the interval is shorter than one hour, which would otherwise
        truncate to zero and cause a ZeroDivisionError in the time
        categorisation.
    """
    # Duration of ISO timedelta.
    timedelta = isodate.parse_duration(interval_iso)

    # Convert interval format to whole hours. Sub-hour durations truncate to
    # zero, which would make the `cell // interval` categorisation below
    # divide by zero, so reject them with a clear error.
    interval = int(timedelta.total_seconds() / 3600)
    if interval < 1:
        raise ValueError(
            f"interval_iso must be at least one hour, got: {interval_iso}"
        )

    # Add time categorisation overwriting hourly increment via lambda coord.
    # https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html
    iris.coord_categorisation.add_categorised_coord(
        cube, "interval", "time", lambda coord, cell: cell // interval * interval
    )

    # Aggregate cube using supplied method.
    aggregated_cube = cube.aggregated_by("interval", getattr(iris.analysis, method))
    aggregated_cube.remove_coord("interval")
    return aggregated_cube
def ensure_aggregatable_across_cases(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.CubeList:
    """Ensure a Cube or CubeList can be aggregated across multiple cases.

    The cubes are grouped into buckets of compatible cubes, then each bucket is
    converted into a single aggregatable cube with ``forecast_period`` and
    ``forecast_reference_time`` dimension coordinates.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Each cube is checked to determine if it has the necessary
        dimensional coordinates to be aggregatable, being processed if needed.

    Returns
    -------
    cubes: iris.cube.CubeList
        A CubeList of time aggregatable cubes.

    Raises
    ------
    ValueError
        If any of the provided cubes cannot be made aggregatable.

    Notes
    -----
    This is a simple operator designed to ensure that a Cube is aggregatable
    across cases. If a CubeList is presented it will create an aggregatable Cube
    from that list. Its functionality is for case study (or trial) aggregation
    to ensure that the full dataset can be loaded as a single cube. This
    functionality is particularly useful for percentiles, Q-Q plots, and
    histograms.

    The necessary dimension coordinates for a cube to be aggregatable are
    ``forecast_period`` and ``forecast_reference_time``.
    """

    # Group compatible cubes.
    class Buckets:
        """Accumulates cubes into buckets of mutually compatible cubes."""

        def __init__(self):
            self.buckets = []

        def add(self, cube: iris.cube.Cube):
            """Add a cube into a bucket.

            If the cube is compatible with an existing bucket it is added there.
            Otherwise it gets its own bucket.
            """
            for bucket in self.buckets:
                if bucket[0].is_compatible(cube):
                    bucket.append(cube)
                    return
            self.buckets.append(iris.cube.CubeList([cube]))

        def get_buckets(self) -> list[iris.cube.CubeList]:
            """Return the accumulated buckets."""
            return self.buckets

    b = Buckets()
    for cube in iter_maybe(cubes):
        b.add(cube)
    buckets = b.get_buckets()
    logging.debug("Buckets:\n%s", "\n---\n".join(str(b) for b in buckets))

    # Ensure each bucket is a single aggregatable cube.
    aggregatable_cubes = iris.cube.CubeList()
    for bucket in buckets:
        # Single cubes that are already aggregatable won't need processing.
        if len(bucket) == 1 and is_time_aggregatable(bucket[0]):
            aggregatable_cube = bucket[0]
            aggregatable_cube = _add_nref(aggregatable_cube)
            aggregatable_cubes.append(aggregatable_cube)
            continue

        # Create an aggregatable cube from the provided CubeList by slicing
        # each cube into single forecasts and merging them back together.
        to_merge = iris.cube.CubeList()
        for cube in bucket:
            try:
                to_merge.extend(
                    cube.slices_over(["forecast_period", "forecast_reference_time"])
                )
            except iris.exceptions.CoordinateNotFoundError as err:
                raise ValueError(
                    "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
                    cube,
                ) from err
        aggregatable_cube = to_merge.merge_cube()

        # Add attribute on number of forecast_reference_times
        aggregatable_cube = _add_nref(aggregatable_cube)

        # Verify cube is now aggregatable.
        if not is_time_aggregatable(aggregatable_cube):
            raise ValueError(
                "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
                aggregatable_cube,
            )
        aggregatable_cubes.append(aggregatable_cube)
    return aggregatable_cubes
def add_hour_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of hour of day to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.
        Note input Cube or CubeList items should only have 1 time dimension.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of hour.

    Notes
    -----
    This is a simple operator designed to be used prior to case aggregation for
    histograms, Q-Q plots, and percentiles when aggregated by hour of day.
    """
    processed = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # The hour categorisation needs time as a dimension coordinate.
        iris.util.promote_aux_coord_to_dim_coord(cube, "time")
        # Attach an "hour" category coordinate derived from time.
        iris.coord_categorisation.add_hour(cube, "time", name="hour")
        cube.coord("hour").units = "hours"
        processed.append(cube)
    # A single input cube is returned bare rather than wrapped in a list.
    return processed[0] if len(processed) == 1 else processed
def rolling_window_time_aggregation(
    cubes: iris.cube.Cube | iris.cube.CubeList, method: str, window: int
) -> iris.cube.Cube | iris.cube.CubeList:
    """Aggregate a cube along the time dimension using a rolling window.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube or Cubelist of any variable to be aggregated over a rolling window
        in time.
    method: str
        Type of aggregate i.e. method: 'MAX', getattr creates
        iris.analysis.MAX, etc.
    window: int
        The rolling window size.

    Returns
    -------
    cube: iris.cube.Cube | iris.cube.CubeList
        A Cube or Cubelist of the rolling window aggregate. The Cubes will have
        a time dimension that is reduced in size to the original cube by the
        window size.

    Notes
    -----
    This operator is designed to be used to help create daily maxima and minima
    for any variable.
    """
    results = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # Slide the named aggregation method along the time dimension with
        # the requested window length.
        results.append(
            cube.rolling_window("time", getattr(iris.analysis, method), window)
        )
    # A single input cube is returned bare rather than wrapped in a list.
    return results[0] if len(results) == 1 else results