Coverage for src / CSET / operators / _utils.py: 94%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 15:48 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common operator functionality used across CSET. 

17 

18Functions below should only be added if it is not suitable as a standalone 

19operator, and will be used across multiple operators. 

20""" 

21 

22import logging 

23import re 

24from datetime import timedelta 

25 

26import iris 

27import iris.coords 

28import iris.cube 

29import iris.exceptions 

30import iris.util 

31from iris.time import PartialDateTime 

32 

33 

34def pdt_fromisoformat( 

35 datestring, 

36) -> tuple[iris.time.PartialDateTime, timedelta | None]: 

37 """Generate PartialDateTime object. 

38 

39 Function that takes an ISO 8601 date string and returns a PartialDateTime object. 

40 

41 Arguments 

42 --------- 

43 datestring: str 

44 ISO 8601 date. 

45 

46 Returns 

47 ------- 

48 time_object: iris.time.PartialDateTime 

49 """ 

50 

51 def make_offset(sign, value) -> timedelta: 

52 if len(value) not in [2, 4, 5]: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true

53 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}') 

54 

55 hours = int(value[:2]) 

56 minutes = 0 

57 if len(value) in [4, 5]: 

58 minutes = int(value[-2:]) 

59 return timedelta(hours=sign * hours, minutes=sign * minutes) 

60 

61 # Remove the microseconds coord due to no support in PartialDateTime 

62 datestring = re.sub(r"\.\d+", "", datestring) 

63 

64 datetime_split = datestring.split("T") 

65 date = datetime_split[0] 

66 if len(datetime_split) == 1: 

67 time = "" 

68 elif len(datetime_split) == 2: 68 ↛ 71line 68 didn't jump to line 71 because the condition on line 68 was always true

69 time = datetime_split[1] 

70 else: 

71 raise ValueError("datesting in an unexpected format") 

72 

73 offset = None 

74 time_split = time.split("+") 

75 if len(time_split) == 2: 

76 time = time_split[0] 

77 offset = make_offset(1, time_split[1]) 

78 else: 

79 time_split = time.split("-") 

80 if len(time_split) == 2: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 time = time_split[0] 

82 offset = make_offset(-1, time_split[1]) 

83 else: 

84 offset = None 

85 

86 if re.fullmatch(r"\d{8}", date): 

87 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}" 

88 elif re.fullmatch(r"\d{6}", date): 

89 date = f"{date[0:4]}-{date[4:6]}" 

90 

91 if len(date) < 7: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true

92 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM") 

93 

94 # Returning a PartialDateTime for the special case of string form "YYYY-MM" 

95 if re.fullmatch(r"\d{4}-\d{2}", date): 

96 pdt = PartialDateTime( 

97 year=int(date[0:4]), 

98 month=int(date[5:7]), 

99 day=None, 

100 hour=None, 

101 minute=None, 

102 second=None, 

103 ) 

104 return pdt, offset 

105 

106 year = int(date[0:4]) 

107 month = int(date[5:7]) 

108 day = int(date[8:10]) 

109 

110 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0) 

111 

112 # Normalise the time parts into standard format 

113 if re.fullmatch(r"\d{4}", time): 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true

114 time = f"{time[0:2]}:{time[2:4]}" 

115 if re.fullmatch(r"\d{6}", time): 

116 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}" 

117 

118 if len(time) >= 2: 

119 kwargs["hour"] = int(time[0:2]) 

120 if len(time) >= 5: 

121 kwargs["minute"] = int(time[3:5]) 

122 if len(time) >= 8: 

123 kwargs["second"] = int(time[6:8]) 

124 

125 pdt = PartialDateTime(**kwargs) 

126 

127 return pdt, offset 

128 

129 

130def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]: 

131 """ 

132 Return horizontal dimension coordinate name(s) from a given cube. 

133 

134 Arguments 

135 --------- 

136 

137 cube: iris.cube.Cube 

138 An iris cube which will be checked to see if it contains coordinate 

139 names that match a pre-defined list of acceptable horizontal 

140 dimension coordinate names. 

141 

142 Returns 

143 ------- 

144 (y_coord, x_coord) 

145 A tuple containing the horizontal coordinate name for latitude and longitude respectively 

146 found within the cube. 

147 

148 Raises 

149 ------ 

150 ValueError 

151 If a unique y/x horizontal coordinate cannot be found. 

152 """ 

153 # Acceptable horizontal coordinate names. 

154 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

155 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

156 

157 # Get a list of dimension coordinate names for the cube 

158 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

159 coord_names = [coord.name() for coord in cube.coords()] 

160 

161 # Check which x-coordinate we have, if any 

162 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

163 if len(x_coords) != 1: 

164 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES] 

165 if len(x_coords) != 1: 

166 raise ValueError("Could not identify a unique x-coordinate in cube") 

167 

168 # Check which y-coordinate we have, if any 

169 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

170 if len(y_coords) != 1: 

171 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES] 

172 if len(y_coords) != 1: 

173 raise ValueError("Could not identify a unique y-coordinate in cube") 

174 

175 return (y_coords[0], x_coords[0]) 

176 

177 

178def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int: 

179 """ 

