Coverage for src / CSET / operators / read.py: 91%

407 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 15:21 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43 

44 

class NoDataError(FileNotFoundError):
    """Raised when loading finishes without producing any cubes."""

47 

48 

def read_cube(
    file_paths: list[str] | str,
    constraint: iris.Constraint = None,
    model_names: list[str] | str | None = None,
    subarea_type: str = None,
    subarea_extent: list[float] = None,
    **kwargs,
) -> iris.cube.Cube:
    """Read a single cube from files.

    Thin wrapper around read_cubes that additionally requires the loaded
    result to be exactly one cube. Paths may contain shell-style glob
    patterns; directories are expanded to the files they contain. Ensemble
    realization handling and deterministic realization-0 tagging are as
    for read_cubes.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: "gridcells" | "modelrelative" | "realworld", optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list, optional
        List of coordinates to constraint data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.Cube
        Cube loaded

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    ValueError
        If the constraint doesn't produce a single cube.
    """
    cubes = read_cubes(
        file_paths=file_paths,
        constraint=constraint,
        model_names=model_names,
        subarea_type=subarea_type,
        subarea_extent=subarea_extent,
    )
    # Exactly one cube must remain after filtering; anything else is an
    # ambiguous (or empty) selection the caller needs to tighten.
    if len(cubes) != 1:
        raise ValueError(
            f"Constraint doesn't produce single cube: {constraint}\n{cubes}"
        )
    return cubes[0]

112 

113 

def read_cubes(
    file_paths: list[str] | str,
    constraint: iris.Constraint | None = None,
    model_names: str | list[str] | None = None,
    subarea_type: str | None = None,
    subarea_extent: list[float] | None = None,
    **kwargs,
) -> iris.cube.CubeList:
    """Read cubes from files.

    Read operator that takes a path string (can include shell-style glob
    patterns), and loads the cubes matching the constraint. If any paths point
    to directory, all the files contained within are loaded.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Data output by XIOS (such as LFRic) has its per-file metadata removed so
    that the cubes merge across files.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located. Can include globs.
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: str, optional
        Whether to constrain data by model relative coordinates or real world
        coordinates. None disables subarea selection.
    subarea_extent: list[float], optional
        List of coordinates to constraint data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.CubeList
        Cubes loaded after being merged and concatenated.

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    NoDataError
        If no cubes remain after loading and filtering.
    """
    # Get iterable of paths. Each path corresponds to 1 model.
    paths = iter_maybe(file_paths)
    model_names = iter_maybe(model_names)

    # Check we have appropriate number of model names. iter_maybe wraps a
    # None model_names argument into the one-tuple (None,).
    if model_names != (None,) and len(model_names) != len(paths):
        raise ValueError(
            f"The number of model names ({len(model_names)}) should equal "
            f"the number of paths given ({len(paths)})."
        )

    # Load the data for each model into a CubeList per model. Lazy generator:
    # each model is only loaded when consumed below.
    model_cubes = (
        _load_model(path, name, constraint)
        for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
    )

    # Split out first model's cubes and mark it as the base for comparisons.
    cubes = next(model_cubes)
    for cube in cubes:
        # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
        cube.attributes["cset_comparison_base"] = 1

    # Load the rest of the models.
    cubes.extend(itertools.chain.from_iterable(model_cubes))

    # Unify time units so different case studies can merge.
    iris.util.unify_time_units(cubes)

    # Select sub region.
    cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)

    # Merge and concatenate cubes now metadata has been fixed.
    cubes = _merge_cubes_check_ensemble(cubes)
    cubes = cubes.concatenate()

    # Squeeze single valued coordinates into scalar coordinates.
    cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)

    # Ensure dimension coordinates are bounded.
    for cube in cubes:
        for dim_coord in cube.coords(dim_coords=True):
            # Iris can't guess the bounds of a scalar coordinate.
            if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
                dim_coord.guess_bounds()

    logging.info("Loaded cubes: %s", cubes)
    if len(cubes) == 0:
        raise NoDataError("No cubes loaded, check your constraints!")
    return cubes

214 

215 

def _load_model(
    paths: str | list[str],
    model_name: str | None,
    constraint: iris.Constraint | None,
) -> iris.cube.CubeList:
    """Load a single model's data into a CubeList."""
    files = _check_input_files(paths)
    # A constraint of None imposes no filtering, so everything loads.
    logging.debug("Constraint: %s", constraint)
    cubes = iris.load(files, constraint, callback=_loading_callback)
    # Make the UM's winds consistent with LFRic.
    _fix_um_winds(cubes)
    # Record the model name on every cube so any later step can retrieve it
    # without it being threaded through as a function parameter.
    if model_name is not None:
        for cube in cubes:
            cube.attributes["model_name"] = model_name
    return cubes

