Coverage for src/CSET/operators/collapse.py: 97% (136 statements)

# © Crown copyright, Met Office (2022-2025) and CSET contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""Operators to perform various kind of collapse on either 1 or 2 dimensions.""" 

16 

import datetime
import logging
import warnings

import iris
import iris.analysis
import iris.coord_categorisation
import iris.coords
import iris.cube
import iris.exceptions
import iris.util
import numpy as np

from CSET._common import iter_maybe
from CSET.operators.aggregate import add_hour_coordinate


def collapse(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    coordinate: str | list[str],
    method: str,
    additional_percent: float | None = None,
    condition: str | None = None,
    threshold: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
43 """Collapse coordinate(s) of a single cube or of every cube in a cube list. 

44 

45 Collapses similar fields in each cube into a cube collapsing around the 

46 specified coordinate(s) and method. This could be a (weighted) mean or 

47 percentile. 

48 

49 Arguments 

50 --------- 

51 cubes: iris.cube.Cube | iris.cube.CubeList 

52 Cube or CubeList to collapse and iterate over one dimension 

53 coordinate: str | list[str] 

54 Coordinate(s) to collapse over e.g. 'time', 'longitude', 'latitude', 

55 'model_level_number', 'realization'. A list of multiple coordinates can 

56 be given. 

57 method: str 

58 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN', 

59 'PERCENTILE' getattr creates iris.analysis.MEAN, etc. For PERCENTILE YAML 

60 file requires i.e. method: 'PERCENTILE' additional_percent: 90. For 

61 PROPORTION YAML file requires i.e. method: 'PROPORTION', condition: lt, threshold: 273.15. 

62 additional_percent: float, optional 

63 Required for the PERCENTILE method. This is a number between 0 and 100. 

64 condition: str, optional 

65 Required for the PROPORTION method. Expected arguments are eq, ne, lt, gt, le, ge. 

66 The letters correspond to the following conditions 

67 eq: equal to; 

68 ne: not equal to; 

69 lt: less than; 

70 gt: greater than; 

71 le: less than or equal to; 

72 ge: greater than or equal to. 

73 threshold: float, optional 

74 Required for the PROPORTION method. 

75 

76 Returns 

77 ------- 

78 collapsed_cubes: iris.cube.Cube | iris.cube.CubeList 

79 Single variable but several methods of aggregation 

80 

81 Raises 

82 ------ 

83 ValueError 

84 If additional_percent wasn't supplied while using PERCENTILE method. 

85 If condition wasn't supplied while using PROPORTION method. 

86 If threshold wasn't supplied while using PROPORTION method. 
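
    Examples
    --------
    A minimal usage sketch (assuming ``cube`` is an iris Cube with 'time' and
    'realization' coordinates; illustrative, not a doctest):

        ensemble_mean = collapse(cube, "realization", "MEAN")
        p90 = collapse(cube, "time", "PERCENTILE", additional_percent=90)
        frozen_fraction = collapse(
            cube, "time", "PROPORTION", condition="lt", threshold=273.15
        )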

87 """ 

88 if method == "SEQ" or method == "" or method is None: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 return cubes 

90 if method == "PERCENTILE" and additional_percent is None: 

91 raise ValueError("Must specify additional_percent") 

92 if method == "PROPORTION" and condition is None: 

93 raise ValueError("Must specify a condition for the probability") 

94 if method == "PROPORTION" and threshold is None: 

95 raise ValueError("Must specify a threshold for the probability") 

96 

97 # Retain only common time points between different models if multiple model inputs. 
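    # (For example, if one model provides lead times 0-48 hours and another
    # 0-120 hours, only the overlapping 0-48 hour lead times are kept.)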

    if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1:
        logging.debug(
            "Extracting common time points as multiple model inputs detected."
        )
        for cube in cubes:
            cube.coord("forecast_reference_time").bounds = None
            cube.coord("forecast_period").bounds = None
        cubes = cubes.extract_overlapping(
            ["forecast_reference_time", "forecast_period"]
        )
        if len(cubes) == 0:
            raise ValueError("No overlapping times detected in input cubes.")

    collapsed_cubes = iris.cube.CubeList([])
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", "Cannot check if coordinate is contiguous", UserWarning
        )
        warnings.filterwarnings(
            "ignore", "Collapsing spatial coordinate.+without weighting", UserWarning
        )
        for cube in iter_maybe(cubes):
            if method == "PERCENTILE":
                collapsed_cubes.append(
                    cube.collapsed(
                        coordinate,
                        getattr(iris.analysis, method),
                        percent=additional_percent,
                    )
                )
            elif method == "RANGE":
                cube_max = cube.collapsed(coordinate, iris.analysis.MAX)
                cube_min = cube.collapsed(coordinate, iris.analysis.MIN)
                collapsed_cubes.append(cube_max - cube_min)
            elif method == "PROPORTION":
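                # iris.analysis.PROPORTION gives the fraction of points along
                # the coordinate that satisfy the condition, e.g. values
                # [270.0, 272.0, 274.0, 276.0] with condition "lt" and
                # threshold 273.15 collapse to 0.5.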

                match condition:
                    case "eq":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values == threshold,
                        )
                    case "ne":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values != threshold,
                        )
                    case "gt":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values > threshold,
                        )
                    case "ge":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values >= threshold,
                        )
                    case "lt":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values < threshold,
                        )
                    case "le":
                        new_cube = cube.collapsed(
                            coordinate,
                            getattr(iris.analysis, method),
                            function=lambda values: values <= threshold,
                        )
                    case _:
                        raise ValueError(
                            "Unexpected value for condition. Expected eq, ne, "
                            f"gt, ge, lt, le. Got {condition}."
                        )
                new_cube.rename(f"probability_of_{cube.name()}_{condition}_{threshold}")
                new_cube.units = "1"
                collapsed_cubes.append(new_cube)
            else:
                collapsed_cubes.append(
                    cube.collapsed(coordinate, getattr(iris.analysis, method))
                )
    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


def collapse_by_hour_of_day(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
193 """Collapse a cube by hour of the day. 

194 

195 Collapses a cube by hour of the day in the time coordinates provided by the 

196 model. It is useful for creating diurnal cycle plots. It aggregates all 00 

197 UTC together regardless of lead time. 

198 

199 Arguments 

200 --------- 

201 cubes: iris.cube.Cube | iris.cube.CubeList 

202 Cube to collapse and iterate over one dimension or CubeList to convert 

203 to a cube and then collapse prior to aggregating by hour. If a CubeList 

204 is provided each cube is handled separately. 

205 method: str 

206 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN', 

207 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified. 

208 

209 Returns 

210 ------- 

211 cube: iris.cube.Cube 

212 Single variable but several methods of aggregation. 

213 

214 Raises 

215 ------ 

216 ValueError 

217 If additional_percent wasn't supplied while using PERCENTILE method. 

218 

219 Notes 

220 ----- 

221 Collapsing of the cube is around the 'time' coordinate. The coordinates are 

222 first grouped by the hour of day, and then aggregated by the hour of day to 

223 create a diurnal cycle. This operator is applicable for both single 

224 forecasts and for multiple forecasts. The hour used is based on the units of 

225 the time coordinate. If the time coordinate is in UTC, hour will be in UTC. 

226 

227 To apply this operator successfully there must only be one time dimension. 

228 Should a MultiDim exception be raised the user first needs to apply the 

229 collapse operator to reduce the time dimensions before applying this 

230 operator. A cube containing the two time dimensions 

231 'forecast_reference_time' and 'forecast_period' will be automatically 

232 collapsed by lead time before being being collapsed by hour of day. 
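
    Examples
    --------
    A minimal usage sketch (assuming ``cube`` is an iris Cube with a single
    'time' dimension; illustrative, not a doctest):

        diurnal_mean = collapse_by_hour_of_day(cube, "MEAN")
        diurnal_p95 = collapse_by_hour_of_day(
            cube, "PERCENTILE", additional_percent=95
        )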

233 """ 

234 if method == "PERCENTILE" and additional_percent is None: 

235 raise ValueError("Must specify additional_percent") 

236 

237 # Retain only common time points between different models if multiple model inputs. 

238 if isinstance(cubes, iris.cube.CubeList) and len(cubes) > 1: 

239 logging.debug( 

240 "Extracting common time points as multiple model inputs detected." 

241 ) 

242 for cube in cubes: 

243 cube.coord("forecast_reference_time").bounds = None 

244 cube.coord("forecast_period").bounds = None 

245 cubes = cubes.extract_overlapping( 

246 ["forecast_reference_time", "forecast_period"] 

247 ) 

248 if len(cubes) == 0: 

249 raise ValueError("No overlapping times detected in input cubes.") 

250 

251 collapsed_cubes = iris.cube.CubeList([]) 

252 for cube in iter_maybe(cubes): 

253 # Ensure hour coordinate in each input is sorted, and data adjusted if needed. 

254 sorted_cube = iris.cube.CubeList() 

255 for fcst_slice in cube.slices_over(["forecast_reference_time"]): 

256 # Categorise the time coordinate by hour of the day. 

257 fcst_slice = add_hour_coordinate(fcst_slice) 

258 if method == "PERCENTILE": 

259 by_hour = fcst_slice.aggregated_by( 

260 "hour", getattr(iris.analysis, method), percent=additional_percent 

261 ) 

262 else: 

263 by_hour = fcst_slice.aggregated_by( 

264 "hour", getattr(iris.analysis, method) 

265 ) 

266 # Compute if data needs sorting to lie in increasing order [0..23]. 

267 # Note multiple forecasts can sit in same cube spanning different 

268 # initialisation times and data ranges. 
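            # For example, a forecast initialised at 18Z with 3-hourly data
            # yields hour points [18, 21, 0, ..., 15]; nroll = 18 / 3 = 6
            # rolls the points (and data) round into [0, 3, ..., 21] order.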

            time_points = by_hour.coord("hour").points
            time_points_sorted = np.sort(by_hour.coord("hour").points)
            if time_points[0] != time_points_sorted[0]:
                nroll = int(time_points[0] / (time_points[1] - time_points[0]))
                # Shift hour coordinate and data cube to be in time of day order.
                by_hour.coord("hour").points = np.roll(time_points, nroll, 0)
                by_hour.data = np.roll(by_hour.data, nroll, axis=0)

            # Remove unnecessary time coordinate.
            # "hour" and "forecast_period" remain as AuxCoord.
            by_hour.remove_coord("time")

            sorted_cube.append(by_hour)

        # Recombine cube slices.
        cube = sorted_cube.merge_cube()

        if cube.coords("forecast_reference_time", dim_coords=True):
            # Collapse by forecast reference time to get a single cube.
            cube = collapse(
                cube,
                "forecast_reference_time",
                method,
                additional_percent=additional_percent,
            )
        else:
            # Or remove forecast reference time if a single case, as collapse
            # will have effectively done this.
            cube.remove_coord("forecast_reference_time")

        # Promote "hour" to dim_coord.
        iris.util.promote_aux_coord_to_dim_coord(cube, "hour")
        collapsed_cubes.append(cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


def collapse_by_validity_time(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
315 """Collapse a cube around validity time for multiple cases. 

316 

317 First checks if the data can be aggregated easily. Then creates a new cube 

318 by slicing over the time dimensions, removing the time dimensions, 

319 re-merging the data, and creating a new time coordinate. It then collapses 

320 by the new time coordinate for a specified method using the collapse 

321 function. 

322 

323 Arguments 

324 --------- 

325 cubes: iris.cube.Cube | iris.cube.CubeList 

326 Cube to collapse by validity time or CubeList that will be converted 

327 to a cube before collapsing by validity time. 

328 method: str 

329 Type of collapse i.e. method: 'MEAN', 'MAX', 'MIN', 'MEDIAN', 

330 'PERCENTILE'. For 'PERCENTILE' the additional_percent must be specified. 

331 

332 Returns 

333 ------- 

334 cube: iris.cube.Cube | iris.cube.CubeList 

335 Single variable collapsed by lead time based on chosen method. 

336 

337 Raises 

338 ------ 

339 ValueError 

340 If additional_percent wasn't supplied while using PERCENTILE method. 
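
    Examples
    --------
    A minimal usage sketch (assuming ``cubes`` is a CubeList of forecast
    cubes with overlapping validity times; illustrative, not a doctest):

        case_mean = collapse_by_validity_time(cubes, "MEAN")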

341 """ 

342 if method == "PERCENTILE" and additional_percent is None: 

343 raise ValueError("Must specify additional_percent") 

344 

345 collapsed_cubes = iris.cube.CubeList([]) 

346 for cube in iter_maybe(cubes): 

347 # Slice over cube by both time dimensions to create a CubeList. 

348 new_cubelist = iris.cube.CubeList( 

349 cube.slices_over(["forecast_period", "forecast_reference_time"]) 

350 ) 

351 for sub_cube in new_cubelist: 

352 # Reconstruct the time coordinate if it is missing. 
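            # (Validity time = forecast_reference_time + forecast_period,
            # e.g. a reference time of 2025-01-01 00Z plus a 6 hour lead
            # time gives a validity time of 2025-01-01 06Z.)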

353 if "time" not in [coord.name() for coord in sub_cube.coords()]: 

354 ref_time_coord = sub_cube.coord("forecast_reference_time") 

355 ref_units = ref_time_coord.units 

356 ref_time = ref_units.num2date(ref_time_coord.points) 

357 period_coord = sub_cube.coord("forecast_period") 

358 period_units = period_coord.units 

359 # Given how we are slicing there will only be one point. 

360 period_seconds = period_units.convert(period_coord.points[0], "seconds") 

361 period_duration = datetime.timedelta(seconds=period_seconds) 

362 time = ref_time + period_duration 

363 time_points = ref_units.date2num(time) 

364 time_coord = iris.coords.AuxCoord( 

365 points=time_points, standard_name="time", units=ref_units 

366 ) 

367 sub_cube.add_aux_coord(time_coord) 

368 # Remove forecast_period and forecast_reference_time coordinates. 

369 sub_cube.remove_coord("forecast_period") 

370 sub_cube.remove_coord("forecast_reference_time") 

371 # Create new CubeList by merging with unique = False to produce a validity 

372 # time cube. 

373 merged_list_1 = new_cubelist.merge(unique=False) 

374 # Create a new "fake" coordinate and apply to each remaining cube to allow 

375 # final merging to take place into a single cube. 
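        # Each copy carries a distinct point value so that the later
        # merge_cube call can combine the cubes along a new dimension.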

        equalised_validity_time = iris.coords.AuxCoord(
            points=0, long_name="equalised_validity_time", units="1"
        )
        for sub_cube, eq_valid_time in zip(
            merged_list_1, range(len(merged_list_1)), strict=True
        ):
            sub_cube.add_aux_coord(equalised_validity_time.copy(points=eq_valid_time))

        # Merge CubeList to create final cube.
        final_cube = merged_list_1.merge_cube()
        logging.debug("Pre-collapse validity time cube:\n%s", final_cube)

        # Collapse over equalised_validity_time as a proxy for equal validity
        # time.
        try:
            collapsed_cube = collapse(
                final_cube,
                "equalised_validity_time",
                method,
                additional_percent=additional_percent,
            )
        except iris.exceptions.CoordinateCollapseError as err:
            raise ValueError(
                "Cubes do not overlap therefore cannot collapse across validity time."
            ) from err
        collapsed_cube.remove_coord("equalised_validity_time")
        collapsed_cubes.append(collapsed_cube)

    if len(collapsed_cubes) == 1:
        return collapsed_cubes[0]
    else:
        return collapsed_cubes


# TODO
# Collapse function that calculates means, medians, etc. across members of an
# ensemble or stratified groups. Needs to allow collapsing over the
# realization dimension for fixed time, hence will require reading in a
# CubeList.