180 Return coordinate dimension for a named coordinate from a given cube. 

181 

182 Arguments 

183 --------- 

184 

185 cube: iris.cube.Cube 

186 An iris cube which will be checked to see if it contains coordinate 

187 names that match a pre-defined list of acceptable horizontal 

188 coordinate names. 

189 

190 coord_name: str 

191 A cube dimension name 

192 

193 Returns 

194 ------- 

195 coord_index 

196 An integer specifying where in the cube dimension list a specified coordinate name is found. 

197 

198 Raises 

199 ------ 

200 ValueError 

201 If a specified dimension coordinate cannot be found. 

202 """ 

203 # Get a list of dimension coordinate names for the cube 

204 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

205 

206 # Check if requested dimension is found in cube and get index 

207 if coord_name in coord_names: 

208 coord_index = cube.coord_dims(coord_name)[0] 

209 else: 

210 raise ValueError("Could not find requested dimension %s", coord_name) 

211 

212 return coord_index 

213 

214 

215def is_spatialdim(cube: iris.cube.Cube) -> bool: 

216 """Determine whether a cube is has two spatial dimension coordinates. 

217 

218 If cube has both spatial dims, it will contain two unique coordinates 

219 that explain space (latitude and longitude). The coordinates have to 

220 be iterable/contain usable dimension data, as cubes may contain these 

221 coordinates as scalar dimensions after being collapsed. 

222 

223 Arguments 

224 --------- 

225 cube: iris.cube.Cube 

226 An iris cube which will be checked to see if it contains coordinate 

227 names that match a pre-defined list of acceptable coordinate names. 

228 

229 Returns 

230 ------- 

231 bool 

232 If true, then the cube has a spatial projection and thus can be plotted 

233 as a map. 

234 """ 

235 # Acceptable horizontal coordinate names. 

236 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

237 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

238 

239 # Get a list of coordinate names for the cube 

240 coord_names = [coord.name() for coord in cube.dim_coords] 

241 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

242 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

243 

244 # If there is one coordinate for both x and y direction return True. 

245 if len(x_coords) == 1 and len(y_coords) == 1: 

246 return True 

247 else: 

248 return False 

249 

250 

251def is_coorddim(cube: iris.cube.Cube, coord_name) -> bool: 

252 """Determine whether a cube has specified dimension coordinates. 

253 

254 Arguments 

255 --------- 

256 cube: iris.cube.Cube 

257 An iris cube which will be checked to see if it contains coordinate 

258 names that match a pre-defined list of acceptable coordinate names. 

259 

260 coord_name: str 

261 A cube dimension name 

262 

263 Returns 

264 ------- 

265 bool 

266 If true, then the cube has a spatial projection and thus can be plotted 

267 as a map. 

268 """ 

269 # Get a list of dimension coordinate names for the cube 

270 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

271 

272 # Check if requested dimension is found in cube and get index 

273 if coord_name in coord_names: 

274 return True 

275 else: 

276 return False 

277 

278 

279def is_transect(cube: iris.cube.Cube) -> bool: 

280 """Determine whether a cube is a transect. 

281 

282 If cube is a transect, it will contain only one spatial (map) coordinate, 

283 and one vertical coordinate (either pressure or model level). 

284 

285 Arguments 

286 --------- 

287 cube: iris.cube.Cube 

288 An iris cube which will be checked to see if it contains coordinate 

289 names that match a pre-defined list of acceptable coordinate names. 

290 

291 Returns 

292 ------- 

293 bool 

294 If true, then the cube is a transect that contains one spatial (map) 

295 coordinate and one vertical coordinate. 

296 """ 

297 # Acceptable spatial (map) coordinate names. 

298 SPATIAL_MAP_COORD_NAMES = [ 

299 "longitude", 

300 "grid_longitude", 

301 "projection_x_coordinate", 

302 "x", 

303 "latitude", 

304 "grid_latitude", 

305 "projection_y_coordinate", 

306 "y", 

307 "distance", 

308 ] 

309 

310 # Acceptable vertical coordinate names 

311 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"] 

312 

313 # Get a list of coordinate names for the cube 

314 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

315 

316 # Check which spatial coordinates we have. 

317 spatial_coords = [ 

318 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES 

319 ] 

320 if len(spatial_coords) != 1: 

321 return False 

322 

323 # Check which vertical coordinates we have. 

324 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES] 

325 if len(vertical_coords) != 1: 

326 return False 

327 

328 # Passed criteria so return True 

329 return True 

330 

331 

332def check_stamp_coordinate(cube: iris.cube.Cube) -> str: 

333 """ 

334 Return stamp dimension coordinate name from a given cube, if exists. 

335 

336 If cube contains a valid stamp coordinate as a dimension coordinate, 

337 function will return name of the stamp coordinate. 

