# © Crown copyright, Met Office (2022-2025) and CSET contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Operators to perform various kinds of collapse on either one or two dimensions."""

import datetime
import logging
import warnings

import iris
import iris.analysis
import iris.coord_categorisation
import iris.coords
import iris.cube
import iris.exceptions
import iris.util
import numpy as np

from CSET._common import iter_maybe
from CSET.operators.aggregate import add_hour_coordinate


def collapse(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    coordinate: str | list[str],
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
41 """Collapse coordinate(s) of a single cube or of every cube in a cube list. 

42 

43 Collapses similar fields in each cube into a cube collapsing around the 

44 specified coordinate(s) and method. This could be a (weighted) mean or 

45 percentile. 

46 

47 Arguments 

48 --------- 

49 cubes: iris.cube.Cube | iris.cube.CubeList 

50 Cube or CubeList to collapse and iterate over one dimension 

51 coordinate: str | list[str] 

52 Coordinate(s) to collapse over e.g. 'time', 'longitude', 'latitude', 

53 'model_level_number', 'realization'. A list of multiple coordinates can 

54 be given. 

55 method: str 

56 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN', 

57 'PERCENTILE' getattr creates iris.analysis.MEAN, etc. For PERCENTILE YAML 

58 file requires i.e. method: 'PERCENTILE' additional_percent: 90. 

59 additional_percent: float, optional 

60 Required for the PERCENTILE method. This is a number between 0 and 100. 

61 

62 Returns 

63 ------- 

64 collapsed_cubes: iris.cube.Cube | iris.cube.CubeList 

65 Single variable but several methods of aggregation 

66 

67 Raises 

68 ------ 

69 ValueError 

70 If additional_percent wasn't supplied while using PERCENTILE method. 

71 """ 
    if method == "SEQ" or method == "" or method is None:
        return cubes
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", "Cannot check if coordinate is contiguous", UserWarning
        )
        warnings.filterwarnings(
            "ignore", "Collapsing spatial coordinate.+without weighting", UserWarning
        )
        for cube in iter_maybe(cubes):
            # Mask invalid data so NaNs are ignored by the aggregation.
            cube.data = np.ma.masked_invalid(cube.data)
            if method == "PERCENTILE":
                collapsed_cubes.append(
                    cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        percent=additional_percent,
                    )
                )
            elif method == "RANGE":
                cube_max = cube.collapsed(coordinate, iris.analysis.MAX)
                cube_min = cube.collapsed(coordinate, iris.analysis.MIN)
                collapsed_cubes.append(cube_max - cube_min)
            else:
                collapsed_cubes.append(
                    cube.collapsed(coordinate, getattr(iris.analysis, method))
                )
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


def collapse_by_hour_of_day(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
131 """Collapse a cube by hour of the day. 

132 

133 Collapses a cube by hour of the day in the time coordinates provided by the 

134 model. It is useful for creating diurnal cycle plots. It aggregates all 00 

135 UTC together regardless of lead time. 

136 

137 Arguments 

138 --------- 

139 cubes: iris.cube.Cube | iris.cube.CubeList 

140 Cube to collapse and iterate over one dimension or CubeList to convert 

141 to a cube and then collapse prior to aggregating by hour. If a CubeList 

142 is provided each cube is handled separately. 

143 method: str 

144 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN', 

145 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified. 

146 

147 Returns 

148 ------- 

149 cube: iris.cube.Cube 

150 Single variable but several methods of aggregation. 

151 

152 Raises 

153 ------ 

154 ValueError 

155 If additional_percent wasn't supplied while using PERCENTILE method. 

156 

157 Notes 

158 ----- 

159 Collapsing of the cube is around the 'time' coordinate. The coordinates are 

160 first grouped by the hour of day, and then aggregated by the hour of day to 

161 create a diurnal cycle. This operator is applicable for both single 

162 forecasts and for multiple forecasts. The hour used is based on the units of 

163 the time coordinate. If the time coordinate is in UTC, hour will be in UTC. 

164 

165 To apply this operator successfully there must only be one time dimension. 

166 Should a MultiDim exception be raised the user first needs to apply the 

167 collapse operator to reduce the time dimensions before applying this 

168 operator. A cube containing the two time dimensions 

169 'forecast_reference_time' and 'forecast_period' will be automatically 

170 collapsed by lead time before being being collapsed by hour of day. 

171 """ 
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Ensure the hour coordinate in each input is sorted, adjusting the
        # data if needed.
        sorted_cube = iris.cube.CubeList()
        for fcst_slice in cube.slices_over(["forecast_reference_time"]):
            # Categorise the time coordinate by hour of the day.
            fcst_slice = add_hour_coordinate(fcst_slice)
            if method == "PERCENTILE":
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method), percent=additional_percent
                )
            else:
                by_hour = fcst_slice.aggregated_by(
                    "hour", getattr(iris.analysis, method)
                )
            # Compute whether the data needs sorting to lie in increasing order
            # [0..23]. Note multiple forecasts can sit in the same cube,
            # spanning different initialisation times and data ranges.
            time_points = by_hour.coord("hour").points
            time_points_sorted = np.sort(by_hour.coord("hour").points)
            if time_points[0] != time_points_sorted[0]:
                # np.roll requires an integer shift, so truncate the division.
                nroll = int(time_points[0] / (time_points[1] - time_points[0]))
                # Shift hour coordinate and data cube to be in time of day order.
                by_hour.coord("hour").points = np.roll(time_points, nroll, 0)
                by_hour.data = np.roll(by_hour.data, nroll, axis=0)
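                # Worked example (illustrative, assuming hourly spacing): hour
                # points [3, 4, ..., 23, 0, 1, 2] give nroll = int(3 / 1) = 3;
                # rolling by 3 moves the trailing [0, 1, 2] back to the front,
                # restoring [0, 1, ..., 23].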
            # Remove the unnecessary time coordinate;
            # "hour" and "forecast_period" remain as AuxCoords.
            by_hour.remove_coord("time")

            sorted_cube.append(by_hour)

        # Recombine cube slices.
        cube = sorted_cube.merge_cube()

        # Mask invalid data so NaNs are ignored by the aggregation.
        cube.data = np.ma.masked_invalid(cube.data)

        if cube.coords("forecast_reference_time", dim_coords=True):
            # Collapse by forecast reference time to get a single cube.
            cube = collapse(
                cube,
                "forecast_reference_time",
                method,
                additional_percent=additional_percent,
            )
        else:
            # Or remove forecast reference time if a single case, as collapse
            # will have effectively done this.
            cube.remove_coord("forecast_reference_time")

        # Promote "hour" to a dimension coordinate.
        iris.util.promote_aux_coord_to_dim_coord(cube, "hour")
        collapsed_cubes.append(cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


def collapse_by_validity_time(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
257 """Collapse a cube around validity time for multiple cases. 

258 

259 First checks if the data can be aggregated easily. Then creates a new cube 

260 by slicing over the time dimensions, removing the time dimensions, 

261 re-merging the data, and creating a new time coordinate. It then collapses 

262 by the new time coordinate for a specified method using the collapse 

263 function. 

264 

265 Arguments 

266 --------- 

267 cubes: iris.cube.Cube | iris.cube.CubeList 

268 Cube to collapse by validity time or CubeList that will be converted 

269 to a cube before collapsing by validity time. 

270 method: str 

271 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN', 

272 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified. 

273 

274 Returns 

275 ------- 

276 cube: iris.cube.Cube | iris.cube.CubeList 

277 Single variable collapsed by lead time based on chosen method. 

278 

279 Raises 

280 ------ 

281 ValueError 

282 If additional_percent wasn't supplied while using PERCENTILE method. 

283 """ 
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("Must specify additional_percent")

    collapsed_cubes = iris.cube.CubeList([])
    for cube in iter_maybe(cubes):
        # Slice over cube by both time dimensions to create a CubeList.
        new_cubelist = iris.cube.CubeList(
            cube.slices_over(["forecast_period", "forecast_reference_time"])
        )
        for sub_cube in new_cubelist:
            # Reconstruct the time coordinate if it is missing.
            if "time" not in [coord.name() for coord in sub_cube.coords()]:
                ref_time_coord = sub_cube.coord("forecast_reference_time")
                ref_units = ref_time_coord.units
                ref_time = ref_units.num2date(ref_time_coord.points)
                period_coord = sub_cube.coord("forecast_period")
                period_units = period_coord.units
                # Given how we are slicing there will only be one point.
                period_seconds = period_units.convert(period_coord.points[0], "seconds")
                period_duration = datetime.timedelta(seconds=period_seconds)
                time = ref_time + period_duration
                time_points = ref_units.date2num(time)
                time_coord = iris.coords.AuxCoord(
                    points=time_points, standard_name="time", units=ref_units
                )
                sub_cube.add_aux_coord(time_coord)
            # Remove forecast_period and forecast_reference_time coordinates.
            sub_cube.remove_coord("forecast_period")
            sub_cube.remove_coord("forecast_reference_time")
        # Create a new CubeList by merging with unique=False to produce a
        # validity time cube.
        merged_list_1 = new_cubelist.merge(unique=False)
        # Create a new "fake" coordinate and apply it to each remaining cube to
        # allow final merging to take place into a single cube.
        equalised_validity_time = iris.coords.AuxCoord(
            points=0, long_name="equalised_validity_time", units="1"
        )
        for sub_cube, eq_valid_time in zip(
            merged_list_1, range(len(merged_list_1)), strict=True
        ):
            sub_cube.add_aux_coord(equalised_validity_time.copy(points=eq_valid_time))

        # Merge the CubeList to create the final cube.
        final_cube = merged_list_1.merge_cube()
        logging.debug("Pre-collapse validity time cube:\n%s", final_cube)

        # Mask invalid data so NaNs are ignored by the aggregation.
        final_cube.data = np.ma.masked_invalid(final_cube.data)

        # Collapse over equalised_validity_time as a proxy for equal validity
        # time.
        try:
            collapsed_cube = collapse(
                final_cube,
                "equalised_validity_time",
                method,
                additional_percent=additional_percent,
            )
        except iris.exceptions.CoordinateCollapseError as err:
            raise ValueError(
                "Cubes do not overlap therefore cannot collapse across validity time."
            ) from err
        collapsed_cube.remove_coord("equalised_validity_time")
        collapsed_cubes.append(collapsed_cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


def proportion(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    coordinate: str | list[str],
    condition: str,
    threshold: float,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Find the proportion of an event for all cubes.

    Find the proportion of points that meet a condition against a specified
    threshold in each cube, collapsing around the specified coordinate(s).

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube or CubeList to collapse and iterate over one dimension.
    coordinate: str | list[str]
        Coordinate(s) to collapse over, e.g. 'time', 'longitude', 'latitude',
        'model_level_number', 'realization'. A list of multiple coordinates can
        be given.
    condition: str
        The condition for the event. Expected arguments are eq, ne, lt, gt, le,
        ge. The letters correspond to the following conditions:
        eq: equal to;
        ne: not equal to;
        lt: less than;
        gt: greater than;
        le: less than or equal to;
        ge: greater than or equal to.
    threshold: float
        The value for the event.

    Returns
    -------
    collapsed_cubes: iris.cube.Cube | iris.cube.CubeList
        The proportion of the event.
    """
    # Set the collapse method.
    method = "PROPORTION"
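    # iris.analysis.PROPORTION yields the fraction of data points for which the
    # supplied function returns True along the collapsed coordinate(s).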
    # Retain only common time points between different models if multiple model inputs.
    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", "Cannot check if coordinate is contiguous", UserWarning
        )
        warnings.filterwarnings(
            "ignore", "Collapsing spatial coordinate.+without weighting", UserWarning
        )
        for cube in iter_maybe(cubes):
            # Mask invalid data so NaNs are ignored by the aggregation.
            cube.data = np.ma.masked_invalid(cube.data)
            match condition:
                case "eq":
                    new_cube = cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        function=lambda values: values == threshold,
                    )
                case "ne":
                    new_cube = cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        function=lambda values: values != threshold,
                    )
                case "gt":
                    new_cube = cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        function=lambda values: values > threshold,
                    )
                case "ge":
                    new_cube = cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        function=lambda values: values >= threshold,
                    )
                case "lt":
                    new_cube = cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        function=lambda values: values < threshold,
                    )
                case "le":
                    new_cube = cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        function=lambda values: values <= threshold,
                    )
                case _:
                    raise ValueError(
                        f"Unexpected value for condition. Expected eq, ne, gt, "
                        f"ge, lt, le. Got {condition}."
                    )
            name = cube.long_name if cube.long_name else cube.name()
            new_cube.rename(f"probability_of_{name}_{condition}_{threshold}")
            new_cube.units = "1"
            collapsed_cubes.append(new_cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


# TODO
# Collapse function that calculates means, medians, etc. across members of an
# ensemble or stratified groups. Need to allow collapse over the realization
# dimension for a fixed time. Hence this will require reading in a CubeList.