Coverage for src / CSET / operators / _utils.py: 74%

220 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 15:02 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common operator functionality used across CSET. 

17 

18Functions below should only be added if it is not suitable as a standalone 

19operator, and will be used across multiple operators. 

20""" 

21 

22import logging 

23import re 

24from datetime import timedelta 

25 

26import iris 

27import iris.coords 

28import iris.cube 

29import iris.exceptions 

30import iris.util 

31from iris.time import PartialDateTime 

32 

33 

34def pdt_fromisoformat( 

35 datestring, 

36) -> tuple[iris.time.PartialDateTime, timedelta | None]: 

37 """Generate PartialDateTime object. 

38 

39 Function that takes an ISO 8601 date string and returns a PartialDateTime object. 

40 

41 Arguments 

42 --------- 

43 datestring: str 

44 ISO 8601 date. 

45 

46 Returns 

47 ------- 

48 time_object: iris.time.PartialDateTime 

49 """ 

50 

51 def make_offset(sign, value) -> timedelta: 

52 if len(value) not in [2, 4, 5]: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true

53 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}') 

54 

55 hours = int(value[:2]) 

56 minutes = 0 

57 if len(value) in [4, 5]: 

58 minutes = int(value[-2:]) 

59 return timedelta(hours=sign * hours, minutes=sign * minutes) 

60 

61 # Remove the microseconds coord due to no support in PartialDateTime 

62 datestring = re.sub(r"\.\d+", "", datestring) 

63 

64 datetime_split = datestring.split("T") 

65 date = datetime_split[0] 

66 if len(datetime_split) == 1: 

67 time = "" 

68 elif len(datetime_split) == 2: 68 ↛ 71line 68 didn't jump to line 71 because the condition on line 68 was always true

69 time = datetime_split[1] 

70 else: 

71 raise ValueError("datesting in an unexpected format") 

72 

73 offset = None 

74 time_split = time.split("+") 

75 if len(time_split) == 2: 

76 time = time_split[0] 

77 offset = make_offset(1, time_split[1]) 

78 else: 

79 time_split = time.split("-") 

80 if len(time_split) == 2: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 time = time_split[0] 

82 offset = make_offset(-1, time_split[1]) 

83 else: 

84 offset = None 

85 

86 if re.fullmatch(r"\d{8}", date): 

87 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}" 

88 elif re.fullmatch(r"\d{6}", date): 

89 date = f"{date[0:4]}-{date[4:6]}" 

90 

91 if len(date) < 7: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true

92 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM") 

93 

94 # Returning a PartialDateTime for the special case of string form "YYYY-MM" 

95 if re.fullmatch(r"\d{4}-\d{2}", date): 

96 pdt = PartialDateTime( 

97 year=int(date[0:4]), 

98 month=int(date[5:7]), 

99 day=None, 

100 hour=None, 

101 minute=None, 

102 second=None, 

103 ) 

104 return pdt, offset 

105 

106 year = int(date[0:4]) 

107 month = int(date[5:7]) 

108 day = int(date[8:10]) 

109 

110 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0) 

111 

112 # Normalise the time parts into standard format 

113 if re.fullmatch(r"\d{4}", time): 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true

114 time = f"{time[0:2]}:{time[2:4]}" 

115 if re.fullmatch(r"\d{6}", time): 

116 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}" 

117 

118 if len(time) >= 2: 

119 kwargs["hour"] = int(time[0:2]) 

120 if len(time) >= 5: 

121 kwargs["minute"] = int(time[3:5]) 

122 if len(time) >= 8: 

123 kwargs["second"] = int(time[6:8]) 

124 

125 pdt = PartialDateTime(**kwargs) 

126 

127 return pdt, offset 

128 

129 

130def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]: 

131 """ 

132 Return horizontal dimension coordinate name(s) from a given cube. 

133 

134 Arguments 

135 --------- 

136 

137 cube: iris.cube.Cube 

138 An iris cube which will be checked to see if it contains coordinate 

139 names that match a pre-defined list of acceptable horizontal 

140 dimension coordinate names. 

141 

142 Returns 

143 ------- 

144 (y_coord, x_coord) 