338 

339 Arguments 

340 --------- 

341 cube: iris.cube.Cube 

342 An iris cube which will be checked to see if it contains coordinate 

343 names that match a pre-defined list of acceptable coordinate names. 

344 

345 Returns 

346 ------- 

347 str 

348 If available, then return name of stamp coordinate. 

349 Defaults to "realization" if alternative stamp coordinate not found. 

350 """ 

351 # Acceptable stamp coordinate names 

352 STAMP_COORD_NAMES = ["realization", "member", "pseudo_level"] 

353 

354 # Check which dimension coordinates we have. 

355 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

356 

357 # Check if any acceptable stamp coordinates are cube dimensions. 

358 stamp_coords = [coord for coord in dim_coord_names if coord in STAMP_COORD_NAMES] 

359 if len(stamp_coords) == 1: 

360 stamp_coordinate = stamp_coords[0] 

361 else: 

362 stamp_coordinate = "realization" 

363 

364 return stamp_coordinate 

365 

366 

367def fully_equalise_attributes(cubes: iris.cube.CubeList): 

368 """Remove any unique attributes between cubes or coordinates in place.""" 

369 # Equalise cube attributes. 

370 removed = iris.util.equalise_attributes(cubes) 

371 logging.debug("Removed attributes from cube: %s", removed) 

372 

373 # Equalise coordinate attributes. 

374 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes] 

375 

376 all_coords = set.union(*coord_sets) 

377 coords_to_equalise = set.intersection(*coord_sets) 

378 coords_to_remove = set.difference(all_coords, coords_to_equalise) 

379 

380 logging.debug("All coordinates: %s", all_coords) 

381 logging.debug("Coordinates to remove: %s", coords_to_remove) 

382 logging.debug("Coordinates to equalise: %s", coords_to_equalise) 

383 

384 for coord in coords_to_remove: 

385 for cube in cubes: 

386 try: 

387 cube.remove_coord(coord) 

388 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name()) 

389 except iris.exceptions.CoordinateNotFoundError: 

390 pass 

391 

392 for coord in coords_to_equalise: 

393 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes]) 

394 logging.debug("Removed attributes from coordinate %s: %s", coord, removed) 

395 

396 return cubes 

397 

398 

399def slice_over_maybe(cube: iris.cube.Cube, coord_name, index): 

400 """Test slicing over cube if exists. 

401 

402 Return None if not existing. 

403 

404 Arguments 

405 --------- 

406 cube: iris.cube.Cube 

407 An iris cube which will be checked to see if it can be sliced over 

408 given coordinate. 

409 coord_name: coord 

410 An iris coordinate over which to slice cube. 

411 index: 

412 Coordinate index value to extract 

413 

414 Returns 

415 ------- 

416 cube_slice: iris.cube.Cube 

417 A slice of iris cube, if available to slice. 

418 """ 

419 if cube is None: 

420 return None 

421 

422 # Check if coord exists as dimension coordinate 

423 if not is_coorddim(cube, coord_name): 423 ↛ 424line 423 didn't jump to line 424 because the condition on line 423 was never true

424 return cube 

425 

426 # Use iris to find which axis the dimension coordinate corresponds to 

427 dim = cube.coord_dims(coord_name)[0] 

428 

429 # Create list of slices for each dimension 

430 slices = [slice(None)] * cube.ndim 

431 

432 # Only replace the slice for the dim to be extracted 

433 slices[dim] = index 

434 

435 return cube[tuple(slices)] 

436 

437 

438def is_time_aggregatable(cube: iris.cube.Cube) -> bool: 

439 """Determine whether a cube can be aggregated in time. 

440 

441 If a cube is aggregatable it will contain both a 'forecast_reference_time' 

442 and 'forecast_period' coordinate as dimension or scalar coordinates. 

443 

444 Arguments 

445 --------- 

446 cube: iris.cube.Cube 

447 An iris cube which will be checked to see if it is aggregatable based 

448 on a set of pre-defined dimensional time coordinates: 

449 'forecast_period' and 'forecast_reference_time'. 

450 

451 Returns 

452 ------- 

453 bool 

454 If true, then the cube is aggregatable and contains dimensional 

455 coordinates including both 'forecast_reference_time' and 

456 'forecast_period'. 

457 """ 

458 # Acceptable time coordinate names for aggregatable cube. 

459 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"] 

460 

461 def strictly_monotonic(coord: iris.coords.Coord) -> bool: 

462 """Return whether a coord is strictly monotonic, catching errors.""" 

463 try: 

464 return coord.is_monotonic() 

465 except iris.exceptions.CoordinateMultiDimError: 

466 return False 

467 

468 # Strictly monotonic coordinate names for the cube. 

469 coord_names = [coord.name() for coord in cube.coords() if strictly_monotonic(coord)] 

470 

471 # Check which temporal coordinates we have. 

472 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES] 

473 # Return whether both coordinates are in the temporal coordinates. 

474 return len(temporal_coords) == 2