Coverage for src/CSET/operators/aggregate.py: 100%
56 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 21:08 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 21:08 +0000
1# © Crown copyright, Met Office (2022-2024) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to aggregate across either 1 or 2 dimensions."""
17import logging
19import iris
20import iris.analysis
21import iris.coord_categorisation
22import iris.cube
23import isodate
25from CSET._common import iter_maybe
26from CSET.operators._utils import is_time_aggregatable
def time_aggregate(
    cube: iris.cube.Cube,
    method: str,
    interval_iso: str,
    **kwargs,
) -> iris.cube.Cube:
    """Aggregate cube by its time coordinate.

    Aggregates similar (stash) fields in a cube for the specified coordinate and
    using the method supplied. The aggregated cube will keep the coordinate and
    add a further coordinate with the aggregated end time points.

    Examples are: 1. Generating hourly or 6-hourly precipitation accumulations
    given an interval for the new time coordinate.

    We use the isodate class to convert ISO 8601 durations into time intervals
    for creating a new time coordinate for aggregation.

    We use the lambda function to pass coord and interval into the callable
    category function in add_categorised to allow users to define their own
    sub-daily intervals for the new time coordinate.

    Arguments
    ---------
    cube: iris.cube.Cube
        Cube to aggregate and iterate over one dimension
    method: str
        Type of aggregate i.e. method: 'SUM', getattr creates
        iris.analysis.SUM, etc.
    interval_iso: isodate timedelta ISO 8601 object i.e PT6H (6 hours), PT30M (30 mins)
        Interval to aggregate over.

    Returns
    -------
    cube: iris.cube.Cube
        Single variable but several methods of aggregation

    Raises
    ------
    ValueError
        If the requested aggregation interval has zero duration.
    """
    # Duration of ISO timedelta.
    timedelta = isodate.parse_duration(interval_iso)

    # Convert the interval to hours. Keep an exact int for whole-hour
    # durations (preserving the original behaviour and category dtype), but
    # fall back to a float for sub-hourly intervals such as PT30M, which
    # previously truncated to zero hours and raised ZeroDivisionError below.
    interval = timedelta.total_seconds() / 3600
    if interval == 0:
        raise ValueError(f"Aggregation interval must be non-zero: {interval_iso}")
    if interval.is_integer():
        interval = int(interval)

    # Add time categorisation overwriting hourly increment via lambda coord.
    # https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html
    iris.coord_categorisation.add_categorised_coord(
        cube, "interval", "time", lambda coord, cell: cell // interval * interval
    )

    # Aggregate cube using supplied method.
    aggregated_cube = cube.aggregated_by("interval", getattr(iris.analysis, method))
    aggregated_cube.remove_coord("interval")
    return aggregated_cube
def ensure_aggregatable_across_cases(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.CubeList:
    """Ensure a Cube or CubeList can be aggregated across multiple cases.

    The cubes are grouped into buckets of compatible cubes, then each bucket is
    converted into a single aggregatable cube with ``forecast_period`` and
    ``forecast_reference_time`` dimension coordinates.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Each cube is checked to determine if it has the the necessary
        dimensional coordinates to be aggregatable, being processed if needed.

    Returns
    -------
    cubes: iris.cube.CubeList
        A CubeList of time aggregatable cubes.

    Raises
    ------
    ValueError
        If any of the provided cubes cannot be made aggregatable.

    Notes
    -----
    This is a simple operator designed to ensure that a Cube is aggregatable
    across cases. If a CubeList is presented it will create an aggregatable Cube
    from that list. Its functionality is for case study (or trial) aggregation
    to ensure that the full dataset can be loaded as a single cube. This
    functionality is particularly useful for percentiles, Q-Q plots, and
    histograms.

    The necessary dimension coordinates for a cube to be aggregatable are
    ``forecast_period`` and ``forecast_reference_time``.
    """
    # Partition the input cubes into buckets of mutually compatible cubes.
    # A cube joins the first bucket whose representative (first) cube it is
    # compatible with; otherwise it starts a new bucket.
    buckets: list[iris.cube.CubeList] = []
    for cube in iter_maybe(cubes):
        for bucket in buckets:
            if bucket[0].is_compatible(cube):
                bucket.append(cube)
                break
        else:
            buckets.append(iris.cube.CubeList([cube]))

    logging.debug("Buckets:\n%s", "\n---\n".join(str(b) for b in buckets))

    # Collapse each bucket down to a single aggregatable cube.
    aggregatable_cubes = iris.cube.CubeList()
    for bucket in buckets:
        # A lone cube that is already aggregatable passes through untouched.
        if len(bucket) == 1 and is_time_aggregatable(bucket[0]):
            aggregatable_cubes.append(bucket[0])
            continue

        # Slice every cube over the case dimensions, then merge the slices
        # back into one cube spanning all cases in the bucket.
        slices = iris.cube.CubeList()
        for cube in bucket:
            slices.extend(
                cube.slices_over(["forecast_period", "forecast_reference_time"])
            )
        merged = slices.merge_cube()

        # The merge must have produced the required dimension coordinates.
        if not is_time_aggregatable(merged):
            raise ValueError(
                "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
                merged,
            )
        aggregatable_cubes.append(merged)

    return aggregatable_cubes
def add_hour_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of hour of day to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.
        Note input Cube or CubeList items should only have 1 time dimension.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of hour.

    Notes
    -----
    This is a simple operator designed to be used prior to case aggregation for
    histograms, Q-Q plots, and percentiles when aggregated by hour of day.
    """
    processed = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # Promote time to a dimension coordinate, then derive an "hour"
        # category coordinate from it (cubes are modified in place).
        iris.util.promote_aux_coord_to_dim_coord(cube, "time")
        iris.coord_categorisation.add_hour(cube, "time", name="hour")
        cube.coord("hour").units = "hours"
        processed.append(cube)

    # Unwrap a single cube; otherwise hand back the full list.
    return processed[0] if len(processed) == 1 else processed