Coverage for src/CSET/operators/read.py: 91%

403 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 13:58 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43 

44 

45class NoDataError(FileNotFoundError): 

46 """Error that no data has been loaded.""" 

47 

48 

49def read_cube( 

50 file_paths: list[str] | str, 

51 constraint: iris.Constraint = None, 

52 model_names: list[str] | str | None = None, 

53 subarea_type: str = None, 

54 subarea_extent: list[float] = None, 

55 **kwargs, 

56) -> iris.cube.Cube: 

57 """Read a single cube from files. 

58 

59 Read operator that takes a path string (can include shell-style glob 

60 patterns), and loads the cube matching the constraint. If any paths point to 

61 directory, all the files contained within are loaded. 

62 

63 Ensemble data can also be loaded. If it has a realization coordinate 

64 already, it will be directly used. If not, it will have its member number 

65 guessed from the filename, based on one of several common patterns. For 

66 example the pattern *emXX*, where XX is the realization. 

67 

68 Deterministic data will be loaded with a realization of 0, allowing it to be 

69 processed in the same way as ensemble data. 

70 

71 Arguments 

72 --------- 

73 file_paths: str | list[str] 

74 Path or paths to where .pp/.nc files are located 

75 constraint: iris.Constraint | iris.ConstraintCombination, optional 

76 Constraints to filter data by. Defaults to unconstrained. 

77 model_names: str | list[str], optional 

78 Names of the models that correspond to respective paths in file_paths. 

79 subarea_type: "gridcells" | "modelrelative" | "realworld", optional 

80 Whether to constrain data by model relative coordinates or real world 

81 coordinates. 

82 subarea_extent: list, optional 

83 List of coordinates to constraint data by, in order lower latitude, 

84 upper latitude, lower longitude, upper longitude. 

85 

86 Returns 

87 ------- 

88 cubes: iris.cube.Cube 

89 Cube loaded 

90 

91 Raises 

92 ------ 

93 FileNotFoundError 

94 If the provided path does not exist 

95 ValueError 

96 If the constraint doesn't produce a single cube. 

97 """ 

98 cubes = read_cubes( 

99 file_paths=file_paths, 

100 constraint=constraint, 

101 model_names=model_names, 

102 subarea_type=subarea_type, 

103 subarea_extent=subarea_extent, 

104 ) 

105 # Check filtered cubes is a CubeList containing one cube. 

106 if len(cubes) == 1: 

107 return cubes[0] 

108 else: 

109 raise ValueError( 

110 f"Constraint doesn't produce single cube: {constraint}\n{cubes}" 

111 ) 

112 

113 

114def read_cubes( 

115 file_paths: list[str] | str, 

116 constraint: iris.Constraint | None = None, 

117 model_names: str | list[str] | None = None, 

118 subarea_type: str = None, 

119 subarea_extent: list = None, 

120 **kwargs, 

121) -> iris.cube.CubeList: 

122 """Read cubes from files. 

123 

124 Read operator that takes a path string (can include shell-style glob 

125 patterns), and loads the cubes matching the constraint. If any paths point 

126 to directory, all the files contained within are loaded. 

127 

128 Ensemble data can also be loaded. If it has a realization coordinate 

129 already, it will be directly used. If not, it will have its member number 

130 guessed from the filename, based on one of several common patterns. For 

131 example the pattern *emXX*, where XX is the realization. 

132 

133 Deterministic data will be loaded with a realization of 0, allowing it to be 

134 processed in the same way as ensemble data. 

135 

136 Data output by XIOS (such as LFRic) has its per-file metadata removed so 

137 that the cubes merge across files. 

138 

139 Arguments 

140 --------- 

141 file_paths: str | list[str] 

142 Path or paths to where .pp/.nc files are located. Can include globs. 

143 constraint: iris.Constraint | iris.ConstraintCombination, optional 

144 Constraints to filter data by. Defaults to unconstrained. 

145 model_names: str | list[str], optional 

146 Names of the models that correspond to respective paths in file_paths. 

147 subarea_type: str, optional 

148 Whether to constrain data by model relative coordinates or real world 

149 coordinates. 

150 subarea_extent: list[float], optional 

151 List of coordinates to constraint data by, in order lower latitude, 

152 upper latitude, lower longitude, upper longitude. 

153 

154 Returns 

155 ------- 

156 cubes: iris.cube.CubeList 

157 Cubes loaded after being merged and concatenated. 

158 

159 Raises 

160 ------ 

161 FileNotFoundError 

162 If the provided path does not exist 

163 """ 

164 # Get iterable of paths. Each path corresponds to 1 model. 

165 paths = iter_maybe(file_paths) 

166 model_names = iter_maybe(model_names) 

167 

168 # Check we have appropriate number of model names. 

169 if model_names != (None,) and len(model_names) != len(paths): 

170 raise ValueError( 

171 f"The number of model names ({len(model_names)}) should equal " 

172 f"the number of paths given ({len(paths)})." 

173 ) 

174 

175 # Load the data for each model into a CubeList per model. 

