Coverage for src / CSET / operators / _utils.py: 94%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-31 08:59 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common operator functionality used across CSET. 

17 

18Functions below should only be added if it is not suitable as a standalone 

19operator, and will be used across multiple operators. 

20""" 

21 

22import logging 

23import re 

24from datetime import timedelta 

25 

26import iris 

27import iris.cube 

28import iris.exceptions 

29import iris.util 

30from iris.time import PartialDateTime 

31 

32 

33def pdt_fromisoformat( 

34 datestring, 

35) -> tuple[iris.time.PartialDateTime, timedelta | None]: 

36 """Generate PartialDateTime object. 

37 

38 Function that takes an ISO 8601 date string and returns a PartialDateTime object. 

39 

40 Arguments 

41 --------- 

42 datestring: str 

43 ISO 8601 date. 

44 

45 Returns 

46 ------- 

47 time_object: iris.time.PartialDateTime 

48 """ 

49 

50 def make_offset(sign, value) -> timedelta: 

51 if len(value) not in [2, 4, 5]: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}') 

53 

54 hours = int(value[:2]) 

55 minutes = 0 

56 if len(value) in [4, 5]: 

57 minutes = int(value[-2:]) 

58 return timedelta(hours=sign * hours, minutes=sign * minutes) 

59 

60 # Remove the microseconds coord due to no support in PartialDateTime 

61 datestring = re.sub(r"\.\d+", "", datestring) 

62 

63 datetime_split = datestring.split("T") 

64 date = datetime_split[0] 

65 if len(datetime_split) == 1: 

66 time = "" 

67 elif len(datetime_split) == 2: 67 ↛ 70line 67 didn't jump to line 70 because the condition on line 67 was always true

68 time = datetime_split[1] 

69 else: 

70 raise ValueError("datesting in an unexpected format") 

71 

72 offset = None 

73 time_split = time.split("+") 

74 if len(time_split) == 2: 

75 time = time_split[0] 

76 offset = make_offset(1, time_split[1]) 

77 else: 

78 time_split = time.split("-") 

79 if len(time_split) == 2: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true

80 time = time_split[0] 

81 offset = make_offset(-1, time_split[1]) 

82 else: 

83 offset = None 

84 

85 if re.fullmatch(r"\d{8}", date): 

86 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}" 

87 elif re.fullmatch(r"\d{6}", date): 

88 date = f"{date[0:4]}-{date[4:6]}" 

89 

90 if len(date) < 7: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM") 

92 

93 # Returning a PartialDateTime for the special case of string form "YYYY-MM" 

94 if re.fullmatch(r"\d{4}-\d{2}", date): 

95 pdt = PartialDateTime( 

96 year=int(date[0:4]), 

97 month=int(date[5:7]), 

98 day=None, 

99 hour=None, 

100 minute=None, 

101 second=None, 

102 ) 

103 return pdt, offset 

104 

105 year = int(date[0:4]) 

106 month = int(date[5:7]) 

107 day = int(date[8:10]) 

108 

109 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0) 

110 

111 # Normalise the time parts into standard format 

112 if re.fullmatch(r"\d{4}", time): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 time = f"{time[0:2]}:{time[2:4]}" 

114 if re.fullmatch(r"\d{6}", time): 

115 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}" 

116 

117 if len(time) >= 2: 

118 kwargs["hour"] = int(time[0:2]) 

119 if len(time) >= 5: 

120 kwargs["minute"] = int(time[3:5]) 

121 if len(time) >= 8: 

122 kwargs["second"] = int(time[6:8]) 

123 

124 pdt = PartialDateTime(**kwargs) 

125 

126 return pdt, offset 

127 

128 

129def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]: 

130 """ 

131 Return horizontal dimension coordinate name(s) from a given cube. 

132 

133 Arguments 

134 --------- 

135 

136 cube: iris.cube.Cube 

137 An iris cube which will be checked to see if it contains coordinate 

138 names that match a pre-defined list of acceptable horizontal 

139 dimension coordinate names. 

140 

141 Returns 

142 ------- 

143 (y_coord, x_coord) 

144 A tuple containing the horizontal coordinate name for latitude and longitude respectively 

145 found within the cube. 

146 

147 Raises 

148 ------ 

149 ValueError 

150 If a unique y/x horizontal coordinate cannot be found. 