145 A tuple containing the horizontal coordinate name for latitude and longitude respectively 

146 found within the cube. 

147 

148 Raises 

149 ------ 

150 ValueError 

151 If a unique y/x horizontal coordinate cannot be found. 

152 """ 

153 # Acceptable horizontal coordinate names. 

154 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

155 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

156 

157 # Get a list of dimension coordinate names for the cube 

158 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

159 coord_names = [coord.name() for coord in cube.coords()] 

160 

161 # Check which x-coordinate we have, if any 

162 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

163 if len(x_coords) != 1: 

164 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES] 

165 if len(x_coords) != 1: 

166 raise ValueError("Could not identify a unique x-coordinate in cube") 

167 

168 # Check which y-coordinate we have, if any 

169 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

170 if len(y_coords) != 1: 

171 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES] 

172 if len(y_coords) != 1: 

173 raise ValueError("Could not identify a unique y-coordinate in cube") 

174 

175 return (y_coords[0], x_coords[0]) 

176 

177 

178def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int: 

179 """ 

180 Return coordinate dimension for a named coordinate from a given cube. 

181 

182 Arguments 

183 --------- 

184 

185 cube: iris.cube.Cube 

186 An iris cube which will be checked to see if it contains coordinate 

187 names that match a pre-defined list of acceptable horizontal 

188 coordinate names. 

189 

190 coord_name: str 

191 A cube dimension name 

192 

193 Returns 

194 ------- 

195 coord_index 

196 An integer specifying where in the cube dimension list a specified coordinate name is found. 

197 

198 Raises 

199 ------ 

200 ValueError 

201 If a specified dimension coordinate cannot be found. 

202 """ 

203 # Get a list of dimension coordinate names for the cube 

204 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

205 

206 # Check if requested dimension is found in cube and get index 

207 if coord_name in coord_names: 

208 coord_index = cube.coord_dims(coord_name)[0] 

209 else: 

210 raise ValueError("Could not find requested dimension %s", coord_name) 

211 

212 return coord_index 

213 

214 

215def is_spatialdim(cube: iris.cube.Cube) -> bool: 

216 """Determine whether a cube is has two spatial dimension coordinates. 

217 

218 If cube has both spatial dims, it will contain two unique coordinates 

219 that explain space (latitude and longitude). The coordinates have to 

220 be iterable/contain usable dimension data, as cubes may contain these 

221 coordinates as scalar dimensions after being collapsed. 

222 

223 Arguments 

224 --------- 

225 cube: iris.cube.Cube 

226 An iris cube which will be checked to see if it contains coordinate 

227 names that match a pre-defined list of acceptable coordinate names. 

228 

229 Returns 

230 ------- 

231 bool 

232 If true, then the cube has a spatial projection and thus can be plotted 

233 as a map. 

234 """ 

235 # Acceptable horizontal coordinate names. 

236 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

237 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

238 

239 # Get a list of coordinate names for the cube 

240 coord_names = [coord.name() for coord in cube.dim_coords] 

241 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

242 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

243 

244 # If there is one coordinate for both x and y direction return True. 

245 if len(x_coords) == 1 and len(y_coords) == 1: 

246 return True 

247 else: 

248 return False 

249 

250 

251def is_coorddim(cube: iris.cube.Cube, coord_name) -> bool: 

252 """Determine whether a cube has specified dimension coordinates. 

253 

254 Arguments 

255 --------- 

256 cube: iris.cube.Cube 

257 An iris cube which will be checked to see if it contains coordinate 

258 names that match a pre-defined list of acceptable coordinate names. 

259 

260 coord_name: str 

261 A cube dimension name 

262 

263 Returns 

264 ------- 

265 bool 

266 If true, then the cube has a spatial projection and thus can be plotted 

267 as a map. 

268 """ 

269 # Get a list of dimension coordinate names for the cube 

270 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

271 

272 # Check if requested dimension is found in cube and get index 

273 if coord_name in coord_names: 

274 return True 

275 else: 

276 return False 

277 

278 

279def is_transect(cube: iris.cube.Cube) -> bool: 

280 """Determine whether a cube is a transect. 

