Coverage for src / CSET / operators / aggregate.py: 61%

120 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 15:02 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to aggregate across either 1 or 2 dimensions.""" 

16 

17import logging 

18 

19import iris 

20import iris.analysis 

21import iris.coord_categorisation 

22import iris.cube 

23import iris.exceptions 

24import iris.util 

25import isodate 

26import numpy as np 

27 

28from CSET._common import iter_maybe 

29from CSET.operators._utils import ( 

30 identify_unique_times, 

31 is_time_aggregatable, 

32 is_time_aux_coord, 

33 remove_cell_method, 

34 remove_duplicates, 

35) 

36 

37 

def _add_nref(cube: iris.cube.Cube):
    """Record the number of forecast_reference_time inputs on the time coord.

    Subsequent collapse operations can discard the forecast_reference_time
    coordinate, losing how many cases were aggregated. Storing the count as
    a time coordinate attribute preserves that information.
    """
    reference_points = cube.coord("forecast_reference_time").points
    n_reference_times = np.size(reference_points)
    cube.coord("time").attributes["number_reference_times"] = n_reference_times
    return cube

47 

48 

def time_aggregate(
    cube: iris.cube.Cube,
    method: str,
    interval_iso: str,
    **kwargs,
) -> iris.cube.Cube:
    """Aggregate cube by its time coordinate.

    Aggregates similar (stash) fields in a cube over its time coordinate
    using the method supplied. The aggregated cube will keep the coordinate
    and add a further coordinate with the aggregated end time points.

    Examples are: 1. Generating hourly or 6-hourly precipitation accumulations
    given an interval for the new time coordinate.

    We use the isodate class to convert ISO 8601 durations into time intervals
    for creating a new time coordinate for aggregation.

    We use the lambda function to pass coord and interval into the callable
    category function in add_categorised to allow users to define their own
    sub-daily intervals for the new time coordinate.

    Arguments
    ---------
    cube: iris.cube.Cube
        Cube to aggregate and iterate over one dimension
    method: str
        Type of aggregate i.e. method: 'SUM', getattr creates
        iris.analysis.SUM, etc.
    interval_iso: isodate timedelta ISO 8601 object i.e PT6H (6 hours), PT30M (30 mins)
        Interval to aggregate over.

    Returns
    -------
    cube: iris.cube.Cube
        Single variable but several methods of aggregation

    Raises
    ------
    ValueError
        If interval_iso does not describe a positive duration.
    """
    # Duration of ISO timedelta.
    timedelta = isodate.parse_duration(interval_iso)

    # Convert interval format to hours. Previously this truncated to whole
    # hours with int(), so sub-hourly intervals such as PT30M produced an
    # interval of 0 and a ZeroDivisionError in the categorisation lambda,
    # and fractional intervals (e.g. PT90M) silently truncated. Keep an int
    # for whole-hour intervals (preserving integer coordinate points) and
    # fall back to a float otherwise.
    interval = timedelta.total_seconds() / 3600
    if interval <= 0:
        raise ValueError("interval_iso must describe a positive duration.")
    if interval.is_integer():
        interval = int(interval)

    # Add time categorisation overwriting hourly increment via lambda coord.
    # https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html
    # Each time point is floored down to the start of its interval bucket.
    iris.coord_categorisation.add_categorised_coord(
        cube, "interval", "time", lambda coord, cell: cell // interval * interval
    )

    # Aggregate cube using supplied method, then drop the temporary
    # categorisation coordinate.
    aggregated_cube = cube.aggregated_by("interval", getattr(iris.analysis, method))
    aggregated_cube.remove_coord("interval")
    return aggregated_cube

110 

111 

def ensure_aggregatable_across_cases(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    time_coord_name: str,
) -> iris.cube.CubeList:
    """Ensure a Cube or CubeList can be aggregated across multiple cases.

    The cubes are grouped into buckets of compatible cubes. Each bucket is then
    processed to create aggregatable cubes. The function handles two types of
    time coordinates:

    - Dimension coordinates: Cubes with ``forecast_period`` and
      ``forecast_reference_time`` as dimension coordinates are sliced and merged.
    - Auxiliary coordinates: Cubes with time as an auxiliary coordinate are
      aggregated at each unique time point and then merged.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Each cube is checked to determine if it has the necessary time
        coordinates (either as dimension or auxiliary coordinates) to be
        aggregatable, being processed if needed.
    time_coord_name: str
        Name of the time coordinate to aggregate over, typically "time" or
        "forecast_period".

    Returns
    -------
    cubes: iris.cube.CubeList
        A CubeList of time aggregatable cubes. Each cube will have a
        ``number_reference_times`` attribute on its time coordinate indicating
        the number of forecast reference times aggregated.

    Raises
    ------
    ValueError
        If any of the provided cubes cannot be made aggregatable (i.e., missing
        required time coordinates).

    Notes
    -----
    This operator is designed to ensure that cubes can be aggregated across
    multiple cases (e.g., different model runs or forecast times). Its
    functionality is particularly useful for case study or trial aggregation
    when computing statistics such as percentiles, Q-Q plots, and histograms.

    For cubes to be aggregatable, they must have either:

    - ``forecast_period`` and ``forecast_reference_time`` as dimension
      or auxiliary coordinates
    """

    # Group compatible cubes. Buckets partition the inputs so only cubes
    # that iris considers compatible (and therefore mergeable) are combined.
    class Buckets:
        def __init__(self):
            # List of CubeLists; each inner CubeList is one bucket of
            # mutually compatible cubes.
            self.buckets = []

        def add(self, cube: iris.cube.Cube):
            """Add a cube into a bucket.

            If the cube is compatible with an existing bucket it is added there.
            Otherwise it gets its own bucket.
            """
            for bucket in self.buckets:
                # Compatibility with the first member implies compatibility
                # with the whole bucket, as buckets only contain compatible
                # cubes.
                if bucket[0].is_compatible(cube):
                    bucket.append(cube)
                    return
            self.buckets.append(iris.cube.CubeList([cube]))

        def get_buckets(self) -> list[iris.cube.CubeList]:
            return self.buckets

    b = Buckets()
    for cube in iter_maybe(cubes):
        b.add(cube)
    buckets = b.get_buckets()

    logging.debug("Buckets:\n%s", "\n---\n".join(str(b) for b in buckets))

    # Ensure each bucket is a single aggregatable cube.
    aggregatable_cubes = iris.cube.CubeList()
    for bucket in buckets:
        # Single cubes that are already aggregatable won't need processing.
        if len(bucket) == 1 and is_time_aggregatable(bucket[0]):
            aggregatable_cube = bucket[0]
            aggregatable_cube = _add_nref(aggregatable_cube)
            aggregatable_cubes.append(aggregatable_cube)
            continue

        # Check if all cubes in bucket are time_aggregatable.
        if all(is_time_aggregatable(cube) for cube in bucket):
            # Slice each cube over its case dimensions so the resulting
            # 2D-time slices can be merged back into one aggregatable cube.
            to_merge = iris.cube.CubeList()
            for cube in bucket:
                try:
                    to_merge.extend(
                        cube.slices_over(["forecast_period", "forecast_reference_time"])
                    )
                except iris.exceptions.CoordinateNotFoundError as err:
                    raise ValueError(
                        "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
                        cube,
                    ) from err
            aggregatable_cube = to_merge.merge_cube()

            # Add attribute on number of forecast_reference_times.
            aggregatable_cube = _add_nref(aggregatable_cube)

            aggregatable_cubes.append(aggregatable_cube)

        elif all(is_time_aux_coord(cube) for cube in bucket):
            # Time is only an auxiliary coordinate: aggregate all data at
            # each unique time point (mean across cases) and merge the
            # per-time results into a single cube.
            times = identify_unique_times(bucket, time_coord_name)

            aggregated_list = []
            for time in times:
                for cube in bucket:
                    aggregated_list.extend(
                        _aggregate_without_time_dimcoords(
                            cube, time, iris.analysis.MEAN, None
                        )
                    )

            cubes_to_merge = iris.cube.CubeList(aggregated_list)

            aggregatable_cube = cubes_to_merge.merge_cube()
            aggregatable_cube = _add_nref(aggregatable_cube)
            aggregatable_cubes.append(aggregatable_cube)

        else:
            raise ValueError(
                "Cube is missing required dimension or auxiliary time coordinates",
                bucket,
            )

    return aggregatable_cubes

245 

246 

def add_hour_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of hour of day to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.
        Note input Cube or CubeList items should only have 1 time dimension.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of hour.

    Notes
    -----
    This is a simple operator designed to be used prior to case aggregation for
    histograms, Q-Q plots, and percentiles when aggregated by hour of day.
    """
    processed = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # The hour categorisation needs time as a dimension coordinate.
        iris.util.promote_aux_coord_to_dim_coord(cube, "time")
        # Derive an "hour" category coordinate from the time coordinate.
        iris.coord_categorisation.add_hour(cube, "time", name="hour")
        cube.coord("hour").units = "hours"
        processed.append(cube)

    # Mirror the input type: a lone cube comes back unwrapped.
    return processed[0] if len(processed) == 1 else processed