151 """ 

152 # Acceptable horizontal coordinate names. 

153 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

154 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

155 

156 # Get a list of dimension coordinate names for the cube 

157 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

158 coord_names = [coord.name() for coord in cube.coords()] 

159 

160 # Check which x-coordinate we have, if any 

161 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

162 if len(x_coords) != 1: 

163 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES] 

164 if len(x_coords) != 1: 

165 raise ValueError("Could not identify a unique x-coordinate in cube") 

166 

167 # Check which y-coordinate we have, if any 

168 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

169 if len(y_coords) != 1: 

170 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES] 

171 if len(y_coords) != 1: 

172 raise ValueError("Could not identify a unique y-coordinate in cube") 

173 

174 return (y_coords[0], x_coords[0]) 

175 

176 

177def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int: 

178 """ 

179 Return coordinate dimension for a named coordinate from a given cube. 

180 

181 Arguments 

182 --------- 

183 

184 cube: iris.cube.Cube 

185 An iris cube which will be checked to see if it contains coordinate 

186 names that match a pre-defined list of acceptable horizontal 

187 coordinate names. 

188 

189 coord_name: str 

190 A cube dimension name 

191 

192 Returns 

193 ------- 

194 coord_index 

195 An integer specifying where in the cube dimension list a specified coordinate name is found. 

196 

197 Raises 

198 ------ 

199 ValueError 

200 If a specified dimension coordinate cannot be found. 

201 """ 

202 # Get a list of dimension coordinate names for the cube 

203 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

204 

205 # Check if requested dimension is found in cube and get index 

206 if coord_name in coord_names: 

207 coord_index = cube.coord_dims(coord_name)[0] 

208 else: 

209 raise ValueError("Could not find requested dimension %s", coord_name) 

210 

211 return coord_index 

212 

213 

214def is_spatialdim(cube: iris.cube.Cube) -> bool: 

215 """Determine whether a cube is has two spatial dimension coordinates. 

216 

217 If cube has both spatial dims, it will contain two unique coordinates 

218 that explain space (latitude and longitude). The coordinates have to 

219 be iterable/contain usable dimension data, as cubes may contain these 

220 coordinates as scalar dimensions after being collapsed. 

221 

222 Arguments 

223 --------- 

224 cube: iris.cube.Cube 

225 An iris cube which will be checked to see if it contains coordinate 

226 names that match a pre-defined list of acceptable coordinate names. 

227 

228 Returns 

229 ------- 

230 bool 

231 If true, then the cube has a spatial projection and thus can be plotted 

232 as a map. 

233 """ 

234 # Acceptable horizontal coordinate names. 

235 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

236 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

237 

238 # Get a list of coordinate names for the cube 

239 coord_names = [coord.name() for coord in cube.dim_coords] 

240 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

241 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

242 

243 # If there is one coordinate for both x and y direction return True. 

244 if len(x_coords) == 1 and len(y_coords) == 1: 

245 return True 

246 else: 

247 return False 

248 

249 

250def is_coorddim(cube: iris.cube.Cube, coord_name) -> bool: 

251 """Determine whether a cube has specified dimension coordinates. 

252 

253 Arguments 

254 --------- 

255 cube: iris.cube.Cube 

256 An iris cube which will be checked to see if it contains coordinate 

257 names that match a pre-defined list of acceptable coordinate names. 

258 

259 coord_name: str 

260 A cube dimension name 

261 

262 Returns 

263 ------- 

264 bool 

265 If true, then the cube has a spatial projection and thus can be plotted 

266 as a map. 

267 """ 

268 # Get a list of dimension coordinate names for the cube 

269 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

270 

271 # Check if requested dimension is found in cube and get index 

272 if coord_name in coord_names: 

273 return True 

274 else: 

275 return False 

276 

277 

278def is_transect(cube: iris.cube.Cube) -> bool: 

279 """Determine whether a cube is a transect. 

280 

281 If cube is a transect, it will contain only one spatial (map) coordinate, 

282 and one vertical coordinate (either pressure or model level). 

283 

284 Arguments 

285 --------- 

286 cube: iris.cube.Cube 

287 An iris cube which will be checked to see if it contains coordinate 

288 names that match a pre-defined list of acceptable coordinate names. 

289 

290 Returns 

291 ------- 

292 bool 

293 If true, then the cube is a transect that contains one spatial (map) 

294 coordinate and one vertical coordinate. 

