Coverage for src/CSET/operators/_utils.py: 94%

183 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-17 15:44 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common operator functionality used across CSET. 

17 

18Functions below should only be added if it is not suitable as a standalone 

19operator, and will be used across multiple operators. 

20""" 

21 

22import logging 

23import re 

24from datetime import timedelta 

25 

26import iris 

27import iris.coords 

28import iris.cube 

29import iris.exceptions 

30import iris.util 

31from iris.time import PartialDateTime 

32 

33from CSET._common import iter_maybe 

34 

35 

36def pdt_fromisoformat( 

37 datestring, 

38) -> tuple[iris.time.PartialDateTime, timedelta | None]: 

39 """Generate PartialDateTime object. 

40 

41 Function that takes an ISO 8601 date string and returns a PartialDateTime object. 

42 

43 Arguments 

44 --------- 

45 datestring: str 

46 ISO 8601 date. 

47 

48 Returns 

49 ------- 

50 time_object: iris.time.PartialDateTime 

51 """ 

52 

53 def make_offset(sign, value) -> timedelta: 

54 if len(value) not in [2, 4, 5]: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true

55 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}') 

56 

57 hours = int(value[:2]) 

58 minutes = 0 

59 if len(value) in [4, 5]: 

60 minutes = int(value[-2:]) 

61 return timedelta(hours=sign * hours, minutes=sign * minutes) 

62 

63 # Remove the microseconds coord due to no support in PartialDateTime 

64 datestring = re.sub(r"\.\d+", "", datestring) 

65 

66 datetime_split = datestring.split("T") 

67 date = datetime_split[0] 

68 if len(datetime_split) == 1: 

69 time = "" 

70 elif len(datetime_split) == 2: 70 ↛ 73line 70 didn't jump to line 73 because the condition on line 70 was always true

71 time = datetime_split[1] 

72 else: 

73 raise ValueError("datesting in an unexpected format") 

74 

75 offset = None 

76 time_split = time.split("+") 

77 if len(time_split) == 2: 

78 time = time_split[0] 

79 offset = make_offset(1, time_split[1]) 

80 else: 

81 time_split = time.split("-") 

82 if len(time_split) == 2: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true

83 time = time_split[0] 

84 offset = make_offset(-1, time_split[1]) 

85 else: 

86 offset = None 

87 

88 if re.fullmatch(r"\d{8}", date): 

89 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}" 

90 elif re.fullmatch(r"\d{6}", date): 

91 date = f"{date[0:4]}-{date[4:6]}" 

92 

93 if len(date) < 7: 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true

94 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM") 

95 

96 # Returning a PartialDateTime for the special case of string form "YYYY-MM" 

97 if re.fullmatch(r"\d{4}-\d{2}", date): 

98 pdt = PartialDateTime( 

99 year=int(date[0:4]), 

100 month=int(date[5:7]), 

101 day=None, 

102 hour=None, 

103 minute=None, 

104 second=None, 

105 ) 

106 return pdt, offset 

107 

108 year = int(date[0:4]) 

109 month = int(date[5:7]) 

110 day = int(date[8:10]) 

111 

112 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0) 

113 

114 # Normalise the time parts into standard format 

115 if re.fullmatch(r"\d{4}", time): 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true

116 time = f"{time[0:2]}:{time[2:4]}" 

117 if re.fullmatch(r"\d{6}", time): 

118 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}" 

119 

120 if len(time) >= 2: 

121 kwargs["hour"] = int(time[0:2]) 

122 if len(time) >= 5: 

123 kwargs["minute"] = int(time[3:5]) 

124 if len(time) >= 8: 

125 kwargs["second"] = int(time[6:8]) 

126 

127 pdt = PartialDateTime(**kwargs) 

128 

129 return pdt, offset 

130 

131 

132def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]: 

133 """ 

134 Return horizontal dimension coordinate name(s) from a given cube. 

135 

136 Arguments 

137 --------- 

138 

139 cube: iris.cube.Cube 

140 An iris cube which will be checked to see if it contains coordinate 

141 names that match a pre-defined list of acceptable horizontal 

142 dimension coordinate names. 

143 

144 Returns 

145 ------- 

146 (y_coord, x_coord) 

147 A tuple containing the horizontal coordinate name for latitude and longitude respectively 

148 found within the cube. 

149 

150 Raises 

151 ------ 

152 ValueError 

153 If a unique y/x horizontal coordinate cannot be found. 