235 

236 

def _check_input_files(input_paths: str | list[str]) -> list[Path]:
    """Resolve the given paths into a sorted list of existing files.

    Arguments
    ---------
    input_paths: list[str]
        List of paths to input files or directories. The path may itself contain
        glob patterns, but unlike in shells it will match directly first.

    Returns
    -------
    list[Path]
        A list of files to load.

    Raises
    ------
    FileNotFoundError:
        If the provided arguments don't resolve to at least one existing file.
    """
    found: list[Path] = []
    for pattern in iter_maybe(input_paths):
        # A literal filename wins over glob interpretation.
        direct = Path(pattern)
        if direct.is_file():
            found.append(direct)
            continue
        for match in map(Path, glob.glob(pattern)):
            if match.is_dir():
                # Directories contribute all the plain files inside them.
                logging.debug("Checking directory '%s' for files", match)
                found.extend(entry for entry in match.iterdir() if entry.is_file())
            else:
                found.append(match)

    found.sort()
    logging.info("Loading files:\n%s", "\n".join(str(path) for path in found))
    if not found:
        raise FileNotFoundError(f"No files found for {input_paths}")
    return found

278 

279 

280def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList): 

281 """Merge CubeList, renumbering realizations of 0 if required. 

282 

283 An unsuccessful merge indicates common input cube attributes, most 

284 commonly from ensemble members missing an explicit realization 

285 coordinate. Therefore the members are renumbered before being merged 

286 again. 

287 """ 

288 try: 

289 cubes = cubes.merge() 

290 except iris.exceptions.MergeError: 

291 _log_once( 

292 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.", 

293 level=logging.WARNING, 

294 ) 

295 for ir, cube in enumerate(cubes): 

296 if cube.coord("realization").points == 0: 296 ↛ 295line 296 didn't jump to line 295 because the condition on line 296 was always true

297 cube.coord("realization").points = ir + 1 

298 cubes = cubes.merge() 

299 return cubes 

300 

301 

302def _cutout_cubes( 

303 cubes: iris.cube.CubeList, 

304 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

305 subarea_extent: list[float], 

306): 

307 """Cut out a subarea from a CubeList.""" 

308 if subarea_type is None: 

309 logging.debug("Subarea selection is disabled.") 

310 return cubes 

311 

312 # If selected, cutout according to number of grid cells to trim from each edge. 

313 cutout_cubes = iris.cube.CubeList() 

314 # Find spatial coordinates 

315 for cube in cubes: 

316 # Find dimension coordinates. 

317 lat_name, lon_name = get_cube_yxcoordname(cube) 

318 

319 # Compute cutout based on number of cells to trim from edges. 

320 if subarea_type == "gridcells": 

321 logging.debug( 

322 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

323 subarea_extent[0], 

324 subarea_extent[1], 

325 subarea_extent[2], 

326 subarea_extent[3], 

327 ) 

328 lat_points = np.sort(cube.coord(lat_name).points) 

329 lon_points = np.sort(cube.coord(lon_name).points) 

330 # Define cutout region using user provided cell points. 

331 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

332 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

333 

334 # Compute cutout based on specified coordinate values. 

335 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

336 # If not gridcells, cutout by requested geographic area, 

337 logging.debug( 

338 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

339 subarea_extent[0], 

340 subarea_extent[1], 

341 subarea_extent[2], 

342 subarea_extent[3], 

343 ) 

344 # Define cutout region using user provided coordinates. 

345 lats = np.array(subarea_extent[0:2]) 

346 lons = np.array(subarea_extent[2:4]) 

347 # Ensure cutout longitudes are within +/- 180.0 bounds. 

348 while lons[0] < -180.0: 

349 lons += 360.0 

350 while lons[1] > 180.0: 

351 lons -= 360.0 

352 # If the coordinate system is rotated we convert coordinates into 

353 # model-relative coordinates to extract the appropriate cutout. 

354 coord_system = cube.coord(lat_name).coord_system 

355 if subarea_type == "realworld" and isinstance( 

356 coord_system, iris.coord_systems.RotatedGeogCS 

357 ): 

358 lons, lats = rotate_pole( 

359 lons, 

360 lats, 

361 pole_lon=coord_system.grid_north_pole_longitude, 

362 pole_lat=coord_system.grid_north_pole_latitude, 

363 ) 

364 else: 

365 raise ValueError("Unknown subarea_type:", subarea_type) 

366 

367 # Do cutout and add to cutout_cubes. 

368 intersection_args = {lat_name: lats, lon_name: lons} 

369 logging.debug("Cutting out coords: %s", intersection_args) 

370 try: 