295 """ 

296 # Acceptable spatial (map) coordinate names. 

297 SPATIAL_MAP_COORD_NAMES = [ 

298 "longitude", 

299 "grid_longitude", 

300 "projection_x_coordinate", 

301 "x", 

302 "latitude", 

303 "grid_latitude", 

304 "projection_y_coordinate", 

305 "y", 

306 "distance", 

307 ] 

308 

309 # Acceptable vertical coordinate names 

310 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"] 

311 

312 # Get a list of coordinate names for the cube 

313 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

314 

315 # Check which spatial coordinates we have. 

316 spatial_coords = [ 

317 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES 

318 ] 

319 if len(spatial_coords) != 1: 

320 return False 

321 

322 # Check which vertical coordinates we have. 

323 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES] 

324 if len(vertical_coords) != 1: 

325 return False 

326 

327 # Passed criteria so return True 

328 return True 

329 

330 

331def fully_equalise_attributes(cubes: iris.cube.CubeList): 

332 """Remove any unique attributes between cubes or coordinates in place.""" 

333 # Equalise cube attributes. 

334 removed = iris.util.equalise_attributes(cubes) 

335 logging.debug("Removed attributes from cube: %s", removed) 

336 

337 # Equalise coordinate attributes. 

338 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes] 

339 

340 all_coords = set.union(*coord_sets) 

341 coords_to_equalise = set.intersection(*coord_sets) 

342 coords_to_remove = set.difference(all_coords, coords_to_equalise) 

343 

344 logging.debug("All coordinates: %s", all_coords) 

345 logging.debug("Coordinates to remove: %s", coords_to_remove) 

346 logging.debug("Coordinates to equalise: %s", coords_to_equalise) 

347 

348 for coord in coords_to_remove: 

349 for cube in cubes: 

350 try: 

351 cube.remove_coord(coord) 

352 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name()) 

353 except iris.exceptions.CoordinateNotFoundError: 

354 pass 

355 

356 for coord in coords_to_equalise: 

357 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes]) 

358 logging.debug("Removed attributes from coordinate %s: %s", coord, removed) 

359 

360 return cubes 

361 

362 

363def slice_over_maybe(cube: iris.cube.Cube, coord_name, index): 

364 """Test slicing over cube if exists. 

365 

366 Return None if not existing. 

367 

368 Arguments 

369 --------- 

370 cube: iris.cube.Cube 

371 An iris cube which will be checked to see if it can be sliced over 

372 given coordinate. 

373 coord_name: coord 

374 An iris coordinate over which to slice cube. 

375 index: 

376 Coordinate index value to extract 

377 

378 Returns 

379 ------- 

380 cube_slice: iris.cube.Cube 

381 A slice of iris cube, if available to slice. 

382 """ 

383 if cube is None: 

384 return None 

385 

386 # Check if coord exists as dimension coordinate 

387 if not is_coorddim(cube, coord_name): 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true

388 return cube 

389 

390 # Use iris to find which axis the dimension coordinate corresponds to 

391 dim = cube.coord_dims(coord_name)[0] 

392 

393 # Create list of slices for each dimension 

394 slices = [slice(None)] * cube.ndim 

395 

396 # Only replace the slice for the dim to be extracted 

397 slices[dim] = index 

398 

399 return cube[tuple(slices)] 

400 

401 

402def is_time_aggregatable(cube: iris.cube.Cube) -> bool: 

403 """Determine whether a cube can be aggregated in time. 

404 

405 If a cube is aggregatable it will contain both a 'forecast_reference_time' 

406 and 'forecast_period' coordinate as dimensional coordinates. 

407 

408 Arguments 

409 --------- 

410 cube: iris.cube.Cube 

411 An iris cube which will be checked to see if it is aggregatable based 

412 on a set of pre-defined dimensional time coordinates: 

413 'forecast_period' and 'forecast_reference_time'. 

414 

415 Returns 

416 ------- 

417 bool 

418 If true, then the cube is aggregatable and contains dimensional 

419 coordinates including both 'forecast_reference_time' and 

420 'forecast_period'. 

421 """ 

422 # Acceptable time coordinate names for aggregatable cube. 

423 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"] 

424 

425 # Coordinate names for the cube. 

426 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

427 

428 # Check which temporal coordinates we have. 

429 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES] 

430 # Return whether both coordinates are in the temporal coordinates. 

431 return len(temporal_coords) == 2