154 """ 

155 # Acceptable horizontal coordinate names. 

156 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

157 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

158 

159 # Get a list of dimension coordinate names for the cube 

160 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

161 coord_names = [coord.name() for coord in cube.coords()] 

162 

163 # Check which x-coordinate we have, if any 

164 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

165 if len(x_coords) != 1: 

166 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES] 

167 if len(x_coords) != 1: 

168 raise ValueError("Could not identify a unique x-coordinate in cube") 

169 

170 # Check which y-coordinate we have, if any 

171 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

172 if len(y_coords) != 1: 

173 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES] 

174 if len(y_coords) != 1: 

175 raise ValueError("Could not identify a unique y-coordinate in cube") 

176 

177 return (y_coords[0], x_coords[0]) 

178 

179 

180def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int: 

181 """ 

182 Return coordinate dimension for a named coordinate from a given cube. 

183 

184 Arguments 

185 --------- 

186 

187 cube: iris.cube.Cube 

188 An iris cube which will be checked to see if it contains coordinate 

189 names that match a pre-defined list of acceptable horizontal 

190 coordinate names. 

191 

192 coord_name: str 

193 A cube dimension name 

194 

195 Returns 

196 ------- 

197 coord_index 

198 An integer specifying where in the cube dimension list a specified coordinate name is found. 

199 

200 Raises 

201 ------ 

202 ValueError 

203 If a specified dimension coordinate cannot be found. 

204 """ 

205 # Get a list of dimension coordinate names for the cube 

206 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

207 

208 # Check if requested dimension is found in cube and get index 

209 if coord_name in coord_names: 

210 coord_index = cube.coord_dims(coord_name)[0] 

211 else: 

212 raise ValueError("Could not find requested dimension %s", coord_name) 

213 

214 return coord_index 

215 

216 

217def is_spatialdim(cube: iris.cube.Cube) -> bool: 

218 """Determine whether a cube is has two spatial dimension coordinates. 

219 

220 If cube has both spatial dims, it will contain two unique coordinates 

221 that explain space (latitude and longitude). The coordinates have to 

222 be iterable/contain usable dimension data, as cubes may contain these 

223 coordinates as scalar dimensions after being collapsed. 

224 

225 Arguments 

226 --------- 

227 cube: iris.cube.Cube 

228 An iris cube which will be checked to see if it contains coordinate 

229 names that match a pre-defined list of acceptable coordinate names. 

230 

231 Returns 

232 ------- 

233 bool 

234 If true, then the cube has a spatial projection and thus can be plotted 

235 as a map. 

236 """ 

237 # Acceptable horizontal coordinate names. 

238 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

239 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

240 

241 # Get a list of coordinate names for the cube 

242 coord_names = [coord.name() for coord in cube.dim_coords] 

243 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

244 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

245 

246 # If there is one coordinate for both x and y direction return True. 

247 if len(x_coords) == 1 and len(y_coords) == 1: 

248 return True 

249 else: 

250 return False 

251 

252 

253def is_coorddim(cube: iris.cube.Cube, coord_name) -> bool: 

254 """Determine whether a cube has specified dimension coordinates. 

255 

256 Arguments 

257 --------- 

258 cube: iris.cube.Cube 

259 An iris cube which will be checked to see if it contains coordinate 

260 names that match a pre-defined list of acceptable coordinate names. 

261 

262 coord_name: str 

263 A cube dimension name 

264 

265 Returns 

266 ------- 

267 bool 

268 If true, then the cube has a spatial projection and thus can be plotted 

269 as a map. 

270 """ 

271 # Get a list of dimension coordinate names for the cube 

272 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

273 

274 # Check if requested dimension is found in cube and get index 

275 if coord_name in coord_names: 

276 return True 

277 else: 

278 return False 

279 

280 

281def is_transect(cube: iris.cube.Cube) -> bool: 

282 """Determine whether a cube is a transect. 

283 

284 If cube is a transect, it will contain only one spatial (map) coordinate, 

285 and one vertical coordinate (either pressure or model level). 

286 

287 Arguments 

288 --------- 

289 cube: iris.cube.Cube 

290 An iris cube which will be checked to see if it contains coordinate 

291 names that match a pre-defined list of acceptable coordinate names. 

292 

293 Returns 

294 ------- 

295 bool 

296 If true, then the cube is a transect that contains one spatial (map) 

297 coordinate and one vertical coordinate. 