371 cutout_cubes.append(cube.intersection(**intersection_args)) 

372 except IndexError as err: 

373 raise ValueError( 

374 "Region cutout error. Check and update SUBAREA_EXTENT." 

375 "Cutout region requested should be contained within data area. " 

376 "Also check if cutout region requested is smaller than input grid spacing." 

377 ) from err 

378 

379 return cutout_cubes 

380 

381 

def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
    """Compose together the needed callbacks into a single function.

    Passed to iris.load as the per-cube load callback. The ordering of the
    fixes below is significant — several later fixes rely on metadata
    normalised by earlier ones — so do not reorder without care.

    Arguments
    ---------
    cube: iris.cube.Cube
        The cube as loaded by iris.
    field:
        The raw field the cube was loaded from (unused here, but part of the
        iris callback signature).
    filename: str
        Name of the file being loaded (unused here, but part of the iris
        callback signature).

    Returns
    -------
    iris.cube.Cube
        The cube with all fixes applied.
    """
    # Most callbacks operate in-place, but save the cube when returned!
    _realization_callback(cube)
    _um_normalise_callback(cube)
    _lfric_normalise_callback(cube)
    _nimrod_normalise_callback(cube)
    cube = _lfric_time_coord_fix_callback(cube)
    _normalise_var0_varname(cube)
    cube = _fix_no_spatial_coords_callback(cube)
    _fix_spatial_coords_callback(cube)
    _fix_pressure_coord_callback(cube)
    _fix_um_radtime(cube)
    _fix_cell_methods(cube)
    cube = _convert_cube_units_callback(cube)
    cube = _grid_longitude_fix_callback(cube)
    _fix_lfric_cloud_base_altitude(cube)
    _proleptic_gregorian_fix(cube)
    _lfric_time_callback(cube)
    _lfric_forecast_period_callback(cube)
    cube = _fix_no_time_coords_callback(cube)
    _normalise_ML_varname(cube)
    return cube

405 

406 

407def _realization_callback(cube): 

408 """Add a realization coordinate initialised to 0 if missing. 

409 

410 This means deterministic and ensemble cubes can assume realization coordinate through the rest 

411 of the code. 

412 """ 

413 # Only add if realization coordinate does not exist. 

414 if not cube.coords("realization"): 

415 cube.add_aux_coord( 

416 iris.coords.DimCoord(0, standard_name="realization", units="1") 

417 ) 

418 

419 

420@functools.lru_cache(None) 

421def _log_once(msg, level=logging.WARNING): 

422 """Print a warning message, skipping recent duplicates.""" 

423 logging.log(level, msg) 

424 

425 

426def _um_normalise_callback(cube: iris.cube.Cube): 

427 """Normalise UM STASH variable long names to LFRic variable names. 

428 

429 Note standard names will remain associated with cubes where different. 

430 Long name will be used consistently in output filename and titles. 

431 """ 

432 # Convert STASH to LFRic variable name 

433 if "STASH" in cube.attributes: 

434 stash = cube.attributes["STASH"] 

435 try: 

436 (name, grid) = STASH_TO_LFRIC[str(stash)] 

437 cube.long_name = name 

438 except KeyError: 

439 # Don't change cubes with unknown stash codes. 

440 _log_once( 

441 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.", 

442 level=logging.WARNING, 

443 ) 

444 

445 

446def _lfric_normalise_callback(cube: iris.cube.Cube): 

447 """Normalise attributes that prevents LFRic cube from merging. 

448 

449 The uuid and timeStamp relate to the output file, as saved by XIOS, and has 

450 no relation to the data contained. These attributes are removed. 

451 

452 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

453 multiple UM fields, however it can be encoded in any order. This attribute 

454 is sorted to prevent this. This attribute is only present in LFRic data that 

455 has been converted to look like UM data. 

456 """ 

457 # Remove unwanted attributes. 

458 cube.attributes.pop("timeStamp", None) 

459 cube.attributes.pop("uuid", None) 

460 cube.attributes.pop("name", None) 

461 cube.attributes.pop("source", None) 

462 cube.attributes.pop("analysis_source", None) 

463 cube.attributes.pop("history", None) 

464 

465 # Sort STASH code list. 

466 stash_list = cube.attributes.get("um_stash_source") 

467 if stash_list: 

468 # Parse the string as a list, sort, then re-encode as a string. 

469 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

470 

471 

472def _nimrod_normalise_callback(cube: iris.cube.Cube): 

473 """Normalise attributes that prevents NIMROD radar cubes from merging.""" 

474 # Remove unwanted attributes. 

475 cube.attributes.pop("radar_sites", None) 

476 cube.attributes.pop("additional_radar_sites", None) 