281 

282 If cube is a transect, it will contain only one spatial (map) coordinate, 

283 and one vertical coordinate (either pressure or model level). 

284 

285 Arguments 

286 --------- 

287 cube: iris.cube.Cube 

288 An iris cube which will be checked to see if it contains coordinate 

289 names that match a pre-defined list of acceptable coordinate names. 

290 

291 Returns 

292 ------- 

293 bool 

294 If true, then the cube is a transect that contains one spatial (map) 

295 coordinate and one vertical coordinate. 

296 """ 

297 # Acceptable spatial (map) coordinate names. 

298 SPATIAL_MAP_COORD_NAMES = [ 

299 "longitude", 

300 "grid_longitude", 

301 "projection_x_coordinate", 

302 "x", 

303 "latitude", 

304 "grid_latitude", 

305 "projection_y_coordinate", 

306 "y", 

307 "distance", 

308 ] 

309 

310 # Acceptable vertical coordinate names 

311 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"] 

312 

313 # Get a list of coordinate names for the cube 

314 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

315 

316 # Check which spatial coordinates we have. 

317 spatial_coords = [ 

318 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES 

319 ] 

320 if len(spatial_coords) != 1: 

321 return False 

322 

323 # Check which vertical coordinates we have. 

324 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES] 

325 if len(vertical_coords) != 1: 

326 return False 

327 

328 # Passed criteria so return True 

329 return True 

330 

331 

332def check_stamp_coordinate(cube: iris.cube.Cube) -> str: 

333 """ 

334 Return stamp dimension coordinate name from a given cube, if exists. 

335 

336 If cube contains a valid stamp coordinate as a dimension coordinate, 

337 function will return name of the stamp coordinate. 

338 

339 Arguments 

340 --------- 

341 cube: iris.cube.Cube 

342 An iris cube which will be checked to see if it contains coordinate 

343 names that match a pre-defined list of acceptable coordinate names. 

344 

345 Returns 

346 ------- 

347 str 

348 If available, then return name of stamp coordinate. 

349 Defaults to "realization" if alternative stamp coordinate not found. 

350 """ 

351 # Acceptable stamp coordinate names 

352 STAMP_COORD_NAMES = ["realization", "member", "sample", "pseudo_level"] 

353 

354 # Check which dimension coordinates we have. 

355 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

356 

357 # Check if any acceptable stamp coordinates are cube dimensions. 

358 stamp_coords = [coord for coord in dim_coord_names if coord in STAMP_COORD_NAMES] 

359 if len(stamp_coords) == 1: 

360 stamp_coordinate = stamp_coords[0] 

361 else: 

362 stamp_coordinate = "realization" 

363 

364 return stamp_coordinate 

365 

366 

367def fully_equalise_attributes(cubes: iris.cube.CubeList): 

368 """Remove any unique attributes between cubes or coordinates in place.""" 

369 # Equalise cube attributes. 

370 removed = iris.util.equalise_attributes(cubes) 

371 logging.debug("Removed attributes from cube: %s", removed) 

372 

373 # Equalise coordinate attributes. 

374 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes] 

375 

376 all_coords = set.union(*coord_sets) 

377 coords_to_equalise = set.intersection(*coord_sets) 

378 coords_to_remove = set.difference(all_coords, coords_to_equalise) 

379 

380 logging.debug("All coordinates: %s", all_coords) 

381 logging.debug("Coordinates to remove: %s", coords_to_remove) 

382 logging.debug("Coordinates to equalise: %s", coords_to_equalise) 

383 

384 for coord in coords_to_remove: 

385 for cube in cubes: 

386 try: 

387 cube.remove_coord(coord) 

388 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name()) 

389 except iris.exceptions.CoordinateNotFoundError: 

390 pass 

391 

392 for coord in coords_to_equalise: 

393 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes]) 

394 logging.debug("Removed attributes from coordinate %s: %s", coord, removed) 

395 

396 return cubes 

397 

398 

399def slice_over_maybe(cube: iris.cube.Cube, coord_name, index): 

400 """Test slicing over cube if exists. 

401 

402 Return None if not existing. 

403 