176 model_cubes = ( 

177 _load_model(path, name, constraint) 

178 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None) 

179 ) 

180 

181 # Split out first model's cubes and mark it as the base for comparisons. 

182 cubes = next(model_cubes) 

183 for cube in cubes: 

184 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes. 

185 cube.attributes["cset_comparison_base"] = 1 

186 

187 # Load the rest of the models. 

188 cubes.extend(itertools.chain.from_iterable(model_cubes)) 

189 

190 # Unify time units so different case studies can merge. 

191 iris.util.unify_time_units(cubes) 

192 

193 # Select sub region. 

194 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent) 

195 

196 # Merge and concatenate cubes now metadata has been fixed. 

197 cubes = _merge_cubes_check_ensemble(cubes) 

198 cubes = cubes.concatenate() 

199 

200 # Squeeze single valued coordinates into scalar coordinates. 

201 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes) 

202 

203 # Ensure dimension coordinates are bounded. 

204 for cube in cubes: 

205 for dim_coord in cube.coords(dim_coords=True): 

206 if (dim_coord.standard_name == "time") and ( 

207 dim_coord.name() 

208 not in itertools.chain.from_iterable( 

209 m.coord_names for m in cube.cell_methods if m.method != "point" 

210 ) 

211 ): 

212 # Instantaneous time coordinate 

213 continue 

214 # Iris can't guess the bounds of a scalar coordinate. 

215 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1: 

216 dim_coord.guess_bounds() 

217 

218 logging.info("Loaded cubes: %s", cubes) 

219 if len(cubes) == 0: 

220 raise NoDataError("No cubes loaded, check your constraints!") 

221 return cubes 

222 

223 

224def _load_model( 

225 paths: str | list[str], 

226 model_name: str | None, 

227 constraint: iris.Constraint | None, 

228) -> iris.cube.CubeList: 

229 """Load a single model's data into a CubeList.""" 

230 input_files = _check_input_files(paths) 

231 # If unset, a constraint of None lets everything be loaded. 

232 logging.debug("Constraint: %s", constraint) 

233 cubes = iris.load(input_files, constraint, callback=_loading_callback) 

234 # Make the UM's winds consistent with LFRic. 

235 _fix_um_winds(cubes) 

236 

237 # Add model_name attribute to each cube to make it available at any further 

238 # step without needing to pass it as function parameter. 

239 if model_name is not None: 

240 for cube in cubes: 

241 cube.attributes["model_name"] = model_name 

242 return cubes 

243 

244 

245def _check_input_files(input_paths: str | list[str]) -> list[Path]: 

246 """Get an iterable of files to load, and check that they all exist. 

247 

248 Arguments 

249 --------- 

250 input_paths: list[str] 

251 List of paths to input files or directories. The path may itself contain 

252 glob patterns, but unlike in shells it will match directly first. 

253 

254 Returns 

255 ------- 

256 list[Path] 

257 A list of files to load. 

258 

259 Raises 

260 ------ 

261 FileNotFoundError: 

262 If the provided arguments don't resolve to at least one existing file. 

263 """ 

264 files = [] 

265 for raw_filename in iter_maybe(input_paths): 

266 # Match glob-like files first, if they exist. 

267 raw_path = Path(raw_filename) 

268 if raw_path.is_file(): 

269 files.append(raw_path) 

270 else: 

271 for input_path in glob.glob(raw_filename): 

272 # Convert string paths into Path objects. 

273 input_path = Path(input_path) 

274 # Get the list of files in the directory, or use it directly. 

275 if input_path.is_dir(): 

276 logging.debug("Checking directory '%s' for files", input_path) 

277 files.extend(p for p in input_path.iterdir() if p.is_file()) 

278 else: 

279 files.append(input_path) 

280 

281 files.sort() 

282 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files)) 

283 if len(files) == 0: 

284 raise FileNotFoundError(f"No files found for {input_paths}") 

285 return files 

286 

287 

288def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList): 

289 """Merge CubeList, renumbering realizations of 0 if required. 

290 

291 An unsuccessful merge indicates common input cube attributes, most 

292 commonly from ensemble members missing an explicit realization 

293 coordinate. Therefore the members are renumbered before being merged 

294 again. 

295 """ 

296 try: 

297 cubes = cubes.merge() 

298 except iris.exceptions.MergeError: 

299 _log_once( 

300 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.", 

301 level=logging.WARNING, 

302 ) 

303 for ir, cube in enumerate(cubes): 

304 if cube.coord("realization").points == 0: 304 ↛ 303line 304 didn't jump to line 303 because the condition on line 304 was always true

305 cube.coord("realization").points = ir + 1 

306 cubes = cubes.merge() 

307 return cubes 

308 

309 

310def _cutout_cubes( 

311 cubes: iris.cube.CubeList, 

312 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

313 subarea_extent: list[float], 

314): 

315 """Cut out a subarea from a CubeList.""" 

316 if subarea_type is None: 

317 logging.debug("Subarea selection is disabled.") 

318 return cubes 

319 

320 # If selected, cutout according to number of grid cells to trim from each edge. 