477 cube.attributes.pop("recursive_filter_iterations", None) 

478 cube.attributes.pop("Probability methods", None) 

479 

480 

def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Ensure the time coordinate is a DimCoord rather than an AuxCoord.

    The coordinate is converted and replaced if not. SLAMed LFRic data has this
    issue, though the coordinate satisfies all the properties for a DimCoord.
    Scalar time values are left as AuxCoords.
    """
    # This issue seems to come from iris's handling of NetCDF files where time
    # always ends up as an AuxCoord.
    if cube.coords("time"):
        time_coord = cube.coord("time")
        # Only promote 1-d AuxCoords; scalar (0-d) time stays an AuxCoord.
        if (
            not isinstance(time_coord, iris.coords.DimCoord)
            and len(cube.coord_dims(time_coord)) == 1
        ):
            # Fudge the bounds to foil checking for strict monotonicity.
            if time_coord.has_bounds():
                # When the first and last lower bounds are (nearly) identical,
                # nudge each lower bound up by a tiny index-dependent amount so
                # the bounds become strictly increasing and promotion succeeds.
                if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
                    time_coord.bounds = [
                        [
                            time_coord.bounds[i][0] + 1.0e-8 * float(i),
                            time_coord.bounds[i][1],
                        ]
                        for i in range(len(time_coord.bounds))
                    ]
            iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
    return cube

508 

509 

def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Check grid_longitude coordinates are in the range -180 deg to 180 deg.

    This is necessary if comparing two models with different conventions —
    for example, models where the prime meridian is defined as 0 deg or
    360 deg. Out-of-range grid_longitudes are wrapped into -180..180.
    Checks use near-180 thresholds since model data bounds may not extend
    exactly to 0. or 360. Cubes on non-rotated grids are left untouched.
    """
    try:
        y, x = get_cube_yxcoordname(cube)
    except ValueError:
        # Don't modify non-spatial cubes.
        return cube

    long_coord = cube.coord(x)
    coord_system = long_coord.coord_system
    # Only wrap longitudes on rotated pole coordinates.
    if x == "grid_longitude" and isinstance(
        coord_system, iris.coord_systems.RotatedGeogCS
    ):
        points = long_coord.points.copy()
        centre = np.median(points)
        # Shift the whole coordinate by full rotations until its median sits
        # within the (approximate) -180..180 window.
        while centre < -175.0:
            centre += 360.0
            points += 360.0
        while centre >= 175.0:
            centre -= 360.0
            points -= 360.0
        long_coord.points = points

        # Regenerate bounds so they stay consistent with the wrapped points.
        if long_coord.has_bounds():
            long_coord.bounds = None
            long_coord.guess_bounds()

    return cube

548 

549 

def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):
    """Add scalar latitude/longitude coordinates to non-spatial cubes.

    Cubes without any spatial dimension are given scalar latitude and
    longitude AuxCoords at the centre of the region described by the cube's
    geospatial_* attributes, so downstream code can rely on spatial
    coordinates existing. Cubes that already have spatial dimensions, or that
    lack the geospatial attributes, are returned unchanged.
    """
    # Don't modify cubes that already have spatial dimensions.
    # Fixed: use the module-level is_spatialdim import instead of the
    # redundant function-local "import CSET.operators._utils as utils".
    if is_spatialdim(cube):
        return cube

    # Attempt to get lat/lon extremes from cube attributes; .get returns None
    # when absent, making the midpoint arithmetic below raise TypeError.
    lat_min = cube.attributes.get("geospatial_lat_min")
    lat_max = cube.attributes.get("geospatial_lat_max")
    lon_min = cube.attributes.get("geospatial_lon_min")
    lon_max = cube.attributes.get("geospatial_lon_max")
    try:
        # Use the centre of the described region as the scalar position.
        lon_val = (lon_min + lon_max) / 2.0
        lat_val = (lat_min + lat_max) / 2.0
    except TypeError:
        # Attributes missing: return cube unchanged. The try is narrowed to
        # the arithmetic so coordinate-construction errors are not swallowed.
        return cube

    lat_coord = iris.coords.DimCoord(
        lat_val,
        standard_name="latitude",
        units="degrees_north",
        var_name="latitude",
        coord_system=iris.coord_systems.GeogCS(6371229.0),
        circular=True,
    )
    lon_coord = iris.coords.DimCoord(
        lon_val,
        standard_name="longitude",
        units="degrees_east",
        var_name="longitude",
        coord_system=iris.coord_systems.GeogCS(6371229.0),
        circular=True,
    )
    cube.add_aux_coord(lat_coord)
    cube.add_aux_coord(lon_coord)
    return cube

593 

594 