404 Arguments 

405 --------- 

406 cube: iris.cube.Cube 

407 An iris cube which will be checked to see if it can be sliced over 

408 given coordinate. 

409 coord_name: coord 

410 An iris coordinate over which to slice cube. 

411 index: 

412 Coordinate index value to extract 

413 

414 Returns 

415 ------- 

416 cube_slice: iris.cube.Cube 

417 A slice of iris cube, if available to slice. 

418 """ 

419 if cube is None: 

420 return None 

421 

422 # Check if coord exists as dimension coordinate 

423 if not is_coorddim(cube, coord_name): 423 ↛ 424line 423 didn't jump to line 424 because the condition on line 423 was never true

424 return cube 

425 

426 # Use iris to find which axis the dimension coordinate corresponds to 

427 dim = cube.coord_dims(coord_name)[0] 

428 

429 # Create list of slices for each dimension 

430 slices = [slice(None)] * cube.ndim 

431 

432 # Only replace the slice for the dim to be extracted 

433 slices[dim] = index 

434 

435 return cube[tuple(slices)] 

436 

437 

438def is_time_aux_coord(cube: iris.cube.Cube) -> bool: 

439 """Determine whether a cube has time coordinates as auxiliary coordinates. 

440 

441 Checks if 'forecast_period' and 'forecast_reference_time' exist as 

442 auxiliary coordinates rather than dimension coordinates. 

443 

444 Arguments 

445 --------- 

446 cube: iris.cube.Cube 

447 An iris cube which will be checked to see if it contains 

448 'forecast_period' and 'forecast_reference_time' as auxiliary 

449 coordinates. 

450 

451 Returns 

452 ------- 

453 bool 

454 If true, then the cube has both 'forecast_period' and 

455 'forecast_reference_time' as auxiliary coordinates (not dimension 

456 coordinates). 

457 """ 

458 # Acceptable time coordinate names 

459 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"] 

460 

461 # Get dimension coordinate names 

462 dim_coord_names = [coord.name() for coord in cube.dim_coords] 

463 

464 # Check which temporal coordinates are being used as dimension coordinates 

465 temporal_dim_coords = [ 

466 coord for coord in dim_coord_names if coord in TEMPORAL_COORD_NAMES 

467 ] 

468 

469 # If both coordinates are dimension coordinates, return False 

470 if len(temporal_dim_coords) == 2: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true

471 return False 

472 

473 # Check if both temporal coordinates exist in the cube 

474 has_forecast_period = False 

475 has_forecast_reference_time = False 

476 

477 try: 

478 cube.coord("forecast_period") 

479 has_forecast_period = True 

480 except iris.exceptions.CoordinateNotFoundError: 

481 pass 

482 

483 try: 

484 cube.coord("forecast_reference_time") 

485 has_forecast_reference_time = True 

486 except iris.exceptions.CoordinateNotFoundError: 

487 pass 

488 

489 # Check that both exist and are not used as dimension coordinates 

490 if has_forecast_period and has_forecast_reference_time: 490 ↛ 491line 490 didn't jump to line 491 because the condition on line 490 was never true

491 return len(temporal_dim_coords) == 0 

492 

493 return False 

494 

495 

496def is_time_aggregatable(cube: iris.cube.Cube) -> bool: 

497 """Determine whether a cube can be aggregated in time. 

498 

499 If a cube is aggregatable it will contain both a 'forecast_reference_time' 

500 and 'forecast_period' coordinate as dimension or scalar coordinates. 

501 

502 Arguments 

503 --------- 

504 cube: iris.cube.Cube 

505 An iris cube which will be checked to see if it is aggregatable based 

506 on a set of pre-defined dimensional time coordinates: 

507 'forecast_period' and 'forecast_reference_time'. 

508 

509 Returns 

510 ------- 

511 bool 

512 If true, then the cube is aggregatable and contains dimensional 

513 coordinates including both 'forecast_reference_time' and 

514 'forecast_period'. 