321 cutout_cubes = iris.cube.CubeList() 

322 # Find spatial coordinates 

323 for cube in cubes: 

324 # Find dimension coordinates. 

325 lat_name, lon_name = get_cube_yxcoordname(cube) 

326 

327 # Compute cutout based on number of cells to trim from edges. 

328 if subarea_type == "gridcells": 

329 logging.debug( 

330 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

331 subarea_extent[0], 

332 subarea_extent[1], 

333 subarea_extent[2], 

334 subarea_extent[3], 

335 ) 

336 lat_points = np.sort(cube.coord(lat_name).points) 

337 lon_points = np.sort(cube.coord(lon_name).points) 

338 # Define cutout region using user provided cell points. 

339 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

340 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

341 

342 # Compute cutout based on specified coordinate values. 

343 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

344 # If not gridcells, cutout by requested geographic area, 

345 logging.debug( 

346 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

347 subarea_extent[0], 

348 subarea_extent[1], 

349 subarea_extent[2], 

350 subarea_extent[3], 

351 ) 

352 # Define cutout region using user provided coordinates. 

353 lats = np.array(subarea_extent[0:2]) 

354 lons = np.array(subarea_extent[2:4]) 

355 # Ensure cutout longitudes are within +/- 180.0 bounds. 

356 while lons[0] < -180.0: 

357 lons += 360.0 

358 while lons[1] > 180.0: 

359 lons -= 360.0 

360 # If the coordinate system is rotated we convert coordinates into 

361 # model-relative coordinates to extract the appropriate cutout. 

362 coord_system = cube.coord(lat_name).coord_system 

363 if subarea_type == "realworld" and isinstance( 

364 coord_system, iris.coord_systems.RotatedGeogCS 

365 ): 

366 lons, lats = rotate_pole( 

367 lons, 

368 lats, 

369 pole_lon=coord_system.grid_north_pole_longitude, 

370 pole_lat=coord_system.grid_north_pole_latitude, 

371 ) 

372 else: 

373 raise ValueError("Unknown subarea_type:", subarea_type) 

374 

375 # Do cutout and add to cutout_cubes. 

376 intersection_args = {lat_name: lats, lon_name: lons} 

377 logging.debug("Cutting out coords: %s", intersection_args) 

378 try: 

379 cutout_cubes.append(cube.intersection(**intersection_args)) 

380 except IndexError as err: 

381 raise ValueError( 

382 "Region cutout error. Check and update SUBAREA_EXTENT." 

383 "Cutout region requested should be contained within data area. " 

384 "Also check if cutout region requested is smaller than input grid spacing." 

385 ) from err 

386 

387 return cutout_cubes 

388 

389 

390def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube: 

391 """Compose together the needed callbacks into a single function.""" 

392 # Most callbacks operate in-place, but save the cube when returned! 

393 _realization_callback(cube) 

394 _um_normalise_callback(cube) 

395 _lfric_normalise_callback(cube) 

396 cube = _lfric_time_coord_fix_callback(cube) 

397 _normalise_var0_varname(cube) 

398 cube = _fix_no_spatial_coords_callback(cube) 

399 _fix_spatial_coords_callback(cube) 

400 _fix_pressure_coord_callback(cube) 

401 _fix_um_radtime(cube) 

402 _fix_cell_methods(cube) 

403 cube = _convert_cube_units_callback(cube) 

404 cube = _grid_longitude_fix_callback(cube) 

405 _fix_lfric_cloud_base_altitude(cube) 

406 _proleptic_gregorian_fix(cube) 

407 _lfric_time_callback(cube) 

408 _lfric_forecast_period_callback(cube) 

409 cube = _fix_no_time_coords_callback(cube) 

410 _normalise_ML_varname(cube) 

411 return cube 

412 

413 

414def _realization_callback(cube): 

415 """Add a realization coordinate initialised to 0 if missing. 

416 

417 This means deterministic and ensemble cubes can assume realization coordinate through the rest 

418 of the code. 

419 """ 

420 # Only add if realization coordinate does not exist. 

421 if not cube.coords("realization"): 

422 cube.add_aux_coord( 

423 iris.coords.DimCoord(0, standard_name="realization", units="1") 

424 ) 

425 

426 

427@functools.lru_cache(None) 

428def _log_once(msg, level=logging.WARNING): 

429 """Print a warning message, skipping recent duplicates.""" 

430 logging.log(level, msg) 

431 

432 

433def _um_normalise_callback(cube: iris.cube.Cube): 

434 """Normalise UM STASH variable long names to LFRic variable names. 

435 

436 Note standard names will remain associated with cubes where different. 

437 Long name will be used consistently in output filename and titles. 

438 """ 

439 # Convert STASH to LFRic variable name 

440 if "STASH" in cube.attributes: 

441 stash = cube.attributes["STASH"] 

442 try: 

443 (name, grid) = STASH_TO_LFRIC[str(stash)] 

444 cube.long_name = name 

445 except KeyError: 

446 # Don't change cubes with unknown stash codes. 

447 _log_once( 

448 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.", 

449 level=logging.WARNING, 

450 ) 