def _fix_spatial_coords_callback(cube: iris.cube.Cube):
    """Check latitude and longitude coordinates name.

    This is necessary as some models define their grid as on rotated
    'grid_latitude' and 'grid_longitude' coordinates while others define
    the grid on non-rotated 'latitude' and 'longitude'.
    Cube dimensions need to be made consistent to avoid recipe failures,
    particularly where comparing multiple input models with differing spatial
    coordinates.
    """
    # Check if cube is spatial.
    if not is_spatialdim(cube):
        # Don't modify non-spatial cubes.
        return

    # Get spatial coords and dimension index.
    y_name, x_name = get_cube_yxcoordname(cube)
    ny = get_cube_coordindex(cube, y_name)
    nx = get_cube_coordindex(cube, x_name)

    # Remove spatial coords bounds if erroneous values detected.
    # Aims to catch some errors in input coord bounds by setting
    # invalid threshold of 10000.0 (no valid degree bound approaches this).
    if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
        bx_max = np.max(np.abs(cube.coord(x_name).bounds))
        by_max = np.max(np.abs(cube.coord(y_name).bounds))
        if bx_max > 10000.0 or by_max > 10000.0:
            cube.coord(x_name).bounds = None
            cube.coord(y_name).bounds = None

    # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
    # [latitude, longitude] for instances where rotated_pole=90.0
    if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
        coord_system = cube.coord("grid_latitude").coord_system
        pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
        # A pole latitude of exactly 90.0 means the rotation is a no-op, so
        # the coordinates can be relabelled as plain latitude/longitude.
        if pole_lat == 90.0:
            lats = cube.coord("grid_latitude").points
            lons = cube.coord("grid_longitude").points

            # Swap each rotated DimCoord for an unrotated one on the same
            # dimension, keeping the original points.
            cube.remove_coord("grid_latitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lats,
                    standard_name="latitude",
                    var_name="latitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                ny,
            )
            y_name = "latitude"
            cube.remove_coord("grid_longitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lons,
                    standard_name="longitude",
                    var_name="longitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                nx,
            )
            x_name = "longitude"

    # Create additional AuxCoord [grid_latitude, grid_longitude] with
    # rotated pole attributes for cases with [lat, lon] inputs
    if y_name in ["latitude"] and cube.coord(y_name).units in [
        "degrees",
        "degrees_north",
        "degrees_south",
    ]:
        # Add grid_latitude AuxCoord
        if "grid_latitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(y_name).points,
                    var_name="grid_latitude",
                    units="degrees",
                ),
                ny,
            )
        # Ensure input latitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(y_name).coord_system:
            cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)

    if x_name in ["longitude"] and cube.coord(x_name).units in [
        "degrees",
        "degrees_west",
        "degrees_east",
    ]:
        # Add grid_longitude AuxCoord
        if "grid_longitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(x_name).points,
                    var_name="grid_longitude",
                    units="degrees",
                ),
                nx,
            )

        # Ensure input longitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(x_name).coord_system:
            cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)

707 

708 

709def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

710 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

711 

712 This problem was raised because the AIFS model data from ECMWF 

713 defines the pressure coordinate with the name "pressure_level" rather 

714 than compliant CF coordinate names. 

715 

716 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

717 and approach the coordinates in a unified way. 

718 """ 

719 for coord in cube.dim_coords: 

720 if coord.name() in ["pressure_level", "pressure_levels"]: 

721 coord.rename("pressure") 

722 

723 if coord.name() == "pressure": 

724 if str(cube.coord("pressure").units) != "hPa": 

725 cube.coord("pressure").convert_units("hPa") 

726 

727 

728def _fix_um_radtime(cube: iris.cube.Cube): 

729 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

730 

731 This callback does not have any effect for output diagnostics with 

732 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

733 diagnostics are checked. 

734 Note this callback does not interpolate the data in time, only adjust 

735 timestamps to sit on the hour to enable time-to-time difference plotting 

736 with models which may output radiation data on the hour. 

737 """ 

738 try: 

739 if cube.attributes["STASH"] in [ 

740 "m01s01i207", 

741 "m01s01i208", 

742 "m01s02i205", 

743 "m01s02i201", 

744 "m01s01i207", 

745 "m01s02i207", 

746 "m01s01i235", 

747 ]: 

748 time_coord = cube.coord("time") 

749 

750 # Convert time points to datetime objects 

751 time_unit = time_coord.units 

752 time_points = time_unit.num2date(time_coord.points) 

753 # Skip if times don't need fixing. 

754 if time_points[0].minute == 0 and time_points[0].second == 0: 

755 return 

756 if time_points[0].minute == 30 and time_points[0].second == 0: 756 ↛ 757line 756 didn't jump to line 757 because the condition on line 756 was never true

757 return 