515 """ 

516 # Acceptable time coordinate names for aggregatable cube. 

517 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"] 

518 

519 def strictly_monotonic(coord: iris.coords.Coord) -> bool: 

520 """Return whether a coord is strictly monotonic, catching errors.""" 

521 try: 

522 return coord.is_monotonic() 

523 except iris.exceptions.CoordinateMultiDimError: 

524 return False 

525 

526 # Strictly monotonic coordinate names for the cube. 

527 coord_names = [coord.name() for coord in cube.coords() if strictly_monotonic(coord)] 

528 

529 # Check which temporal coordinates we have. 

530 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES] 

531 # Return whether both coordinates are in the temporal coordinates. 

532 return len(temporal_coords) == 2 

533 

534 

535def guess_bounds(cube): 

536 """ 

537 Guess bounds for x and y coordinates on a cube. 

538 

539 Arguments 

540 --------- 

541 cube: iris.cube.Cube 

542 Input cube whose x and y coordinate bounds will be guessed if missing. 

543 

544 Returns 

545 ------- 

546 iris.cube.Cube 

547 The same cube with bounds added to x and y coordinates where absent. 

548 

549 Raises 

550 ------ 

551 ValueError 

552 If the cube uses a variable resolution grid where bounds cannot be 

553 guessed reliably. 

554 """ 

555 # Loop over spatial coordinates 

556 for axis in ["x", "y"]: 

557 coord = cube.coord(axis=axis) 

558 try: 

559 _ = iris.util.regular_step(coord) 

560 except ValueError as e: 

561 logging.warning( 

562 "Cannot guess bounds for a variable resolution (non-regular) grid: %s", 

563 e, 

564 ) 

565 # Guess bounds if there aren't any 

566 if coord.bounds is None: 

567 coord.guess_bounds() 

568 return cube 

569 

570 

571def identify_unique_times(cubes, time_coord_name): 

572 """Identify unique time points across a Cube or CubeList. 

573 

574 Arguments 

575 --------- 

576 cubes: iris.cube.Cube | iris.cube.CubeList 

577 A single cube or CubeList to extract unique times from. 

578 time_coord_name: str 

579 Name of the time coordinate to extract (e.g., "time", "forecast_period"). 

580 

581 Returns 

582 ------- 

583 time_coord: iris.coords.DimCoord 

584 A dimension coordinate containing all unique time points sorted in order. 

585 """ 

586 # Handle single cube input 

587 if isinstance(cubes, iris.cube.Cube): 

588 cubes = iris.cube.CubeList([cubes]) 

589 

590 times = [] 

591 time_unit = None 

592 # Loop over cubes 

593 for cube in cubes: 

594 # Extract the desired time coordinate from the cube 

595 time_coord = cube.coord(time_coord_name) 

596 

597 # Get the units for the specified time coordinate 

598 if time_unit is None: 

599 time_unit = time_coord.units 

600 

601 # Store the time coordinate points 

602 times.extend(time_coord.points) 

603 

604 # Construct a list of unique times and store them in a new time coordinate 

605 times = sorted(list(set(times))) 

606 time_coord = iris.coords.DimCoord(times, units=time_unit) 

607 time_coord.rename(time_coord_name) 

608 

609 return time_coord 

610 

611 

612def remove_cell_method(cube, cell_method): 

613 cell_methods = [cm for cm in cube.cell_methods if cm != cell_method] 

614 cube.cell_methods = () 

615 for cm in cell_methods: 

616 cube.add_cell_method(cm) 

617 return cube 

618 

619 

620def remove_duplicates(cubelist): 

621 # Nothing to do if the cubelist is empty 

622 if not cubelist: 

623 return cubelist 

624 # Build up a list of indices of the cubes to remove because they are 

625 # duplicated 

626 indices_to_remove = [] 

627 for i in range(len(cubelist) - 1): 

628 cube_i = cubelist[i] 

629 for j in range(i + 1, len(cubelist)): 

630 cube_j = cubelist[j] 

631 if cube_i == cube_j: 

632 if j not in indices_to_remove: 

633 indices_to_remove.append(j) 

634 # Only keep unique cubes 

635 cubelist = iris.cube.CubeList( 

636 [cube for index, cube in enumerate(cubelist) if index not in indices_to_remove] 

637 ) 

638 return cubelist