451 

452 

453def _lfric_normalise_callback(cube: iris.cube.Cube): 

454 """Normalise attributes that prevents LFRic cube from merging. 

455 

456 The uuid and timeStamp relate to the output file, as saved by XIOS, and has 

457 no relation to the data contained. These attributes are removed. 

458 

459 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

460 multiple UM fields, however it can be encoded in any order. This attribute 

461 is sorted to prevent this. This attribute is only present in LFRic data that 

462 has been converted to look like UM data. 

463 """ 

464 # Remove unwanted attributes. 

465 cube.attributes.pop("timeStamp", None) 

466 cube.attributes.pop("uuid", None) 

467 cube.attributes.pop("name", None) 

468 cube.attributes.pop("source", None) 

469 cube.attributes.pop("analysis_source", None) 

470 cube.attributes.pop("history", None) 

471 

472 # Sort STASH code list. 

473 stash_list = cube.attributes.get("um_stash_source") 

474 if stash_list: 

475 # Parse the string as a list, sort, then re-encode as a string. 

476 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

477 

478 

479def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

480 """Ensure the time coordinate is a DimCoord rather than an AuxCoord. 

481 

482 The coordinate is converted and replaced if not. SLAMed LFRic data has this 

483 issue, though the coordinate satisfies all the properties for a DimCoord. 

484 Scalar time values are left as AuxCoords. 

485 """ 

486 # This issue seems to come from iris's handling of NetCDF files where time 

487 # always ends up as an AuxCoord. 

488 if cube.coords("time"): 

489 time_coord = cube.coord("time") 

490 if ( 

491 not isinstance(time_coord, iris.coords.DimCoord) 

492 and len(cube.coord_dims(time_coord)) == 1 

493 ): 

494 # Fudge the bounds to foil checking for strict monotonicity. 

495 if time_coord.has_bounds(): 495 ↛ 496line 495 didn't jump to line 496 because the condition on line 495 was never true

496 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8: 

497 time_coord.bounds = [ 

498 [ 

499 time_coord.bounds[i][0] + 1.0e-8 * float(i), 

500 time_coord.bounds[i][1], 

501 ] 

502 for i in range(len(time_coord.bounds)) 

503 ] 

504 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord) 

505 return cube 

506 

507 

508def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

509 """Check grid_longitude coordinates are in the range -180 deg to 180 deg. 

510 

511 This is necessary if comparing two models with different conventions -- 

512 for example, models where the prime meridian is defined as 0 deg or 

513 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude 

514 so that it falls in this range. Checks are for near-180 bounds given 

515 model data bounds may not extend exactly to 0. or 360. 

516 Input cubes on non-rotated grid coordinates are not impacted. 

517 """ 

518 try: 

519 y, x = get_cube_yxcoordname(cube) 

520 except ValueError: 

521 # Don't modify non-spatial cubes. 

522 return cube 

523 

524 long_coord = cube.coord(x) 

525 # Wrap longitudes if rotated pole coordinates 

526 coord_system = long_coord.coord_system 

527 if x == "grid_longitude" and isinstance( 

528 coord_system, iris.coord_systems.RotatedGeogCS 

529 ): 

530 long_points = long_coord.points.copy() 

531 long_centre = np.median(long_points) 

532 while long_centre < -175.0: 

533 long_centre += 360.0 

534 long_points += 360.0 

535 while long_centre >= 175.0: 

536 long_centre -= 360.0 

537 long_points -= 360.0 

538 long_coord.points = long_points 

539 

540 # Update coord bounds to be consistent with wrapping. 

541 if long_coord.has_bounds(): 

542 long_coord.bounds = None 

543 long_coord.guess_bounds() 

544 

545 return cube 

546 

547 

548def _fix_no_spatial_coords_callback(cube: iris.cube.Cube): 

549 import CSET.operators._utils as utils 

550 

551 # Don't modify spatial cubes that already have spatial dimensions 

552 if utils.is_spatialdim(cube): 

553 return cube 

554 

555 else: 

556 # attempt to get lat/long from cube attributes 

557 try: 

558 lat_min = cube.attributes.get("geospatial_lat_min") 

559 lat_max = cube.attributes.get("geospatial_lat_max") 

560 lon_min = cube.attributes.get("geospatial_lon_min") 

561 lon_max = cube.attributes.get("geospatial_lon_max") 

562 

563 lon_val = (lon_min + lon_max) / 2.0 

564 lat_val = (lat_min + lat_max) / 2.0 

565 

566 lat_coord = iris.coords.DimCoord( 

567 lat_val, 

568 standard_name="latitude", 

569 units="degrees_north", 

570 var_name="latitude", 

571 coord_system=iris.coord_systems.GeogCS(6371229.0), 

572 circular=True, 

573 ) 

574 

575 lon_coord = iris.coords.DimCoord( 

576 lon_val, 

577 standard_name="longitude", 

578 units="degrees_east", 

579 var_name="longitude", 

580 coord_system=iris.coord_systems.GeogCS(6371229.0), 

581 circular=True, 

582 ) 

583 