758 

759 # Subtract time difference from the hour from each time point 

760 n_minute = time_points[0].minute 

761 n_second = time_points[0].second 

762 # If times closer to next hour, compute difference to add on to following hour 

763 if n_minute > 30: 

764 n_minute = n_minute - 60 

765 # Compute new diagnostic time stamp 

766 new_time_points = ( 

767 time_points 

768 - datetime.timedelta(minutes=n_minute) 

769 - datetime.timedelta(seconds=n_second) 

770 ) 

771 

772 # Convert back to numeric values using the original time unit. 

773 new_time_values = time_unit.date2num(new_time_points) 

774 

775 # Replace the time coordinate with updated values. 

776 time_coord.points = new_time_values 

777 

778 # Recompute forecast_period with corrected values. 

779 if cube.coord("forecast_period"): 779 ↛ exitline 779 didn't return from function '_fix_um_radtime' because the condition on line 779 was always true

780 fcst_prd_points = cube.coord("forecast_period").points 

781 new_fcst_points = ( 

782 time_unit.num2date(fcst_prd_points) 

783 - datetime.timedelta(minutes=n_minute) 

784 - datetime.timedelta(seconds=n_second) 

785 ) 

786 cube.coord("forecast_period").points = time_unit.date2num( 

787 new_fcst_points 

788 ) 

789 except KeyError: 

790 pass 

791 

792 

793def _fix_cell_methods(cube: iris.cube.Cube): 

794 """To fix the assumed cell_methods in accumulation STASH from UM. 

795 

796 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

797 (m01s04i202, m01s05i202) in UM is being output as a time accumulation, 

798 over each hour (TAcc1hr), but input cubes show cell_methods as "mean". 

799 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

800 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

801 enabling the cell_method constraint on reading to select correct input. 

802 """ 

803 # Shift "mean" cell_method to "sum" for selected UM inputs. 

804 if cube.attributes.get("STASH") in [ 

805 "m01s21i104", 

806 "m01s04i201", 

807 "m01s04i202", 

808 "m01s05i201", 

809 "m01s05i202", 

810 ]: 

811 # Check if input cell_method contains "mean" time-processing. 

812 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 812 ↛ exitline 812 didn't return from function '_fix_cell_methods' because the condition on line 812 was always true

813 # Retrieve interval and any comment information. 

814 for cell_method in cube.cell_methods: 

815 interval_str = cell_method.intervals 

816 comment_str = cell_method.comments 

817 

818 # Remove input aggregation method. 

819 cube.cell_methods = () 

820 

821 # Replace "mean" with "sum" cell_method to indicate aggregation. 

822 cube.add_cell_method( 

823 iris.coords.CellMethod( 

824 method="sum", 

825 coords="time", 

826 intervals=interval_str, 

827 comments=comment_str, 

828 ) 

829 ) 

830 

831 

def _convert_cube_units_callback(cube: iris.cube.Cube):
    """Adjust diagnostic units for specific variables.

    Some precipitation diagnostics are output with unit kg m-2 s-1 and are
    converted here to mm hr-1.

    Visibility diagnostics are converted here from m to km to improve output
    formatting.
    """

    def name_contains(fragment: str) -> bool:
        # Check the fragment against every non-None name the cube carries.
        names = (cube.long_name, cube.standard_name, cube.var_name)
        return any(fragment in name for name in names if name)

    # Precipitation diagnostics: rates become mm hr-1, amounts become mm.
    if name_contains("surface_microphysical"):
        if cube.units == "kg m-2 s-1":
            _log_once(
                "Converting precipitation rate units from kg m-2 s-1 to mm hr-1",
                level=logging.DEBUG,
            )
            # 1 kg of water over 1 m^2 is a 1 mm depth (1 kg water = 1 l =
            # 1 dm^3), so kg m-2 s-1 equals mm s-1 and only the label changes.
            cube.units = "mm s-1"
            # Then rescale per-second to per-hour.
            cube.convert_units("mm hr-1")
        elif cube.units == "kg m-2":
            _log_once(
                "Converting precipitation amount units from kg m-2 to mm",
                level=logging.DEBUG,
            )
            # Same 1:1 equivalence as above; relabelling suffices.
            cube.units = "mm"

    # Visibility diagnostics: metres to kilometres for nicer formatting.
    if name_contains("visibility"):
        if cube.units == "m":
            _log_once("Converting visibility units m to km.", level=logging.DEBUG)
            cube.convert_units("km")

    return cube

872 

873 

874def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

875 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

876 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

877 if any("cloud_base_altitude" in name for name in varnames): 

878 # Mask cube where set > 144kft to catch default 144.35695538058164 

879 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0) 

880 

881 