298 """ 

299 # Acceptable spatial (map) coordinate names. 

300 SPATIAL_MAP_COORD_NAMES = [ 

301 "longitude", 

302 "grid_longitude", 

303 "projection_x_coordinate", 

304 "x", 

305 "latitude", 

306 "grid_latitude", 

307 "projection_y_coordinate", 

308 "y", 

309 "distance", 

310 ] 

311 

312 # Acceptable vertical coordinate names 

313 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"] 

314 

315 # Get a list of coordinate names for the cube 

316 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

317 

318 # Check which spatial coordinates we have. 

319 spatial_coords = [ 

320 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES 

321 ] 

322 if len(spatial_coords) != 1: 

323 return False 

324 

325 # Check which vertical coordinates we have. 

326 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES] 

327 if len(vertical_coords) != 1: 

328 return False 

329 

330 # Passed criteria so return True 

331 return True 

332 

333 

334def check_stamp_coordinate(cube: iris.cube.Cube) -> str: 

335 """ 

336 Return stamp dimension coordinate name from a given cube, if exists. 

337 

338 If cube contains a valid stamp coordinate as a dimension coordinate, 

339 function will return name of the stamp coordinate. 

340 

341 Arguments 

342 --------- 

343 cube: iris.cube.Cube 

344 An iris cube which will be checked to see if it contains coordinate 

345 names that match a pre-defined list of acceptable coordinate names. 

346 

347 Returns 

348 ------- 

349 str 

350 If available, then return name of stamp coordinate. 

351 Defaults to "realization" if alternative stamp coordinate not found. 

352 """ 

353 # Acceptable stamp coordinate names 

354 STAMP_COORD_NAMES = ["realization", "member", "sample", "pseudo_level"] 

355 

356 # Check which dimension coordinates we have. 

357 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

358 

359 # Check if any acceptable stamp coordinates are cube dimensions. 

360 stamp_coords = [coord for coord in dim_coord_names if coord in STAMP_COORD_NAMES] 

361 if len(stamp_coords) == 1: 

362 stamp_coordinate = stamp_coords[0] 

363 else: 

364 stamp_coordinate = "realization" 

365 

366 return stamp_coordinate 

367 

368 

369def fully_equalise_attributes(cubes: iris.cube.CubeList): 

370 """Remove any unique attributes between cubes or coordinates in place.""" 

371 # Equalise cube attributes. 

372 removed = iris.util.equalise_attributes(cubes) 

373 logging.debug("Removed attributes from cube: %s", removed) 

374 

375 # Equalise coordinate attributes. 

376 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes] 

377 

378 all_coords = set.union(*coord_sets) 

379 coords_to_equalise = set.intersection(*coord_sets) 

380 coords_to_remove = set.difference(all_coords, coords_to_equalise) 

381 

382 logging.debug("All coordinates: %s", all_coords) 

383 logging.debug("Coordinates to remove: %s", coords_to_remove) 

384 logging.debug("Coordinates to equalise: %s", coords_to_equalise) 

385 

386 for coord in coords_to_remove: 

387 for cube in cubes: 

388 try: 

389 cube.remove_coord(coord) 

390 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name()) 

391 except iris.exceptions.CoordinateNotFoundError: 

392 pass 

393 

394 for coord in coords_to_equalise: 

395 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes]) 

396 logging.debug("Removed attributes from coordinate %s: %s", coord, removed) 

397 

398 return cubes 

399 

400 

401def slice_over_maybe(cube: iris.cube.Cube, coord_name, index): 

402 """Test slicing over cube if exists. 

403 

404 Return None if not existing. 

405 

406 Arguments 

407 --------- 

408 cube: iris.cube.Cube 

409 An iris cube which will be checked to see if it can be sliced over 

410 given coordinate. 

411 coord_name: coord 

412 An iris coordinate over which to slice cube. 

413 index: 

414 Coordinate index value to extract 

415 

416 Returns 

417 ------- 

418 cube_slice: iris.cube.Cube 

419 A slice of iris cube, if available to slice. 

420 """ 

421 if cube is None: 

422 return None 

423 

424 # Check if coord exists as dimension coordinate 

425 if not is_coorddim(cube, coord_name): 425 ↛ 426line 425 didn't jump to line 426 because the condition on line 425 was never true

426 return cube 

427 

428 # Use iris to find which axis the dimension coordinate corresponds to 

429 dim = cube.coord_dims(coord_name)[0] 

430 

431 # Create list of slices for each dimension 

432 slices = [slice(None)] * cube.ndim 

433 

434 # Only replace the slice for the dim to be extracted 

435 slices[dim] = index 

436 

437 return cube[tuple(slices)] 

438 

439 

440def is_time_aggregatable(cube: iris.cube.Cube) -> bool: 

441 """Determine whether a cube can be aggregated in time. 