584 cube.add_aux_coord(lat_coord) 

585 cube.add_aux_coord(lon_coord) 

586 return cube 

587 

588 # if lat/long are not in attributes, then return cube unchanged: 

589 except TypeError: 

590 return cube 

591 

592 

593def _fix_spatial_coords_callback(cube: iris.cube.Cube): 

594 """Check latitude and longitude coordinates name. 

595 

596 This is necessary as some models define their grid as on rotated 

597 'grid_latitude' and 'grid_longitude' coordinates while others define 

598 the grid on non-rotated 'latitude' and 'longitude'. 

599 Cube dimensions need to be made consistent to avoid recipe failures, 

600 particularly where comparing multiple input models with differing spatial 

601 coordinates. 

602 """ 

603 # Check if cube is spatial. 

604 if not is_spatialdim(cube): 

605 # Don't modify non-spatial cubes. 

606 return 

607 

608 # Get spatial coords and dimension index. 

609 y_name, x_name = get_cube_yxcoordname(cube) 

610 ny = get_cube_coordindex(cube, y_name) 

611 nx = get_cube_coordindex(cube, x_name) 

612 

613 # Remove spatial coords bounds if erroneous values detected. 

614 # Aims to catch some errors in input coord bounds by setting 

615 # invalid threshold of 10000.0 

616 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds(): 

617 bx_max = np.max(np.abs(cube.coord(x_name).bounds)) 

618 by_max = np.max(np.abs(cube.coord(y_name).bounds)) 

619 if bx_max > 10000.0 or by_max > 10000.0: 

620 cube.coord(x_name).bounds = None 

621 cube.coord(y_name).bounds = None 

622 

623 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord 

624 # [latitude, longitude] for instances where rotated_pole=90.0 

625 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

626 coord_system = cube.coord("grid_latitude").coord_system 

627 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None) 

628 if pole_lat == 90.0: 628 ↛ 629line 628 didn't jump to line 629 because the condition on line 628 was never true

629 lats = cube.coord("grid_latitude").points 

630 lons = cube.coord("grid_longitude").points 

631 

632 cube.remove_coord("grid_latitude") 

633 cube.add_dim_coord( 

634 iris.coords.DimCoord( 

635 lats, 

636 standard_name="latitude", 

637 var_name="latitude", 

638 units="degrees", 

639 coord_system=iris.coord_systems.GeogCS(6371229.0), 

640 circular=True, 

641 ), 

642 ny, 

643 ) 

644 y_name = "latitude" 

645 cube.remove_coord("grid_longitude") 

646 cube.add_dim_coord( 

647 iris.coords.DimCoord( 

648 lons, 

649 standard_name="longitude", 

650 var_name="longitude", 

651 units="degrees", 

652 coord_system=iris.coord_systems.GeogCS(6371229.0), 

653 circular=True, 

654 ), 

655 nx, 

656 ) 

657 x_name = "longitude" 

658 

659 # Create additional AuxCoord [grid_latitude, grid_longitude] with 

660 # rotated pole attributes for cases with [lat, lon] inputs 

661 if y_name in ["latitude"] and cube.coord(y_name).units in [ 

662 "degrees", 

663 "degrees_north", 

664 "degrees_south", 

665 ]: 

666 # Add grid_latitude AuxCoord 

667 if "grid_latitude" not in [ 

668 coord.name() for coord in cube.coords(dim_coords=False) 

669 ]: 

670 cube.add_aux_coord( 

671 iris.coords.AuxCoord( 

672 cube.coord(y_name).points, 

673 var_name="grid_latitude", 

674 units="degrees", 

675 ), 

676 ny, 

677 ) 

678 # Ensure input latitude DimCoord has CoordSystem 

679 # This attribute is sometimes lost on iris.save 

680 if not cube.coord(y_name).coord_system: 

681 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

682 

683 if x_name in ["longitude"] and cube.coord(x_name).units in [ 

684 "degrees", 

685 "degrees_west", 

686 "degrees_east", 

687 ]: 

688 # Add grid_longitude AuxCoord 

689 if "grid_longitude" not in [ 

690 coord.name() for coord in cube.coords(dim_coords=False) 

691 ]: 

692 cube.add_aux_coord( 

693 iris.coords.AuxCoord( 

694 cube.coord(x_name).points, 

695 var_name="grid_longitude", 

696 units="degrees", 

697 ), 

698 nx, 

699 ) 

700 

701 # Ensure input longitude DimCoord has CoordSystem 

702 # This attribute is sometimes lost on iris.save 

703 if not cube.coord(x_name).coord_system: 

704 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

705 

706 

707def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

708 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

709 

710 This problem was raised because the AIFS model data from ECMWF 

711 defines the pressure coordinate with the name "pressure_level" rather 

712 than compliant CF coordinate names. 

713 

714 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

715 and approach the coordinates in a unified way. 