def _fix_um_winds(cubes: iris.cube.CubeList):
    """To make winds from the UM consistent with those from LFRic.

    Diagnostics of wind are not always consistent between the UM and LFRic.
    Here, winds from the UM are adjusted to make them consistent with LFRic.
    """
    # The STASH attributes below identify the 10 m wind components and speed,
    # so this only ever affects cubes originating from the UM. If both
    # components are present but the speed is missing, derive it. The derived
    # speed will be biased low in general because the components are mostly
    # time averages. For simplicity this is only done when there is exactly
    # one cube per component; inspecting cell methods would be more thorough
    # but may not be warranted.
    u_constraint = iris.AttributeConstraint(STASH="m01s03i225")
    v_constraint = iris.AttributeConstraint(STASH="m01s03i226")
    speed_constraint = iris.AttributeConstraint(STASH="m01s03i227")
    try:
        u_cubes = cubes.extract(u_constraint)
        v_cubes = cubes.extract(v_constraint)
        if u_cubes and v_cubes:
            if len(u_cubes) == 1 and not cubes.extract(speed_constraint):
                _add_wind_speed_um(cubes)
            # Convert winds in the UM to be relative to true east and true north.
            _convert_wind_true_dirn_um(cubes)
    except (KeyError, AttributeError):
        pass

907 

908 

def _add_wind_speed_um(cubes: iris.cube.CubeList):
    """Add windspeeds to cubes from the UM."""
    # Take the first field of each 10 m wind component and combine them into
    # a speed via the Euclidean norm.
    u10 = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0]
    v10 = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0]
    speed10 = (u10**2 + v10**2) ** 0.5
    # Label the result with the UM wind speed STASH code and standard names.
    speed10.attributes["STASH"] = "m01s03i227"
    speed10.standard_name = "wind_speed"
    speed10.long_name = "wind_speed_at_10m"
    cubes.append(speed10)

919 

920 

def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
    """To convert winds to true directions.

    Convert from the components relative to the grid to true directions.
    This functionality only handles the simplest case.
    """
    # Spherical earth used as the target coordinate system for the rotation.
    sphere = iris.coord_systems.GeogCS(6371229.0)
    grid_u = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
    grid_v = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
    # Rotate each matched u/v pair and write the rotated data back in place.
    for u_cube, v_cube in zip(grid_u, grid_v, strict=True):
        true_u, true_v = rotate_winds(u_cube, v_cube, sphere)
        u_cube.data = true_u.core_data()
        v_cube.data = true_v.core_data()

933 

934 

935def _normalise_var0_varname(cube: iris.cube.Cube): 

936 """Fix varnames for consistency to allow merging. 

937 

938 Some model data netCDF sometimes have a coordinate name end in 

939 "_0" etc, where duplicate coordinates of same name are defined but 

940 with different attributes. This can be inconsistently managed in 

941 different model inputs and can cause cubes to fail to merge. 

942 """ 

943 for coord in cube.coords(): 

944 if coord.var_name and coord.var_name.endswith("_0"): 

945 coord.var_name = coord.var_name.removesuffix("_0") 

946 if coord.var_name and coord.var_name.endswith("_1"): 

947 coord.var_name = coord.var_name.removesuffix("_1") 

948 if coord.var_name and coord.var_name.endswith("_2"): 948 ↛ 949line 948 didn't jump to line 949 because the condition on line 948 was never true

949 coord.var_name = coord.var_name.removesuffix("_2") 

950 if coord.var_name and coord.var_name.endswith("_3"): 950 ↛ 951line 950 didn't jump to line 951 because the condition on line 950 was never true

951 coord.var_name = coord.var_name.removesuffix("_3") 

952 

953 if cube.var_name and cube.var_name.endswith("_0"): 

954 cube.var_name = cube.var_name.removesuffix("_0") 

955 

956 

957def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

958 """Convert the calendars of time units to use a standard calendar.""" 

959 try: 

960 time_coord = cube.coord("time") 

961 if time_coord.units.calendar == "proleptic_gregorian": 

962 logging.debug( 

963 "Changing proleptic Gregorian calendar to standard calendar for %s", 

964 repr(time_coord.units), 

965 ) 

966 time_coord.units = time_coord.units.change_calendar("standard") 

967 except iris.exceptions.CoordinateNotFoundError: 

968 pass 

969 

970 