280 

281 

def rolling_window_time_aggregation(
    cubes: iris.cube.Cube | iris.cube.CubeList, method: str, window: int
) -> iris.cube.Cube | iris.cube.CubeList:
    """Aggregate a cube along the time dimension using a rolling window.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube or Cubelist of any variable to be aggregated over a rolling window
        in time.
    method: str
        Type of aggregate i.e. method: 'MAX', getattr creates
        iris.analysis.MAX, etc.
    window: int
        The rolling window size.

    Returns
    -------
    cube: iris.cube.Cube | iris.cube.CubeList
        A Cube or Cubelist of the rolling window aggregate. The Cubes will have
        a time dimension that is reduced in size to the original cube by the
        window size.

    Notes
    -----
    This operator is designed to be used to help create daily maxima and minima
    for any variable.
    """
    # Resolve the aggregation method once, e.g. "MAX" -> iris.analysis.MAX.
    aggregator = getattr(iris.analysis, method)

    # Apply the rolling window along time to each input cube.
    results = iris.cube.CubeList(
        cube.rolling_window("time", aggregator, window) for cube in iter_maybe(cubes)
    )

    # Mirror the input type: a lone cube comes back unwrapped.
    return results[0] if len(results) == 1 else results

323 

324 

def _aggregate_without_time_dimcoords(cubes, time_coord, aggregator, percentile):
    """Aggregate cubes at a specific time without time dimension coordinates.

    All data points found at the given time across the input cubes are
    labelled with a temporary "number" coordinate, merged, and collapsed
    across that coordinate with the supplied aggregator.

    Arguments
    ---------
    cubes: iris.cube.CubeList
        Cubes to aggregate. A single Cube is also accepted.
    time_coord: iris.coords.Coord
        Single time coordinate point to aggregate at.
    aggregator: iris.analysis.Aggregator
        Aggregation method (e.g., iris.analysis.MEAN).
    percentile: float, optional
        Percentile value if using PERCENTILE aggregator.

    Returns
    -------
    iris.cube.CubeList
        Aggregated cubes, one per mergeable group, each carrying a
        "num_cases" auxiliary coordinate with the number of data points
        aggregated.

    Raises
    ------
    ValueError
        If time_coord holds more than one time point.
    """
    # Handle single cube input.
    if isinstance(cubes, iris.cube.Cube):
        cubes = iris.cube.CubeList([cubes])

    # Check the supplied time coordinate to make sure it corresponds to a
    # single time only.
    if len(time_coord.points) != 1:
        raise ValueError("Time coordinate should specify a single time only")

    # Remove any duplicate cubes in the input.
    cubes = remove_duplicates(cubes)

    time_coord_name = time_coord.name()

    # Extract only the data at the requested time point.
    time_constraint = iris.Constraint(
        coord_values={time_coord_name: lambda cell: cell.point in time_coord.cells()}
    )
    cubes_at_time = cubes.extract(time_constraint)

    # Add a temporary "number" coordinate to uniquely label the different
    # data points at this time. Labelling makes otherwise-identical slices
    # distinguishable so they merge along "number" rather than colliding.
    number = 0
    numbered_cubes = iris.cube.CubeList()
    for cube in cubes_at_time:
        for slc in cube.slices_over(time_coord_name):
            number_coord = iris.coords.AuxCoord(number, long_name="number")
            slc.add_aux_coord(number_coord)
            numbered_cubes.append(slc)
            number += 1
    cubes_at_time = numbered_cubes

    # Merge the numbered slices; "number" becomes a new dimension.
    cubes_at_time = cubes_at_time.merge()

    aggregated_cubes = iris.cube.CubeList()
    for cube in cubes_at_time:
        # If there was only a single data point at this time, then "number"
        # will be a scalar coordinate. If so, make it a dimension coordinate
        # to allow collapsing.
        if not cube.coord_dims("number"):
            cube = iris.util.new_axis(cube, scalar_coord="number")

        # Record how many data points contribute to this aggregate.
        num_cases = cube.coord("number").points.size
        num_cases_coord = iris.coords.AuxCoord(num_cases, long_name="num_cases")
        cube.add_aux_coord(num_cases_coord)

        # Do aggregation across the temporary "number" coordinate.
        # PERCENTILE-type aggregators need the percent keyword supplied.
        if isinstance(aggregator, type(iris.analysis.PERCENTILE)):
            cube = cube.collapsed("number", aggregator, percent=percentile)
        else:
            cube = cube.collapsed("number", aggregator)

        # Now remove the "number" coordinate and its cell method, so the
        # temporary labelling leaves no trace on the output cube.
        cube.remove_coord("number")
        cell_method = iris.coords.CellMethod(aggregator.name(), coords="number")
        cube = remove_cell_method(cube, cell_method)

        aggregated_cubes.append(cube)

    return aggregated_cubes