716 """ 

717 for coord in cube.dim_coords: 

718 if coord.name() in ["pressure_level", "pressure_levels"]: 

719 coord.rename("pressure") 

720 

721 if coord.name() == "pressure": 

722 if str(cube.coord("pressure").units) != "hPa": 

723 cube.coord("pressure").convert_units("hPa") 

724 

725 

726def _fix_um_radtime(cube: iris.cube.Cube): 

727 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

728 

729 This callback does not have any effect for output diagnostics with 

730 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

731 diagnostics are checked. 

732 Note this callback does not interpolate the data in time, only adjust 

733 timestamps to sit on the hour to enable time-to-time difference plotting 

734 with models which may output radiation data on the hour. 

735 """ 

736 try: 

737 if cube.attributes["STASH"] in [ 

738 "m01s01i207", 

739 "m01s01i208", 

740 "m01s02i205", 

741 "m01s02i201", 

742 "m01s01i207", 

743 "m01s02i207", 

744 "m01s01i235", 

745 ]: 

746 time_coord = cube.coord("time") 

747 

748 # Convert time points to datetime objects 

749 time_unit = time_coord.units 

750 time_points = time_unit.num2date(time_coord.points) 

751 # Skip if times don't need fixing. 

752 if time_points[0].minute == 0 and time_points[0].second == 0: 

753 return 

754 if time_points[0].minute == 30 and time_points[0].second == 0: 754 ↛ 755line 754 didn't jump to line 755 because the condition on line 754 was never true

755 return 

756 

757 # Subtract time difference from the hour from each time point 

758 n_minute = time_points[0].minute 

759 n_second = time_points[0].second 

760 # If times closer to next hour, compute difference to add on to following hour 

761 if n_minute > 30: 

762 n_minute = n_minute - 60 

763 # Compute new diagnostic time stamp 

764 new_time_points = ( 

765 time_points 

766 - datetime.timedelta(minutes=n_minute) 

767 - datetime.timedelta(seconds=n_second) 

768 ) 

769 

770 # Convert back to numeric values using the original time unit. 

771 new_time_values = time_unit.date2num(new_time_points) 

772 

773 # Replace the time coordinate with updated values. 

774 time_coord.points = new_time_values 

775 

776 # Recompute forecast_period with corrected values. 

777 if cube.coord("forecast_period"): 777 ↛ exitline 777 didn't return from function '_fix_um_radtime' because the condition on line 777 was always true

778 fcst_prd_points = cube.coord("forecast_period").points 

779 new_fcst_points = ( 

780 time_unit.num2date(fcst_prd_points) 

781 - datetime.timedelta(minutes=n_minute) 

782 - datetime.timedelta(seconds=n_second) 

783 ) 

784 cube.coord("forecast_period").points = time_unit.date2num( 

785 new_fcst_points 

786 ) 

787 except KeyError: 

788 pass 

789 

790 

791def _fix_cell_methods(cube: iris.cube.Cube): 

792 """To fix the assumed cell_methods in accumulation STASH from UM. 

793 

794 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

795 (m01s04i202, m01s05i202) in UM is being output as a time accumulation, 

796 over each hour (TAcc1hr), but input cubes show cell_methods as "mean". 

797 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

798 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

799 enabling the cell_method constraint on reading to select correct input. 

800 """ 

801 # Shift "mean" cell_method to "sum" for selected UM inputs. 

802 if cube.attributes.get("STASH") in [ 

803 "m01s21i104", 

804 "m01s04i201", 

805 "m01s04i202", 

806 "m01s05i201", 

807 "m01s05i202", 

808 ]: 

809 # Check if input cell_method contains "mean" time-processing. 

810 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 810 ↛ exitline 810 didn't return from function '_fix_cell_methods' because the condition on line 810 was always true

811 # Retrieve interval and any comment information. 

812 for cell_method in cube.cell_methods: 

813 interval_str = cell_method.intervals 

814 comment_str = cell_method.comments 

815 

816 # Remove input aggregation method. 

817 cube.cell_methods = () 

818 

819 # Replace "mean" with "sum" cell_method to indicate aggregation. 

820 cube.add_cell_method( 

821 iris.coords.CellMethod( 

822 method="sum", 

823 coords="time", 

824 intervals=interval_str, 

825 comments=comment_str, 

826 ) 

827 ) 

828 

829 

830def _convert_cube_units_callback(cube: iris.cube.Cube): 

831 """Adjust diagnostic units for specific variables. 

832 

833 Some precipitation diagnostics are output with unit kg m-2 s-1 and are 

834 converted here to mm hr-1. 

835 

836 Visibility diagnostics are converted here from m to km to improve output 

837 formatting. 

838 """ 

839 # Convert precipitation diagnostic units if required. 

840 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

841 if any("surface_microphysical" in name for name in varnames): 

842 if cube.units == "kg m-2 s-1": 

843 _log_once( 

844 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1", 

845 level=logging.DEBUG, 

846 ) 

847 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water. 

848 # This is a 1:1 conversion, so we just change the units. 

849 cube.units = "mm s-1" 

850 # Convert the units to per hour. 

851 cube.convert_units("mm hr-1") 

852 elif cube.units == "kg m-2": 852 ↛ 862line 852 didn't jump to line 862 because the condition on line 852 was always true

