Coverage for src/CSET/operators/read.py: 90%

407 statements  

coverage.py v7.14.0, created at 2026-05-13 07:38 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43from CSET.operators.regrid import restructure_ugrid 

44 

45 

46class NoDataError(FileNotFoundError): 

47 """Error that no data has been loaded.""" 

48 

49 

50def read_cube( 

51 file_paths: list[str] | str, 

52 constraint: iris.Constraint | None = None,

53 model_names: list[str] | str | None = None,

54 subarea_type: str | None = None,

55 subarea_extent: list[float] | None = None,

56 **kwargs, 

57) -> iris.cube.Cube: 

58 """Read a single cube from files. 

59 

60 Read operator that takes a path string (can include shell-style glob 

61 patterns), and loads the cube matching the constraint. If any path points to

62 a directory, all the files contained within are loaded.

63 

64 Ensemble data can also be loaded. If it has a realization coordinate 

65 already, it will be directly used. If not, it will have its member number 

66 guessed from the filename, based on one of several common patterns. For 

67 example, the pattern *emXX*, where XX is the realization.

68 

69 Deterministic data will be loaded with a realization of 0, allowing it to be 

70 processed in the same way as ensemble data. 

71 

72 Arguments 

73 --------- 

74 file_paths: str | list[str] 

75 Path or paths to where .pp/.nc files are located.

76 constraint: iris.Constraint | iris.ConstraintCombination, optional 

77 Constraints to filter data by. Defaults to unconstrained. 

78 model_names: str | list[str], optional 

79 Names of the models that correspond to respective paths in file_paths. 

80 subarea_type: "gridcells" | "modelrelative" | "realworld", optional 

81 Whether to constrain data by model relative coordinates or real world 

82 coordinates. 

83 subarea_extent: list, optional 

84 List of coordinates to constrain data by, in the order: lower latitude,

85 upper latitude, lower longitude, upper longitude.

86 

87 Returns 

88 ------- 

89 cubes: iris.cube.Cube 

90 The loaded cube.

91 

92 Raises 

93 ------ 

94 FileNotFoundError 

95 If the provided path does not exist 

96 ValueError 

97 If the constraint doesn't produce a single cube. 

98 """ 

99 cubes = read_cubes( 

100 file_paths=file_paths, 

101 constraint=constraint, 

102 model_names=model_names, 

103 subarea_type=subarea_type, 

104 subarea_extent=subarea_extent, 

105 ) 

106 # Check the filtered CubeList contains exactly one cube.

107 if len(cubes) == 1: 

108 return cubes[0] 

109 else: 

110 raise ValueError( 

111 f"Constraint doesn't produce single cube: {constraint}\n{cubes}" 

112 ) 

113 
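# A minimal usage sketch for read_cube; the file name and constraint here
# are hypothetical stand-ins, not taken from the CSET test suite.
import iris

from CSET.operators.read import read_cube

cube = read_cube("forecast.nc", constraint=iris.Constraint("air_temperature"))
print(cube.summary(shorten=True))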

114 

115def read_cubes( 

116 file_paths: list[str] | str, 

117 constraint: iris.Constraint | None = None, 

118 model_names: str | list[str] | None = None, 

119 subarea_type: str | None = None,

120 subarea_extent: list[float] | None = None,

121 **kwargs, 

122) -> iris.cube.CubeList: 

123 """Read cubes from files. 

124 

125 Read operator that takes a path string (can include shell-style glob 

126 patterns), and loads the cubes matching the constraint. If any path points

127 to a directory, all the files contained within are loaded.

128 

129 Ensemble data can also be loaded. If it has a realization coordinate 

130 already, it will be directly used. If not, it will have its member number 

131 guessed from the filename, based on one of several common patterns. For 

132 example, the pattern *emXX*, where XX is the realization.

133 

134 Deterministic data will be loaded with a realization of 0, allowing it to be 

135 processed in the same way as ensemble data. 

136 

137 Data output by XIOS (such as LFRic) has its per-file metadata removed so 

138 that the cubes merge across files. 

139 

140 Arguments 

141 --------- 

142 file_paths: str | list[str] 

143 Path or paths to where .pp/.nc files are located. Can include globs. 

144 constraint: iris.Constraint | iris.ConstraintCombination, optional 

145 Constraints to filter data by. Defaults to unconstrained. 

146 model_names: str | list[str], optional 

147 Names of the models that correspond to respective paths in file_paths. 

148 subarea_type: str, optional 

149 Whether to constrain data by model relative coordinates or real world 

150 coordinates. 

151 subarea_extent: list[float], optional 

152 List of coordinates to constrain data by, in the order: lower latitude,

153 upper latitude, lower longitude, upper longitude.

154 

155 Returns 

156 ------- 

157 cubes: iris.cube.CubeList 

158 Cubes loaded after being merged and concatenated. 

159 

160 Raises 

161 ------ 

162 FileNotFoundError 

163 If the provided path does not exist 

164 """ 

165 # Get an iterable of paths. Each path corresponds to one model.

166 paths = iter_maybe(file_paths) 

167 model_names = iter_maybe(model_names) 

168 

169 # Check we have appropriate number of model names. 

170 if model_names != (None,) and len(model_names) != len(paths): 

171 raise ValueError( 

172 f"The number of model names ({len(model_names)}) should equal " 

173 f"the number of paths given ({len(paths)})." 

174 ) 

175 

176 # Load the data for each model into a CubeList per model. 

177 model_cubes = ( 

178 _load_model(path, name, constraint) 

179 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None) 

180 ) 

181 

182 # Split out the first model's cubes and mark them as the base for comparisons.

183 cubes = next(model_cubes) 

184 for cube in cubes: 

185 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes. 

186 cube.attributes["cset_comparison_base"] = 1 

187 

188 # Load the rest of the models. 

189 cubes.extend(itertools.chain.from_iterable(model_cubes)) 

190 

191 # Unify time units so different case studies can merge. 

192 iris.util.unify_time_units(cubes) 

193 

194 # Select sub region. 

195 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent) 

196 

197 # Merge and concatenate cubes now metadata has been fixed. 

198 cubes = _merge_cubes_check_ensemble(cubes) 

199 cubes = cubes.concatenate() 

200 

201 # Squeeze single valued coordinates into scalar coordinates. 

202 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes) 

203 

204 # Ensure dimension coordinates are bounded. 

205 for cube in cubes: 

206 for dim_coord in cube.coords(dim_coords=True): 

207 # Iris can't guess the bounds of a scalar coordinate. 

208 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1: 

209 dim_coord.guess_bounds() 

210 

211 logging.info("Loaded cubes: %s", cubes) 

212 if len(cubes) == 0: 

213 raise NoDataError("No cubes loaded, check your constraints!") 

214 return cubes 

215 
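# A sketch of a two-model comparison load; the paths and model names are
# hypothetical. Cubes from the first path carry the cset_comparison_base
# attribute set above.
from CSET.operators.read import read_cubes

cubes = read_cubes(
    file_paths=["/data/um/*.pp", "/data/lfric/*.nc"],
    model_names=["UM", "LFRic"],
)
base_cubes = [c for c in cubes if "cset_comparison_base" in c.attributes]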

216 

217def _load_model( 

218 paths: str | list[str], 

219 model_name: str | None, 

220 constraint: iris.Constraint | None, 

221) -> iris.cube.CubeList: 

222 """Load a single model's data into a CubeList.""" 

223 input_files = _check_input_files(paths) 

224 # If unset, a constraint of None lets everything be loaded. 

225 logging.debug("Constraint: %s", constraint) 

226 

227 cubes = iris.load(input_files) 

228 

229 # If a cube called latitude exists, chances are it's unstructured.

230 # Use extract, not extract_cube in a try, as there might be more

231 # than one cube called latitude if we are aggregating.

232 if len(cubes.extract("latitude")) > 0: 232 ↛ 233: line 232 didn't jump to line 233 because the condition on line 232 was never true

233 cubes = restructure_ugrid(cubes) 

234 

235 for cube in cubes: 

236 _loading_callback(cube, None, None) 

237 

238 cubes = cubes.extract(constraint) 

239 

241 # Make the UM's winds consistent with LFRic. 

242 _fix_um_winds(cubes) 

243 

244 # Add a model_name attribute to each cube so it is available at any later

245 # step without needing to pass it as a function parameter.

246 if model_name is not None: 

247 for cube in cubes: 

248 cube.attributes["model_name"] = model_name 

249 return cubes 

250 

251 

252def _check_input_files(input_paths: str | list[str]) -> list[Path]: 

253 """Get an iterable of files to load, and check that they all exist. 

254 

255 Arguments 

256 --------- 

257 input_paths: list[str] 

258 List of paths to input files or directories. The path may itself contain 

259 glob patterns, but unlike in shells it will match directly first. 

260 

261 Returns 

262 ------- 

263 list[Path] 

264 A list of files to load. 

265 

266 Raises 

267 ------ 

268 FileNotFoundError: 

269 If the provided arguments don't resolve to at least one existing file. 

270 """ 

271 files = [] 

272 for raw_filename in iter_maybe(input_paths): 

273 # Match files with glob-like names directly first, if they exist.

274 raw_path = Path(raw_filename) 

275 if raw_path.is_file(): 

276 files.append(raw_path) 

277 else: 

278 for input_path in glob.glob(raw_filename): 

279 # Convert string paths into Path objects. 

280 input_path = Path(input_path) 

281 # Get the list of files in the directory, or use it directly. 

282 if input_path.is_dir(): 

283 logging.debug("Checking directory '%s' for files", input_path) 

284 files.extend(p for p in input_path.iterdir() if p.is_file()) 

285 else: 

286 files.append(input_path) 

287 

288 files.sort() 

289 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files)) 

290 if len(files) == 0: 

291 raise FileNotFoundError(f"No files found for {input_paths}") 

292 return files 

293 
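# A sketch of the resolution order, assuming a hypothetical layout: a
# literal file match wins before glob expansion, and directories expand
# to the files they contain.
from CSET.operators.read import _check_input_files

files = _check_input_files(["/data/run1", "/data/run2/forecast_*.nc"])
for path in files:
    assert path.is_file()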

294 

295def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList): 

296 """Merge CubeList, renumbering realizations of 0 if required. 

297 

298 An unsuccessful merge indicates duplicated metadata across input cubes,

299 most commonly from ensemble members missing an explicit realization

300 coordinate. Therefore the members are renumbered before being merged

301 again.

302 """ 

303 try: 

304 cubes = cubes.merge() 

305 except iris.exceptions.MergeError: 

306 _log_once( 

307 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.", 

308 level=logging.WARNING, 

309 ) 

310 for ir, cube in enumerate(cubes): 

311 if cube.coord("realization").points == 0: 311 ↛ 310: line 311 didn't jump to line 310 because the condition on line 311 was always true

312 cube.coord("realization").points = ir + 1 

313 cubes = cubes.merge() 

314 return cubes 

315 

316 

317def _cutout_cubes( 

318 cubes: iris.cube.CubeList, 

319 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

320 subarea_extent: list[float], 

321): 

322 """Cut out a subarea from a CubeList.""" 

323 if subarea_type is None: 

324 logging.debug("Subarea selection is disabled.") 

325 return cubes 

326 

327 # If selected, cutout according to number of grid cells to trim from each edge. 

328 cutout_cubes = iris.cube.CubeList() 

329 # Find spatial coordinates 

330 for cube in cubes: 

331 # Find dimension coordinates. 

332 lat_name, lon_name = get_cube_yxcoordname(cube) 

333 

334 # Compute cutout based on number of cells to trim from edges. 

335 if subarea_type == "gridcells": 

336 logging.debug( 

337 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

338 subarea_extent[0], 

339 subarea_extent[1], 

340 subarea_extent[2], 

341 subarea_extent[3], 

342 ) 

343 lat_points = np.sort(cube.coord(lat_name).points) 

344 lon_points = np.sort(cube.coord(lon_name).points) 

345 # Define cutout region using user provided cell points. 

346 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

347 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

348 

349 # Compute cutout based on specified coordinate values. 

350 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

351 # If not gridcells, cutout by the requested geographic area.

352 logging.debug( 

353 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

354 subarea_extent[0], 

355 subarea_extent[1], 

356 subarea_extent[2], 

357 subarea_extent[3], 

358 ) 

359 # Define cutout region using user provided coordinates. 

360 lats = np.array(subarea_extent[0:2]) 

361 lons = np.array(subarea_extent[2:4]) 

362 # Ensure cutout longitudes are within +/- 180.0 bounds. 

363 while lons[0] < -180.0: 

364 lons += 360.0 

365 while lons[1] > 180.0: 

366 lons -= 360.0 

367 # If the coordinate system is rotated we convert coordinates into 

368 # model-relative coordinates to extract the appropriate cutout. 

369 coord_system = cube.coord(lat_name).coord_system 

370 if subarea_type == "realworld" and isinstance( 

371 coord_system, iris.coord_systems.RotatedGeogCS 

372 ): 

373 lons, lats = rotate_pole( 

374 lons, 

375 lats, 

376 pole_lon=coord_system.grid_north_pole_longitude, 

377 pole_lat=coord_system.grid_north_pole_latitude, 

378 ) 

379 else: 

380 raise ValueError(f"Unknown subarea_type: {subarea_type}")

381 

382 # Do cutout and add to cutout_cubes. 

383 intersection_args = {lat_name: lats, lon_name: lons} 

384 logging.debug("Cutting out coords: %s", intersection_args) 

385 try: 

386 cutout_cubes.append(cube.intersection(**intersection_args)) 

387 except IndexError as err: 

388 raise ValueError( 

389 "Region cutout error. Check and update SUBAREA_EXTENT." 

390 "Cutout region requested should be contained within data area. " 

391 "Also check if cutout region requested is smaller than input grid spacing." 

392 ) from err 

393 

394 return cutout_cubes 

395 
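# A sketch of the underlying intersection call on a hypothetical global
# cube: cut out 40N-60N, 10W-10E in real-world coordinates.
import iris

cube = iris.load_cube("global_field.nc")  # hypothetical file
cutout = cube.intersection(latitude=[40.0, 60.0], longitude=[-10.0, 10.0])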

396 

397def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube: 

398 """Compose together the needed callbacks into a single function.""" 

399 # Most callbacks operate in-place, but save the cube when returned! 

400 _realization_callback(cube) 

401 _um_normalise_callback(cube) 

402 _lfric_normalise_callback(cube) 

403 cube = _lfric_time_coord_fix_callback(cube) 

404 _normalise_var0_varname(cube) 

405 cube = _fix_no_spatial_coords_callback(cube) 

406 _fix_spatial_coords_callback(cube) 

407 _fix_pressure_coord_callback(cube) 

408 _fix_um_radtime(cube) 

409 _fix_cell_methods(cube) 

410 cube = _convert_cube_units_callback(cube) 

411 cube = _grid_longitude_fix_callback(cube) 

412 _fix_lfric_cloud_base_altitude(cube) 

413 _proleptic_gregorian_fix(cube) 

414 _lfric_time_callback(cube) 

415 _lfric_forecast_period_callback(cube) 

416 cube = _fix_no_time_coords_callback(cube) 

417 _normalise_ML_varname(cube) 

418 return cube 

419 

420 

421def _realization_callback(cube): 

422 """Add a realization coordinate initialised to 0 if missing. 

423 

424 This means deterministic and ensemble cubes can assume a realization

425 coordinate through the rest of the code.

426 """ 

427 # Only add if realization coordinate does not exist. 

428 if not cube.coords("realization"): 

429 cube.add_aux_coord( 

430 iris.coords.DimCoord(0, standard_name="realization", units="1") 

431 ) 

432 
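# A sketch of the effect on a deterministic cube: it gains a scalar
# realization coordinate of 0, so later code can treat deterministic and
# ensemble data uniformly.
import iris.cube
import numpy as np

from CSET.operators.read import _realization_callback

cube = iris.cube.Cube(np.zeros(2), long_name="air_temperature")
_realization_callback(cube)
print(cube.coord("realization").points)  # [0]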

433 

434@functools.lru_cache(None) 

435def _log_once(msg, level=logging.WARNING): 

436 """Print a warning message, skipping recent duplicates.""" 

437 logging.log(level, msg) 

438 
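# A sketch of the deduplication: functools.lru_cache(None) means a given
# (msg, level) pair is only logged the first time it is seen.
from CSET.operators.read import _log_once

_log_once("Unknown STASH code: m99s99i999")
_log_once("Unknown STASH code: m99s99i999")  # cached, so not logged again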

439 

440def _um_normalise_callback(cube: iris.cube.Cube): 

441 """Normalise UM STASH variable long names to LFRic variable names. 

442 

443 Note that standard names, where different, remain associated with the cubes.

444 The long name is used consistently in output filenames and titles.

445 """ 

446 # Convert STASH to LFRic variable name 

447 if "STASH" in cube.attributes: 

448 stash = cube.attributes["STASH"] 

449 try: 

450 (name, grid) = STASH_TO_LFRIC[str(stash)] 

451 cube.long_name = name 

452 except KeyError: 

453 # Don't change cubes with unknown stash codes. 

454 _log_once( 

455 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.", 

456 level=logging.WARNING, 

457 ) 

458 

459 

460def _lfric_normalise_callback(cube: iris.cube.Cube): 

461 """Normalise attributes that prevents LFRic cube from merging. 

462 

463 The uuid and timeStamp relate to the output file, as saved by XIOS, and have

464 no relation to the data contained. These attributes are removed. 

465 

466 The um_stash_source is a list of STASH codes for when an LFRic field maps to

467 multiple UM fields; however, it can be encoded in any order, so it is sorted

468 here for consistency. This attribute is only present in LFRic data that

469 has been converted to look like UM data. 

470 """ 

471 # Remove unwanted attributes. 

472 cube.attributes.pop("timeStamp", None) 

473 cube.attributes.pop("uuid", None) 

474 cube.attributes.pop("name", None) 

475 cube.attributes.pop("source", None) 

476 cube.attributes.pop("analysis_source", None) 

477 cube.attributes.pop("history", None) 

478 

479 # Sort STASH code list. 

480 stash_list = cube.attributes.get("um_stash_source") 

481 if stash_list: 

482 # Parse the string as a list, sort, then re-encode as a string. 

483 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

484 
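# A sketch of the um_stash_source normalisation: the attribute holds a
# string-encoded list, which is parsed, sorted and re-encoded so that
# otherwise-identical cubes can merge.
import ast

stash_list = "['m01s03i226', 'm01s03i225']"
assert str(sorted(ast.literal_eval(stash_list))) == "['m01s03i225', 'm01s03i226']"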

485 

486def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

487 """Ensure the time coordinate is a DimCoord rather than an AuxCoord. 

488 

489 The coordinate is converted and replaced if not. SLAMed LFRic data has this 

490 issue, though the coordinate satisfies all the properties for a DimCoord. 

491 Scalar time values are left as AuxCoords. 

492 """ 

493 # This issue seems to come from iris's handling of NetCDF files where time 

494 # always ends up as an AuxCoord. 

495 if cube.coords("time"): 

496 time_coord = cube.coord("time") 

497 if ( 

498 not isinstance(time_coord, iris.coords.DimCoord) 

499 and len(cube.coord_dims(time_coord)) == 1 

500 ): 

501 # Fudge the bounds to pass the strict monotonicity check.

502 if time_coord.has_bounds(): 502 ↛ 503: line 502 didn't jump to line 503 because the condition on line 502 was never true

503 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8: 

504 time_coord.bounds = [ 

505 [ 

506 time_coord.bounds[i][0] + 1.0e-8 * float(i), 

507 time_coord.bounds[i][1], 

508 ] 

509 for i in range(len(time_coord.bounds)) 

510 ] 

511 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord) 

512 return cube 

513 

514 

515def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

516 """Check grid_longitude coordinates are in the range -180 deg to 180 deg. 

517 

518 This is necessary if comparing two models with different conventions -- 

519 for example, models where the prime meridian is defined as 0 deg or 

520 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude 

521 so that it falls in this range. The checks use near-180 thresholds, as

522 model data bounds may not extend exactly to 0 or 360 degrees.

523 Input cubes on non-rotated grid coordinates are not affected.

524 """ 

525 try: 

526 y, x = get_cube_yxcoordname(cube) 

527 except ValueError: 

528 # Don't modify non-spatial cubes. 

529 return cube 

530 

531 long_coord = cube.coord(x) 

532 # Wrap longitudes if rotated pole coordinates 

533 coord_system = long_coord.coord_system 

534 if x == "grid_longitude" and isinstance( 

535 coord_system, iris.coord_systems.RotatedGeogCS 

536 ): 

537 long_points = long_coord.points.copy() 

538 long_centre = np.median(long_points) 

539 while long_centre < -175.0: 

540 long_centre += 360.0 

541 long_points += 360.0 

542 while long_centre >= 175.0: 

543 long_centre -= 360.0 

544 long_points -= 360.0 

545 long_coord.points = long_points 

546 

547 # Update coord bounds to be consistent with wrapping. 

548 if long_coord.has_bounds(): 

549 long_coord.bounds = None 

550 long_coord.guess_bounds() 

551 

552 return cube 

553 
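# A sketch of the wrapping arithmetic above: shift points by whole
# multiples of 360 degrees until the median lies within [-175, 175).
import numpy as np

long_points = np.array([350.0, 355.0, 360.0, 365.0])
long_centre = np.median(long_points)  # 357.5
while long_centre >= 175.0:
    long_centre -= 360.0
    long_points -= 360.0
print(long_points)  # [-10.  -5.   0.   5.]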

554 

555def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):

556 """Add scalar lat/lon coords from cube attributes if spatial coords are missing."""

557

558 # Don't modify cubes that already have spatial dimensions.

559 if is_spatialdim(cube):

560 return cube 

561 

562 else: 

563 # attempt to get lat/long from cube attributes 

564 try: 

565 lat_min = cube.attributes.get("geospatial_lat_min") 

566 lat_max = cube.attributes.get("geospatial_lat_max") 

567 lon_min = cube.attributes.get("geospatial_lon_min") 

568 lon_max = cube.attributes.get("geospatial_lon_max") 

569 

570 lon_val = (lon_min + lon_max) / 2.0 

571 lat_val = (lat_min + lat_max) / 2.0 

572 

573 lat_coord = iris.coords.DimCoord( 

574 lat_val, 

575 standard_name="latitude", 

576 units="degrees_north", 

577 var_name="latitude", 

578 coord_system=iris.coord_systems.GeogCS(6371229.0), 

579 circular=True, 

580 ) 

581 

582 lon_coord = iris.coords.DimCoord( 

583 lon_val, 

584 standard_name="longitude", 

585 units="degrees_east", 

586 var_name="longitude", 

587 coord_system=iris.coord_systems.GeogCS(6371229.0), 

588 circular=True, 

589 ) 

590 

591 cube.add_aux_coord(lat_coord) 

592 cube.add_aux_coord(lon_coord) 

593 return cube 

594 

595 # If lat/lon are not in the attributes, return the cube unchanged.

596 except TypeError: 

597 return cube 

598 

599 

600def _fix_spatial_coords_callback(cube: iris.cube.Cube): 

601 """Check latitude and longitude coordinates name. 

602 

603 This is necessary as some models define their grid on rotated

604 'grid_latitude' and 'grid_longitude' coordinates while others define 

605 the grid on non-rotated 'latitude' and 'longitude'. 

606 Cube dimensions need to be made consistent to avoid recipe failures, 

607 particularly where comparing multiple input models with differing spatial 

608 coordinates. 

609 """ 

610 # Check if cube is spatial. 

611 if not is_spatialdim(cube): 

612 # Don't modify non-spatial cubes. 

613 return 

614 

615 # Get spatial coords and dimension index. 

616 y_name, x_name = get_cube_yxcoordname(cube) 

617 ny = get_cube_coordindex(cube, y_name) 

618 nx = get_cube_coordindex(cube, x_name) 

619 

620 # Remove spatial coord bounds if erroneous values are detected.

621 # This aims to catch some errors in input coord bounds, using an

622 # invalidity threshold of 10000.0.

623 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds(): 

624 bx_max = np.max(np.abs(cube.coord(x_name).bounds)) 

625 by_max = np.max(np.abs(cube.coord(y_name).bounds)) 

626 if bx_max > 10000.0 or by_max > 10000.0: 

627 cube.coord(x_name).bounds = None 

628 cube.coord(y_name).bounds = None 

629 

630 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord 

631 # [latitude, longitude] for instances where rotated_pole=90.0 

632 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

633 coord_system = cube.coord("grid_latitude").coord_system 

634 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None) 

635 if pole_lat == 90.0: 635 ↛ 636: line 635 didn't jump to line 636 because the condition on line 635 was never true

636 lats = cube.coord("grid_latitude").points 

637 lons = cube.coord("grid_longitude").points 

638 

639 cube.remove_coord("grid_latitude") 

640 cube.add_dim_coord( 

641 iris.coords.DimCoord( 

642 lats, 

643 standard_name="latitude", 

644 var_name="latitude", 

645 units="degrees", 

646 coord_system=iris.coord_systems.GeogCS(6371229.0), 

647 circular=True, 

648 ), 

649 ny, 

650 ) 

651 y_name = "latitude" 

652 cube.remove_coord("grid_longitude") 

653 cube.add_dim_coord( 

654 iris.coords.DimCoord( 

655 lons, 

656 standard_name="longitude", 

657 var_name="longitude", 

658 units="degrees", 

659 coord_system=iris.coord_systems.GeogCS(6371229.0), 

660 circular=True, 

661 ), 

662 nx, 

663 ) 

664 x_name = "longitude" 

665 

666 # Create additional AuxCoord [grid_latitude, grid_longitude] with 

667 # rotated pole attributes for cases with [lat, lon] inputs 

668 if y_name in ["latitude"] and cube.coord(y_name).units in [ 

669 "degrees", 

670 "degrees_north", 

671 "degrees_south", 

672 ]: 

673 # Add grid_latitude AuxCoord 

674 if "grid_latitude" not in [ 674 ↛ 687line 674 didn't jump to line 687 because the condition on line 674 was always true

675 coord.name() for coord in cube.coords(dim_coords=False) 

676 ]: 

677 cube.add_aux_coord( 

678 iris.coords.AuxCoord( 

679 cube.coord(y_name).points, 

680 var_name="grid_latitude", 

681 units="degrees", 

682 ), 

683 ny, 

684 ) 

685 # Ensure input latitude DimCoord has CoordSystem 

686 # This attribute is sometimes lost on iris.save 

687 if not cube.coord(y_name).coord_system: 

688 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

689 

690 if x_name in ["longitude"] and cube.coord(x_name).units in [ 

691 "degrees", 

692 "degrees_west", 

693 "degrees_east", 

694 ]: 

695 # Add grid_longitude AuxCoord 

696 if "grid_longitude" not in [ 696 ↛ 710line 696 didn't jump to line 710 because the condition on line 696 was always true

697 coord.name() for coord in cube.coords(dim_coords=False) 

698 ]: 

699 cube.add_aux_coord( 

700 iris.coords.AuxCoord( 

701 cube.coord(x_name).points, 

702 var_name="grid_longitude", 

703 units="degrees", 

704 ), 

705 nx, 

706 ) 

707 

708 # Ensure input longitude DimCoord has CoordSystem 

709 # This attribute is sometimes lost on iris.save 

710 if not cube.coord(x_name).coord_system: 

711 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

712 

713 

714def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

715 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

716 

717 This problem was raised because the AIFS model data from ECMWF 

718 defines the pressure coordinate with the name "pressure_level" rather 

719 than a CF-compliant coordinate name.

720 

721 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

722 and approach the coordinates in a unified way. 

723 """ 

724 for coord in cube.dim_coords: 

725 if coord.name() in ["pressure_level", "pressure_levels"]: 

726 coord.rename("pressure") 

727 

728 if coord.name() == "pressure": 

729 if str(cube.coord("pressure").units) != "hPa": 

730 cube.coord("pressure").convert_units("hPa") 

731 
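# A sketch of the fix on a hypothetical cube: a "pressure_level"
# coordinate in Pa is renamed to "pressure" and converted to hPa.
import iris.coords
import iris.cube
import numpy as np

from CSET.operators.read import _fix_pressure_coord_callback

levels = iris.coords.DimCoord(
    np.array([100000.0, 85000.0, 50000.0]), long_name="pressure_level", units="Pa"
)
cube = iris.cube.Cube(
    np.zeros(3), long_name="air_temperature", dim_coords_and_dims=[(levels, 0)]
)
_fix_pressure_coord_callback(cube)
print(cube.coord("pressure").points)  # [1000. 850. 500.]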

732 

733def _fix_um_radtime(cube: iris.cube.Cube): 

734 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

735 

736 This callback does not have any effect for output diagnostics with 

737 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

738 diagnostics are checked. 

739 Note this callback does not interpolate the data in time; it only adjusts

740 timestamps to sit on the hour to enable time-to-time difference plotting 

741 with models which may output radiation data on the hour. 

742 """ 

743 try: 

744 if cube.attributes["STASH"] in [ 

745 "m01s01i207", 

746 "m01s01i208", 

747 "m01s02i205", 

748 "m01s02i201", 

749 "m01s01i207", 

750 "m01s02i207", 

751 "m01s01i235", 

752 ]: 

753 time_coord = cube.coord("time") 

754 

755 # Convert time points to datetime objects 

756 time_unit = time_coord.units 

757 time_points = time_unit.num2date(time_coord.points) 

758 # Skip if times don't need fixing. 

759 if time_points[0].minute == 0 and time_points[0].second == 0: 

760 return 

761 if time_points[0].minute == 30 and time_points[0].second == 0: 761 ↛ 762: line 761 didn't jump to line 762 because the condition on line 761 was never true

762 return 

763 

764 # Subtract time difference from the hour from each time point 

765 n_minute = time_points[0].minute 

766 n_second = time_points[0].second 

767 # If times are closer to the next hour, compute the difference to add to the following hour.

768 if n_minute > 30: 

769 n_minute = n_minute - 60 

770 # Compute new diagnostic time stamp 

771 new_time_points = ( 

772 time_points 

773 - datetime.timedelta(minutes=n_minute) 

774 - datetime.timedelta(seconds=n_second) 

775 ) 

776 

777 # Convert back to numeric values using the original time unit. 

778 new_time_values = time_unit.date2num(new_time_points) 

779 

780 # Replace the time coordinate with updated values. 

781 time_coord.points = new_time_values 

782 

783 # Recompute forecast_period with corrected values. 

784 if cube.coord("forecast_period"): 784 ↛ exit: line 784 didn't return from function '_fix_um_radtime' because the condition on line 784 was always true

785 fcst_prd_points = cube.coord("forecast_period").points 

786 new_fcst_points = ( 

787 time_unit.num2date(fcst_prd_points) 

788 - datetime.timedelta(minutes=n_minute) 

789 - datetime.timedelta(seconds=n_second) 

790 ) 

791 cube.coord("forecast_period").points = time_unit.date2num( 

792 new_fcst_points 

793 ) 

794 except KeyError: 

795 pass 

796 
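# A sketch of the timestamp arithmetic: an output time of 09:50:30 moves
# to 10:00:00, since 50 minutes past is closer to the next hour.
import datetime

t = datetime.datetime(2025, 1, 1, 9, 50, 30)
n_minute, n_second = t.minute, t.second
if n_minute > 30:
    n_minute -= 60
print(t - datetime.timedelta(minutes=n_minute, seconds=n_second))
# 2025-01-01 10:00:00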

797 

798def _fix_cell_methods(cube: iris.cube.Cube): 

799 """To fix the assumed cell_methods in accumulation STASH from UM. 

800 

801 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

802 (m01s04i202, m01s05i202) are output by the UM as time accumulations

803 over each hour (TAcc1hr), but input cubes show cell_methods as "mean".

804 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

805 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

806 enabling the cell_method constraint on reading to select correct input. 

807 """ 

808 # Shift "mean" cell_method to "sum" for selected UM inputs. 

809 if cube.attributes.get("STASH") in [ 

810 "m01s21i104", 

811 "m01s04i201", 

812 "m01s04i202", 

813 "m01s05i201", 

814 "m01s05i202", 

815 ]: 

816 # Check if input cell_method contains "mean" time-processing. 

817 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 817 ↛ exit: line 817 didn't return from function '_fix_cell_methods' because the condition on line 817 was always true

818 # Retrieve interval and any comment information. 

819 for cell_method in cube.cell_methods: 

820 interval_str = cell_method.intervals 

821 comment_str = cell_method.comments 

822 

823 # Remove input aggregation method. 

824 cube.cell_methods = () 

825 

826 # Replace "mean" with "sum" cell_method to indicate aggregation. 

827 cube.add_cell_method( 

828 iris.coords.CellMethod( 

829 method="sum", 

830 coords="time", 

831 intervals=interval_str, 

832 comments=comment_str, 

833 ) 

834 ) 

835 

836 

837def _convert_cube_units_callback(cube: iris.cube.Cube): 

838 """Adjust diagnostic units for specific variables. 

839 

840 Some precipitation diagnostics are output with unit kg m-2 s-1 and are 

841 converted here to mm hr-1. 

842 

843 Visibility diagnostics are converted here from m to km to improve output 

844 formatting. 

845 """ 

846 # Convert precipitation diagnostic units if required. 

847 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

848 if any("surface_microphysical" in name for name in varnames): 

849 if cube.units == "kg m-2 s-1": 

850 _log_once( 

851 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1", 

852 level=logging.DEBUG, 

853 ) 

854 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water. 

855 # This is a 1:1 conversion, so we just change the units. 

856 cube.units = "mm s-1" 

857 # Convert the units to per hour. 

858 cube.convert_units("mm hr-1") 

859 elif cube.units == "kg m-2": 859 ↛ 869: line 859 didn't jump to line 869 because the condition on line 859 was always true

860 _log_once( 

861 "Converting precipitation amount units from kg m-2 to mm", 

862 level=logging.DEBUG, 

863 ) 

864 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water. 

865 # This is a 1:1 conversion, so we just change the units. 

866 cube.units = "mm" 

867 

868 # Convert visibility diagnostic units if required. 

869 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

870 if any("visibility" in name for name in varnames): 

871 if cube.units == "m": 871 ↛ 876: line 871 didn't jump to line 876 because the condition on line 871 was always true

872 _log_once("Converting visibility units m to km.", level=logging.DEBUG) 

873 # Convert the units to km. 

874 cube.convert_units("km") 

875 

876 return cube 

877 
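# A sketch of the precipitation-rate conversion: relabelling kg m-2 s-1 as
# mm s-1 is exact for water (1 kg over 1 m2 is a 1 mm deep layer), after
# which iris converts the values to mm hr-1.
import iris.cube
import numpy as np

rain = iris.cube.Cube(
    np.array([2.5e-4]),
    long_name="surface_microphysical_rainfall_rate",
    units="kg m-2 s-1",
)
rain.units = "mm s-1"  # 1:1 relabel, data unchanged
rain.convert_units("mm hr-1")
print(rain.data)  # [0.9]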

878 

879def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

880 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

881 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

882 if any("cloud_base_altitude" in name for name in varnames): 

883 # Mask the cube where values exceed 144 kft, catching the default 144.35695538058164.

884 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0) 

885 

886 

887def _fix_um_winds(cubes: iris.cube.CubeList): 

888 """To make winds from the UM consistent with those from LFRic. 

889 

890 Diagnostics of wind are not always consistent between the UM 

891 and LFric. Here, winds from the UM are adjusted to make them i 

892 consistent with LFRic. 

893 """ 

894 # Check whether we have components of the wind identified by STASH

895 # (so this applies only to cubes from the UM) but not the wind

896 # speed, and calculate the speed if it is missing. Note that

897 # this will be biased low in general because the components will mostly 

898 # be time averages. For simplicity, we do this only if there is just one 

899 # cube of a component. A more complicated approach would be to consider 

900 # the cell methods, but it may not be warranted. 

901 u_constr = iris.AttributeConstraint(STASH="m01s03i225") 

902 v_constr = iris.AttributeConstraint(STASH="m01s03i226") 

903 speed_constr = iris.AttributeConstraint(STASH="m01s03i227") 

904 try: 

905 if cubes.extract(u_constr) and cubes.extract(v_constr): 905 ↛ 906: line 905 didn't jump to line 906 because the condition on line 905 was never true

906 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr): 

907 _add_wind_speed_um(cubes) 

908 # Convert winds in the UM to be relative to true east and true north. 

909 _convert_wind_true_dirn_um(cubes) 

910 except (KeyError, AttributeError): 

911 pass 

912 

913 

914def _add_wind_speed_um(cubes: iris.cube.CubeList): 

915 """Add windspeeds to cubes from the UM.""" 

916 wspd10 = ( 

917 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2 

918 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2 

919 ) ** 0.5 

920 wspd10.attributes["STASH"] = "m01s03i227" 

921 wspd10.standard_name = "wind_speed" 

922 wspd10.long_name = "wind_speed_at_10m" 

923 cubes.append(wspd10) 

924 

925 

926def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList): 

927 """To convert winds to true directions. 

928 

929 Convert from the components relative to the grid to true directions. 

930 This functionality only handles the simplest case. 

931 """ 

932 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225")) 

933 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226")) 

934 for u, v in zip(u_grids, v_grids, strict=True): 

935 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0)) 

936 u.data = true_u.core_data() 

937 v.data = true_v.core_data() 

938 

939 

940def _normalise_var0_varname(cube: iris.cube.Cube): 

941 """Fix varnames for consistency to allow merging. 

942 

943 Some model netCDF data sometimes has coordinate names ending in

944 "_0" etc., where duplicate coordinates of the same name are defined but

945 with different attributes. This can be inconsistently managed in

946 different model inputs and can cause cubes to fail to merge. 

947 """ 

948 for coord in cube.coords(): 

949 if coord.var_name and coord.var_name.endswith("_0"): 

950 coord.var_name = coord.var_name.removesuffix("_0") 

951 if coord.var_name and coord.var_name.endswith("_1"): 

952 coord.var_name = coord.var_name.removesuffix("_1") 

953 if coord.var_name and coord.var_name.endswith("_2"): 953 ↛ 954: line 953 didn't jump to line 954 because the condition on line 953 was never true

954 coord.var_name = coord.var_name.removesuffix("_2") 

955 if coord.var_name and coord.var_name.endswith("_3"): 955 ↛ 956: line 955 didn't jump to line 956 because the condition on line 955 was never true

956 coord.var_name = coord.var_name.removesuffix("_3") 

957 

958 if cube.var_name and cube.var_name.endswith("_0"): 

959 cube.var_name = cube.var_name.removesuffix("_0") 

960 
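# A sketch of the suffix stripping: a duplicate-suffixed name such as
# "time_0" reduces to its base name so cubes can merge.
var_name = "time_0"
if var_name.endswith("_0"):
    var_name = var_name.removesuffix("_0")
assert var_name == "time"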

961 

962def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

963 """Convert the calendars of time units to use a standard calendar.""" 

964 try: 

965 time_coord = cube.coord("time") 

966 if time_coord.units.calendar == "proleptic_gregorian": 

967 logging.debug( 

968 "Changing proleptic Gregorian calendar to standard calendar for %s", 

969 repr(time_coord.units), 

970 ) 

971 time_coord.units = time_coord.units.change_calendar("standard") 

972 except iris.exceptions.CoordinateNotFoundError: 

973 pass 

974 
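# A sketch of the calendar change using cf-units directly; for dates after
# 1582 the proleptic_gregorian and standard calendars agree, so this is a
# metadata-only change.
from cf_units import Unit

unit = Unit("hours since 1970-01-01 00:00:00", calendar="proleptic_gregorian")
print(unit.change_calendar("standard"))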

975 

976def _lfric_time_callback(cube: iris.cube.Cube): 

977 """Fix time coordinate metadata if missing dimensions. 

978 

979 Some model data does not contain forecast_reference_time or forecast_period as 

980 expected coordinates, and so we cannot aggregate over case studies without this 

981 metadata. This callback fixes these issues. 

982 

983 This callback also ensures all time coordinates are referenced as hours since 

984 1970-01-01 00:00:00 for consistency across different model inputs. 

985 

986 Notes 

987 ----- 

988 Some parts of the code have been adapted from Paul Earnshaw's scripts. 

989 """ 

990 # Construct forecast_reference time if it doesn't exist. 

991 try: 

992 tcoord = cube.coord("time") 

993 # Set time coordinate to common basis "hours since 1970" 

994 try: 

995 tcoord.convert_units("hours since 1970-01-01 00:00:00") 

996 except ValueError: 

997 logging.warning("Unrecognised base time unit: %s", tcoord.units) 

998 

999 if not cube.coords("forecast_reference_time"): 

1000 try: 

1001 init_time = datetime.datetime.fromisoformat( 

1002 tcoord.attributes["time_origin"] 

1003 ) 

1004 frt_point = tcoord.units.date2num(init_time) 

1005 frt_coord = iris.coords.AuxCoord( 

1006 frt_point, 

1007 units=tcoord.units, 

1008 standard_name="forecast_reference_time", 

1009 long_name="forecast_reference_time", 

1010 ) 

1011 cube.add_aux_coord(frt_coord) 

1012 except KeyError: 

1013 logging.warning( 

1014 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from." 

1015 ) 

1016 

1017 # Remove time_origin to allow multiple case studies to merge. 

1018 tcoord.attributes.pop("time_origin", None) 

1019 

1020 # Construct forecast_period axis (forecast lead time) if it doesn't exist. 

1021 if not cube.coords("forecast_period"): 

1022 try: 

1023 # Create array of forecast lead times. 

1024 init_coord = cube.coord("forecast_reference_time") 

1025 init_time_points_in_tcoord_units = tcoord.units.date2num( 

1026 init_coord.units.num2date(init_coord.points) 

1027 ) 

1028 lead_times = tcoord.points - init_time_points_in_tcoord_units 

1029 

1030 # Get unit for lead time from time coordinate's unit. 

1031 # Convert all lead time to hours for consistency between models. 

1032 if "seconds" in str(tcoord.units): 1032 ↛ 1033line 1032 didn't jump to line 1033 because the condition on line 1032 was never true

1033 lead_times = lead_times / 3600.0 

1034 units = "hours" 

1035 elif "hours" in str(tcoord.units): 1035 ↛ 1038line 1035 didn't jump to line 1038 because the condition on line 1035 was always true

1036 units = "hours" 

1037 else: 

1038 raise ValueError(f"Unrecognised base time unit: {tcoord.units}") 

1039 

1040 # Create lead time coordinate. 

1041 lead_time_coord = iris.coords.AuxCoord( 

1042 lead_times, 

1043 standard_name="forecast_period", 

1044 long_name="forecast_period", 

1045 units=units, 

1046 ) 

1047 

1048 # Associate lead time coordinate with time dimension. 

1049 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time")) 

1050 except iris.exceptions.CoordinateNotFoundError: 

1051 logging.warning( 

1052 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period" 

1053 ) 

1054 except iris.exceptions.CoordinateNotFoundError: 

1055 logging.warning("No time coordinate on cube.") 

1056 
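# A sketch of the lead-time derivation: with time and
# forecast_reference_time on the common "hours since 1970" basis,
# forecast_period is simply their difference in hours.
import numpy as np

time_points = np.array([438000.0, 438003.0, 438006.0])  # hours since 1970
frt_point = 438000.0
print(time_points - frt_point)  # [0. 3. 6.]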

1057 

1058def _lfric_forecast_period_callback(cube: iris.cube.Cube): 

1059 """Check forecast_period name and units.""" 

1060 try: 

1061 coord = cube.coord("forecast_period") 

1062 if coord.units != "hours": 

1063 cube.coord("forecast_period").convert_units("hours") 

1064 if not coord.standard_name: 

1065 coord.standard_name = "forecast_period" 

1066 except iris.exceptions.CoordinateNotFoundError: 

1067 pass 

1068 

1069 

1070def _fix_no_time_coords_callback(cube: iris.cube.Cube): 

1071 """Add dummy time coord to process cubes that don't have sequence coord.""" 

1072 # Only add if time coordinate does not exist. 

1073 if not cube.coords("time"): 

1074 cube.add_aux_coord( 

1075 iris.coords.DimCoord( 

1076 0, standard_name="time", units="hours since 0001-01-01 00:00:00" 

1077 ) 

1078 ) 

1079 

1080 return cube 

1081 

1082 

1083def _normalise_ML_varname(cube: iris.cube.Cube): 

1084 """Fix plev variable names to standard names.""" 

1085 if cube.coords("pressure"): 

1086 if cube.name() == "x_wind": 

1087 cube.long_name = "zonal_wind_at_pressure_levels" 

1088 if cube.name() == "y_wind": 

1089 cube.long_name = "meridional_wind_at_pressure_levels" 

1090 if cube.name() == "air_temperature": 

1091 cube.long_name = "temperature_at_pressure_levels" 

1092 if cube.name() == "specific_humidity": 1092 ↛ 1093: line 1092 didn't jump to line 1093 because the condition on line 1092 was never true

1093 cube.long_name = ( 

1094 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging" 

1095 )