442 

443 If a cube is aggregatable it will contain both a 'forecast_reference_time' 

444 and 'forecast_period' coordinate as dimension or scalar coordinates. 

445 

446 Arguments 

447 --------- 

448 cube: iris.cube.Cube 

449 An iris cube which will be checked to see if it is aggregatable based 

450 on a set of pre-defined dimensional time coordinates: 

451 'forecast_period' and 'forecast_reference_time'. 

452 

453 Returns 

454 ------- 

455 bool 

456 If true, then the cube is aggregatable and contains dimensional 

457 coordinates including both 'forecast_reference_time' and 

458 'forecast_period'. 

459 """ 

460 # Acceptable time coordinate names for aggregatable cube. 

461 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"] 

462 

463 def strictly_monotonic(coord: iris.coords.Coord) -> bool: 

464 """Return whether a coord is strictly monotonic, catching errors.""" 

465 try: 

466 return coord.is_monotonic() 

467 except iris.exceptions.CoordinateMultiDimError: 

468 return False 

469 

470 # Strictly monotonic coordinate names for the cube. 

471 coord_names = [coord.name() for coord in cube.coords() if strictly_monotonic(coord)] 

472 

473 # Check which temporal coordinates we have. 

474 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES] 

475 # Return whether both coordinates are in the temporal coordinates. 

476 return len(temporal_coords) == 2 

477 

478 

479def check_single_cube(cube: iris.cube.Cube | iris.cube.CubeList) -> iris.cube.Cube: 

480 """Ensure a single cube is given. 

481 

482 If a CubeList of length one is given that the contained cube is returned, 

483 otherwise an error is raised. 

484 

485 Parameters 

486 ---------- 

487 cube: Cube | CubeList 

488 The cube to check. 

489 

490 Returns 

491 ------- 

492 cube: Cube 

493 The checked cube. 

494 

495 Raises 

496 ------ 

497 TypeError 

498 If the input cube is not a Cube or CubeList of a single Cube. 

499 """ 

500 if isinstance(cube, iris.cube.Cube): 

501 return cube 

502 if isinstance(cube, iris.cube.CubeList): 

503 if len(cube) == 1: 

504 return cube[0] 

505 else: 

506 raise ValueError("CubeList did not contain a single cube.", cube) 

507 raise TypeError( 

508 "check_single_cube requires a Cube or CubeList of a single cube.", cube 

509 ) 

510 

511 

512def check_sequence_coordinate(cubes, sequence_coordinate): 

513 # If several histograms are plotted with time as sequence_coordinate for the 

514 # time slider option. 

515 for cube in iter_maybe(cubes): 

516 try: 

517 cube.coord(sequence_coordinate) 

518 except iris.exceptions.CoordinateNotFoundError as err: 

519 raise ValueError( 

520 f"Cube must have a {sequence_coordinate} coordinate." 

521 ) from err 

522 

523 

524def get_num_models(cube: iris.cube.Cube | iris.cube.CubeList) -> int: 

525 """Return number of models based on cube attributes.""" 

526 model_names = {cb.attributes.get("model_name") for cb in iter_maybe(cube)} 

527 

528 if not model_names: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true

529 logging.debug("Missing model names. Will assume single model.") 

530 return 1 

531 else: 

532 return len(model_names) 

533 

534 

535def validate_cube_shape( 

536 cube: iris.cube.Cube | iris.cube.CubeList, num_models: int 

537) -> None: 

538 """Check all cubes have a model name.""" 

539 if isinstance(cube, iris.cube.CubeList) and len(cube) != num_models: 

540 raise ValueError( 

541 f"The number of model names ({num_models}) should equal the number " 

542 f"of cubes ({len(cube)})." 

543 ) 

544 

545 

546def validate_cubes_coords( 

547 cubes: iris.cube.CubeList, coords: list[iris.coords.Coord] 

548) -> None: 

549 """Check same number of cubes as sequence coordinate for zip functions.""" 

550 if len(cubes) != len(coords): 

551 raise ValueError( 

552 f"The number of CubeList entries ({len(cubes)}) should equal the number " 

553 f"of sequence coordinates ({len(coords)})." 

554 f"Check that number of time entries in input data are consistent if " 

555 f"performing time-averaging steps prior to plotting outputs." 

556 )