853 _log_once( 

854 "Converting precipitation amount units from kg m-2 to mm", 

855 level=logging.DEBUG, 

856 ) 

857 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water. 

858 # This is a 1:1 conversion, so we just change the units. 

859 cube.units = "mm" 

860 

861 # Convert visibility diagnostic units if required. 

862 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

863 if any("visibility" in name for name in varnames): 

864 if cube.units == "m": 864 ↛ 869line 864 didn't jump to line 869 because the condition on line 864 was always true

865 _log_once("Converting visibility units m to km.", level=logging.DEBUG) 

866 # Convert the units to km. 

867 cube.convert_units("km") 

868 

869 return cube 

870 

871 

872def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

873 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

874 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

875 if any("cloud_base_altitude" in name for name in varnames): 

876 # Mask cube where set > 144kft to catch default 144.35695538058164 

877 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0) 

878 

879 

880def _fix_um_winds(cubes: iris.cube.CubeList): 

881 """To make winds from the UM consistent with those from LFRic. 

882 

883 Diagnostics of wind are not always consistent between the UM 

884 and LFric. Here, winds from the UM are adjusted to make them i 

885 consistent with LFRic. 

886 """ 

887 # Check whether we have components of the wind identified by STASH, 

888 # (so this will apply only to cubes from the UM), but not the 

889 # wind speed and calculate it if it is missing. Note that 

890 # this will be biased low in general because the components will mostly 

891 # be time averages. For simplicity, we do this only if there is just one 

892 # cube of a component. A more complicated approach would be to consider 

893 # the cell methods, but it may not be warranted. 

894 u_constr = iris.AttributeConstraint(STASH="m01s03i225") 

895 v_constr = iris.AttributeConstraint(STASH="m01s03i226") 

896 speed_constr = iris.AttributeConstraint(STASH="m01s03i227") 

897 try: 

898 if cubes.extract(u_constr) and cubes.extract(v_constr): 898 ↛ 899line 898 didn't jump to line 899 because the condition on line 898 was never true

899 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr): 

900 _add_wind_speed_um(cubes) 

901 # Convert winds in the UM to be relative to true east and true north. 

902 _convert_wind_true_dirn_um(cubes) 

903 except (KeyError, AttributeError): 

904 pass 

905 

906 

907def _add_wind_speed_um(cubes: iris.cube.CubeList): 

908 """Add windspeeds to cubes from the UM.""" 

909 wspd10 = ( 

910 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2 

911 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2 

912 ) ** 0.5 

913 wspd10.attributes["STASH"] = "m01s03i227" 

914 wspd10.standard_name = "wind_speed" 

915 wspd10.long_name = "wind_speed_at_10m" 

916 cubes.append(wspd10) 

917 

918 

919def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList): 

920 """To convert winds to true directions. 

921 

922 Convert from the components relative to the grid to true directions. 

923 This functionality only handles the simplest case. 

924 """ 

925 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225")) 

926 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226")) 

927 for u, v in zip(u_grids, v_grids, strict=True): 

928 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0)) 

929 u.data = true_u.core_data() 

930 v.data = true_v.core_data() 

931 

932 

933def _normalise_var0_varname(cube: iris.cube.Cube): 

934 """Fix varnames for consistency to allow merging. 

935 

936 Some model data netCDF sometimes have a coordinate name end in 

937 "_0" etc, where duplicate coordinates of same name are defined but 

938 with different attributes. This can be inconsistently managed in 

939 different model inputs and can cause cubes to fail to merge. 

940 """ 

941 for coord in cube.coords(): 

942 if coord.var_name and coord.var_name.endswith("_0"): 

943 coord.var_name = coord.var_name.removesuffix("_0") 

944 if coord.var_name and coord.var_name.endswith("_1"): 

945 coord.var_name = coord.var_name.removesuffix("_1") 

946 if coord.var_name and coord.var_name.endswith("_2"): 946 ↛ 947line 946 didn't jump to line 947 because the condition on line 946 was never true

947 coord.var_name = coord.var_name.removesuffix("_2") 

948 if coord.var_name and coord.var_name.endswith("_3"): 948 ↛ 949line 948 didn't jump to line 949 because the condition on line 948 was never true

949 coord.var_name = coord.var_name.removesuffix("_3") 

950 

951 if cube.var_name and cube.var_name.endswith("_0"): 

952 cube.var_name = cube.var_name.removesuffix("_0") 

953 

954 

955def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

956 """Convert the calendars of time units to use a standard calendar.""" 

957 try: 

958 time_coord = cube.coord("time") 

959 if time_coord.units.calendar == "proleptic_gregorian": 

960 logging.debug( 

961 "Changing proleptic Gregorian calendar to standard calendar for %s", 

962 repr(time_coord.units), 

963 ) 

964 time_coord.units = time_coord.units.change_calendar("standard") 

965 except iris.exceptions.CoordinateNotFoundError: 

966 pass 

967 

968 

969def _lfric_time_callback(cube: iris.cube.Cube): 