def _lfric_time_callback(cube: iris.cube.Cube):
    """Fix time coordinate metadata if missing dimensions.

    Some model data does not contain forecast_reference_time or forecast_period as
    expected coordinates, and so we cannot aggregate over case studies without this
    metadata. This callback fixes these issues.

    This callback also ensures all time coordinates are referenced as hours since
    1970-01-01 00:00:00 for consistency across different model inputs.

    Notes
    -----
    Some parts of the code have been adapted from Paul Earnshaw's scripts.
    """
    # Construct forecast_reference time if it doesn't exist.
    try:
        tcoord = cube.coord("time")
        # Set time coordinate to common basis "hours since 1970"
        try:
            tcoord.convert_units("hours since 1970-01-01 00:00:00")
        except ValueError:
            # Best effort: leave unconvertible units alone rather than failing.
            logging.warning("Unrecognised base time unit: %s", tcoord.units)

        if not cube.coords("forecast_reference_time"):
            try:
                # Derive the initialisation time from the time coordinate's
                # `time_origin` attribute (an ISO 8601 string), expressed in
                # the (already converted) time coordinate's units.
                init_time = datetime.datetime.fromisoformat(
                    tcoord.attributes["time_origin"]
                )
                frt_point = tcoord.units.date2num(init_time)
                frt_coord = iris.coords.AuxCoord(
                    frt_point,
                    units=tcoord.units,
                    standard_name="forecast_reference_time",
                    long_name="forecast_reference_time",
                )
                cube.add_aux_coord(frt_coord)
            except KeyError:
                # No `time_origin` attribute either; warn and carry on.
                logging.warning(
                    "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
                )

        # Remove time_origin to allow multiple case studies to merge.
        tcoord.attributes.pop("time_origin", None)

        # Construct forecast_period axis (forecast lead time) if it doesn't exist.
        if not cube.coords("forecast_period"):
            try:
                # Create array of forecast lead times.
                # Express the reference time in the time coordinate's units so
                # the subtraction below is dimensionally consistent.
                init_coord = cube.coord("forecast_reference_time")
                init_time_points_in_tcoord_units = tcoord.units.date2num(
                    init_coord.units.num2date(init_coord.points)
                )
                lead_times = tcoord.points - init_time_points_in_tcoord_units

                # Get unit for lead time from time coordinate's unit.
                # Convert all lead time to hours for consistency between models.
                if "seconds" in str(tcoord.units):
                    lead_times = lead_times / 3600.0
                    units = "hours"
                elif "hours" in str(tcoord.units):
                    units = "hours"
                else:
                    raise ValueError(f"Unrecognised base time unit: {tcoord.units}")

                # Create lead time coordinate.
                lead_time_coord = iris.coords.AuxCoord(
                    lead_times,
                    standard_name="forecast_period",
                    long_name="forecast_period",
                    units=units,
                )

                # Associate lead time coordinate with time dimension.
                cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
            except iris.exceptions.CoordinateNotFoundError:
                logging.warning(
                    "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
                )
    except iris.exceptions.CoordinateNotFoundError:
        # Without a time coordinate there is nothing to fix.
        logging.warning("No time coordinate on cube.")

1051 

1052 

1053def _lfric_forecast_period_callback(cube: iris.cube.Cube): 

1054 """Check forecast_period name and units.""" 

1055 try: 

1056 coord = cube.coord("forecast_period") 

1057 if coord.units != "hours": 

1058 cube.coord("forecast_period").convert_units("hours") 

1059 if not coord.standard_name: 

1060 coord.standard_name = "forecast_period" 

1061 except iris.exceptions.CoordinateNotFoundError: 

1062 pass 

1063 

1064 

1065def _fix_no_time_coords_callback(cube: iris.cube.Cube): 

1066 """Add dummy time coord to process cubes that don't have sequence coord.""" 

1067 # Only add if time coordinate does not exist. 

1068 if not cube.coords("time"): 

1069 cube.add_aux_coord( 

1070 iris.coords.DimCoord( 

1071 0, standard_name="time", units="hours since 0001-01-01 00:00:00" 

1072 ) 

1073 ) 

1074 

1075 return cube 

1076 

1077 

def _normalise_ML_varname(cube: iris.cube.Cube):
    """Fix plev variable names to standard names.

    Rename generic variable names to pressure-level-specific long names so
    that pressure-level diagnostics are labelled consistently across model
    inputs. Only cubes that have a "pressure" coordinate are touched, and
    only the long_name is changed.
    """
    if cube.coords("pressure"):
        # cube.name() prefers standard_name, so setting long_name below does
        # not affect the subsequent comparisons.
        if cube.name() == "x_wind":
            cube.long_name = "zonal_wind_at_pressure_levels"
        if cube.name() == "y_wind":
            cube.long_name = "meridional_wind_at_pressure_levels"
        if cube.name() == "air_temperature":
            cube.long_name = "temperature_at_pressure_levels"
        if cube.name() == "specific_humidity":
            cube.long_name = (
                "vapour_specific_humidity_at_pressure_levels_for_climate_averaging"
            )