970 """Fix time coordinate metadata if missing dimensions. 

971 

972 Some model data does not contain forecast_reference_time or forecast_period as 

973 expected coordinates, and so we cannot aggregate over case studies without this 

974 metadata. This callback fixes these issues. 

975 

976 This callback also ensures all time coordinates are referenced as hours since 

977 1970-01-01 00:00:00 for consistency across different model inputs. 

978 

979 Notes 

980 ----- 

981 Some parts of the code have been adapted from Paul Earnshaw's scripts. 

982 """ 

983 # Construct forecast_reference time if it doesn't exist. 

984 try: 

985 tcoord = cube.coord("time") 

986 # Set time coordinate to common basis "hours since 1970" 

987 try: 

988 tcoord.convert_units("hours since 1970-01-01 00:00:00") 

989 except ValueError: 

990 logging.warning("Unrecognised base time unit: %s", tcoord.units) 

991 

992 if not cube.coords("forecast_reference_time"): 

993 try: 

994 init_time = datetime.datetime.fromisoformat( 

995 tcoord.attributes["time_origin"] 

996 ) 

997 frt_point = tcoord.units.date2num(init_time) 

998 frt_coord = iris.coords.AuxCoord( 

999 frt_point, 

1000 units=tcoord.units, 

1001 standard_name="forecast_reference_time", 

1002 long_name="forecast_reference_time", 

1003 ) 

1004 cube.add_aux_coord(frt_coord) 

1005 except KeyError: 

1006 logging.warning( 

1007 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from." 

1008 ) 

1009 

1010 # Remove time_origin to allow multiple case studies to merge. 

1011 tcoord.attributes.pop("time_origin", None) 

1012 

1013 # Construct forecast_period axis (forecast lead time) if it doesn't exist. 

1014 if not cube.coords("forecast_period"): 

1015 try: 

1016 # Create array of forecast lead times. 

1017 init_coord = cube.coord("forecast_reference_time") 

1018 init_time_points_in_tcoord_units = tcoord.units.date2num( 

1019 init_coord.units.num2date(init_coord.points) 

1020 ) 

1021 lead_times = tcoord.points - init_time_points_in_tcoord_units 

1022 

1023 # Get unit for lead time from time coordinate's unit. 

1024 # Convert all lead time to hours for consistency between models. 

1025 if "seconds" in str(tcoord.units): 1025 ↛ 1026line 1025 didn't jump to line 1026 because the condition on line 1025 was never true

1026 lead_times = lead_times / 3600.0 

1027 units = "hours" 

1028 elif "hours" in str(tcoord.units): 1028 ↛ 1031line 1028 didn't jump to line 1031 because the condition on line 1028 was always true

1029 units = "hours" 

1030 else: 

1031 raise ValueError(f"Unrecognised base time unit: {tcoord.units}") 

1032 

1033 # Create lead time coordinate. 

1034 lead_time_coord = iris.coords.AuxCoord( 

1035 lead_times, 

1036 standard_name="forecast_period", 

1037 long_name="forecast_period", 

1038 units=units, 

1039 ) 

1040 

1041 # Associate lead time coordinate with time dimension. 

1042 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time")) 

1043 except iris.exceptions.CoordinateNotFoundError: 

1044 logging.warning( 

1045 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period" 

1046 ) 

1047 except iris.exceptions.CoordinateNotFoundError: 

1048 logging.warning("No time coordinate on cube.") 

1049 

1050 

1051def _lfric_forecast_period_callback(cube: iris.cube.Cube): 

1052 """Check forecast_period name and units.""" 

1053 try: 

1054 coord = cube.coord("forecast_period") 

1055 if coord.units != "hours": 

1056 cube.coord("forecast_period").convert_units("hours") 

1057 if not coord.standard_name: 

1058 coord.standard_name = "forecast_period" 

1059 except iris.exceptions.CoordinateNotFoundError: 

1060 pass 

1061 

1062 

1063def _fix_no_time_coords_callback(cube: iris.cube.Cube): 

1064 """Add dummy time coord to process cubes that don't have sequence coord.""" 

1065 # Only add if time coordinate does not exist. 

1066 if not cube.coords("time"): 

1067 cube.add_aux_coord( 

1068 iris.coords.DimCoord( 

1069 0, standard_name="time", units="hours since 0001-01-01 00:00:00" 

1070 ) 

1071 ) 

1072 

1073 return cube 

1074 

1075 

1076def _normalise_ML_varname(cube: iris.cube.Cube): 

1077 """Fix plev variable names to standard names.""" 

1078 if cube.coords("pressure"): 

1079 if cube.name() == "x_wind": 

1080 cube.long_name = "zonal_wind_at_pressure_levels" 

1081 if cube.name() == "y_wind": 

1082 cube.long_name = "meridional_wind_at_pressure_levels" 

1083 if cube.name() == "air_temperature": 

1084 cube.long_name = "temperature_at_pressure_levels" 

1085 if cube.name() == "specific_humidity": 1085 ↛ 1086line 1085 didn't jump to line 1086 because the condition on line 1085 was never true

1086 cube.long_name = ( 

1087 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging" 

1088 )