Coverage for src/CSET/operators/read.py: 91%

407 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-27 12:45 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43from CSET.operators.regrid import restructure_ugrid 

44 

45 

46class NoDataError(FileNotFoundError): 

47 """Error that no data has been loaded.""" 

48 

49 

50def read_cube( 

51 file_paths: list[str] | str, 

52 constraint: iris.Constraint = None, 

53 model_names: list[str] | str | None = None, 

54 subarea_type: str = None, 

55 subarea_extent: list[float] = None, 

56 **kwargs, 

57) -> iris.cube.Cube: 

58 """Read a single cube from files. 

59 

60 Read operator that takes a path string (can include shell-style glob 

61 patterns), and loads the cube matching the constraint. If any paths point to 

62 directory, all the files contained within are loaded. 

63 

64 Ensemble data can also be loaded. If it has a realization coordinate 

65 already, it will be directly used. If not, it will have its member number 

66 guessed from the filename, based on one of several common patterns. For 

67 example the pattern *emXX*, where XX is the realization. 

68 

69 Deterministic data will be loaded with a realization of 0, allowing it to be 

70 processed in the same way as ensemble data. 

71 

72 Arguments 

73 --------- 

74 file_paths: str | list[str] 

75 Path or paths to where .pp/.nc files are located 

76 constraint: iris.Constraint | iris.ConstraintCombination, optional 

77 Constraints to filter data by. Defaults to unconstrained. 

78 model_names: str | list[str], optional 

79 Names of the models that correspond to respective paths in file_paths. 

80 subarea_type: "gridcells" | "modelrelative" | "realworld", optional 

81 Whether to constrain data by model relative coordinates or real world 

82 coordinates. 

83 subarea_extent: list, optional 

84 List of coordinates to constraint data by, in order lower latitude, 

85 upper latitude, lower longitude, upper longitude. 

86 

87 Returns 

88 ------- 

89 cubes: iris.cube.Cube 

90 Cube loaded 

91 

92 Raises 

93 ------ 

94 FileNotFoundError 

95 If the provided path does not exist 

96 ValueError 

97 If the constraint doesn't produce a single cube. 

98 """ 

99 cubes = read_cubes( 

100 file_paths=file_paths, 

101 constraint=constraint, 

102 model_names=model_names, 

103 subarea_type=subarea_type, 

104 subarea_extent=subarea_extent, 

105 ) 

106 # Check filtered cubes is a CubeList containing one cube. 

107 if len(cubes) == 1: 

108 return cubes[0] 

109 else: 

110 raise ValueError( 

111 f"Constraint doesn't produce single cube: {constraint}\n{cubes}" 

112 ) 

113 

114 

115def read_cubes( 

116 file_paths: list[str] | str, 

117 constraint: iris.Constraint | None = None, 

118 model_names: str | list[str] | None = None, 

119 subarea_type: str = None, 

120 subarea_extent: list = None, 

121 **kwargs, 

122) -> iris.cube.CubeList: 

123 """Read cubes from files. 

124 

125 Read operator that takes a path string (can include shell-style glob 

126 patterns), and loads the cubes matching the constraint. If any paths point 

127 to directory, all the files contained within are loaded. 

128 

129 Ensemble data can also be loaded. If it has a realization coordinate 

130 already, it will be directly used. If not, it will have its member number 

131 guessed from the filename, based on one of several common patterns. For 

132 example the pattern *emXX*, where XX is the realization. 

133 

134 Deterministic data will be loaded with a realization of 0, allowing it to be 

135 processed in the same way as ensemble data. 

136 

137 Data output by XIOS (such as LFRic) has its per-file metadata removed so 

138 that the cubes merge across files. 

139 

140 Arguments 

141 --------- 

142 file_paths: str | list[str] 

143 Path or paths to where .pp/.nc files are located. Can include globs. 

144 constraint: iris.Constraint | iris.ConstraintCombination, optional 

145 Constraints to filter data by. Defaults to unconstrained. 

146 model_names: str | list[str], optional 

147 Names of the models that correspond to respective paths in file_paths. 

148 subarea_type: str, optional 

149 Whether to constrain data by model relative coordinates or real world 

150 coordinates. 

151 subarea_extent: list[float], optional 

152 List of coordinates to constraint data by, in order lower latitude, 

153 upper latitude, lower longitude, upper longitude. 

154 

155 Returns 

156 ------- 

157 cubes: iris.cube.CubeList 

158 Cubes loaded after being merged and concatenated. 

159 

160 Raises 

161 ------ 

162 FileNotFoundError 

163 If the provided path does not exist 

164 """ 

165 # Get iterable of paths. Each path corresponds to 1 model. 

166 paths = iter_maybe(file_paths) 

167 model_names = iter_maybe(model_names) 

168 

169 # Check we have appropriate number of model names. 

170 if model_names != (None,) and len(model_names) != len(paths): 

171 raise ValueError( 

172 f"The number of model names ({len(model_names)}) should equal " 

173 f"the number of paths given ({len(paths)})." 

174 ) 

175 

176 # Load the data for each model into a CubeList per model. 

177 model_cubes = ( 

178 _load_model(path, name, constraint) 

179 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None) 

180 ) 

181 

182 # Split out first model's cubes and mark it as the base for comparisons. 

183 cubes = next(model_cubes) 

184 for cube in cubes: 

185 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes. 

186 cube.attributes["cset_comparison_base"] = 1 

187 

188 # Load the rest of the models. 

189 cubes.extend(itertools.chain.from_iterable(model_cubes)) 

190 

191 # Unify time units so different case studies can merge. 

192 iris.util.unify_time_units(cubes) 

193 

194 # Select sub region. 

195 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent) 

196 

197 # Merge and concatenate cubes now metadata has been fixed. 

198 cubes = _merge_cubes_check_ensemble(cubes) 

199 cubes = cubes.concatenate() 

200 

201 # Squeeze single valued coordinates into scalar coordinates. 

202 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes) 

203 

204 # Ensure dimension coordinates are bounded. 

205 for cube in cubes: 

206 for dim_coord in cube.coords(dim_coords=True): 

207 # Iris can't guess the bounds of a scalar coordinate. 

208 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1: 

209 dim_coord.guess_bounds() 

210 

211 logging.info("Loaded cubes: %s", cubes) 

212 if len(cubes) == 0: 

213 raise NoDataError("No cubes loaded, check your constraints!") 

214 return cubes 

215 

216 

217def _load_model( 

218 paths: str | list[str], 

219 model_name: str | None, 

220 constraint: iris.Constraint | None, 

221) -> iris.cube.CubeList: 

222 """Load a single model's data into a CubeList.""" 

223 input_files = _check_input_files(paths) 

224 # If unset, a constraint of None lets everything be loaded. 

225 logging.debug("Constraint: %s", constraint) 

226 

227 cubes = iris.load(input_files) 

228 

229 # If a cube called latitude exists, chances are its unstructured. 

230 # Using extract, not extract_cubes in a try as there might be more 

231 # than one cube called latitude if we are aggregating. 

232 if len(cubes.extract("latitude")) > 0: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 cubes = restructure_ugrid(cubes) 

234 

235 for cube in cubes: 

236 _loading_callback(cube, None, None) 

237 

238 # Extract required cubes based on constraints 

239 cubes = cubes.extract(constraint) 

240 

241 # cubes = iris.load(input_files, constraint, callback=_loading_callback) 

242 # Make the UM's winds consistent with LFRic. 

243 _fix_um_winds(cubes) 

244 

245 # Add model_name attribute to each cube to make it available at any further 

246 # step without needing to pass it as function parameter. 

247 if model_name is not None: 

248 for cube in cubes: 

249 cube.attributes["model_name"] = model_name 

250 return cubes 

251 

252 

253def _check_input_files(input_paths: str | list[str]) -> list[Path]: 

254 """Get an iterable of files to load, and check that they all exist. 

255 

256 Arguments 

257 --------- 

258 input_paths: list[str] 

259 List of paths to input files or directories. The path may itself contain 

260 glob patterns, but unlike in shells it will match directly first. 

261 

262 Returns 

263 ------- 

264 list[Path] 

265 A list of files to load. 

266 

267 Raises 

268 ------ 

269 FileNotFoundError: 

270 If the provided arguments don't resolve to at least one existing file. 

271 """ 

272 files = [] 

273 for raw_filename in iter_maybe(input_paths): 

274 # Match glob-like files first, if they exist. 

275 raw_path = Path(raw_filename) 

276 if raw_path.is_file(): 

277 files.append(raw_path) 

278 else: 

279 for input_path in glob.glob(raw_filename): 

280 # Convert string paths into Path objects. 

281 input_path = Path(input_path) 

282 # Get the list of files in the directory, or use it directly. 

283 if input_path.is_dir(): 

284 logging.debug("Checking directory '%s' for files", input_path) 

285 files.extend(p for p in input_path.iterdir() if p.is_file()) 

286 else: 

287 files.append(input_path) 

288 

289 files.sort() 

290 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files)) 

291 if len(files) == 0: 

292 raise FileNotFoundError(f"No files found for {input_paths}") 

293 return files 

294 

295 

296def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList): 

297 """Merge CubeList, renumbering realizations of 0 if required. 

298 

299 An unsuccessful merge indicates common input cube attributes, most 

300 commonly from ensemble members missing an explicit realization 

301 coordinate. Therefore the members are renumbered before being merged 

302 again. 

303 """ 

304 try: 

305 cubes = cubes.merge() 

306 except iris.exceptions.MergeError: 

307 _log_once( 

308 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.", 

309 level=logging.WARNING, 

310 ) 

311 for ir, cube in enumerate(cubes): 

312 if cube.coord("realization").points == 0: 312 ↛ 311line 312 didn't jump to line 311 because the condition on line 312 was always true

313 cube.coord("realization").points = ir + 1 

314 cubes = cubes.merge() 

315 return cubes 

316 

317 

318def _cutout_cubes( 

319 cubes: iris.cube.CubeList, 

320 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

321 subarea_extent: list[float], 

322): 

323 """Cut out a subarea from a CubeList.""" 

324 if subarea_type is None: 

325 logging.debug("Subarea selection is disabled.") 

326 return cubes 

327 

328 # If selected, cutout according to number of grid cells to trim from each edge. 

329 cutout_cubes = iris.cube.CubeList() 

330 # Find spatial coordinates 

331 for cube in cubes: 

332 # Find dimension coordinates. 

333 lat_name, lon_name = get_cube_yxcoordname(cube) 

334 

335 # Compute cutout based on number of cells to trim from edges. 

336 if subarea_type == "gridcells": 

337 logging.debug( 

338 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

339 subarea_extent[0], 

340 subarea_extent[1], 

341 subarea_extent[2], 

342 subarea_extent[3], 

343 ) 

344 lat_points = np.sort(cube.coord(lat_name).points) 

345 lon_points = np.sort(cube.coord(lon_name).points) 

346 # Define cutout region using user provided cell points. 

347 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

348 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

349 

350 # Compute cutout based on specified coordinate values. 

351 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

352 # If not gridcells, cutout by requested geographic area, 

353 logging.debug( 

354 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

355 subarea_extent[0], 

356 subarea_extent[1], 

357 subarea_extent[2], 

358 subarea_extent[3], 

359 ) 

360 # Define cutout region using user provided coordinates. 

361 lats = np.array(subarea_extent[0:2]) 

362 lons = np.array(subarea_extent[2:4]) 

363 # Ensure cutout longitudes are within +/- 180.0 bounds. 

364 while lons[0] < -180.0: 

365 lons += 360.0 

366 while lons[1] > 180.0: 

367 lons -= 360.0 

368 # If the coordinate system is rotated we convert coordinates into 

369 # model-relative coordinates to extract the appropriate cutout. 

370 coord_system = cube.coord(lat_name).coord_system 

371 if subarea_type == "realworld" and isinstance( 

372 coord_system, iris.coord_systems.RotatedGeogCS 

373 ): 

374 lons, lats = rotate_pole( 

375 lons, 

376 lats, 

377 pole_lon=coord_system.grid_north_pole_longitude, 

378 pole_lat=coord_system.grid_north_pole_latitude, 

379 ) 

380 else: 

381 raise ValueError("Unknown subarea_type:", subarea_type) 

382 

383 # Do cutout and add to cutout_cubes. 

384 intersection_args = {lat_name: lats, lon_name: lons} 

385 logging.debug("Cutting out coords: %s", intersection_args) 

386 try: 

387 cutout_cubes.append(cube.intersection(**intersection_args)) 

388 except IndexError as err: 

389 raise ValueError( 

390 "Region cutout error. Check and update SUBAREA_EXTENT." 

391 "Cutout region requested should be contained within data area. " 

392 "Also check if cutout region requested is smaller than input grid spacing." 

393 ) from err 

394 

395 return cutout_cubes 

396 

397 

398def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube: 

399 """Compose together the needed callbacks into a single function.""" 

400 # Most callbacks operate in-place, but save the cube when returned! 

401 _realization_callback(cube) 

402 _um_normalise_callback(cube) 

403 _lfric_normalise_callback(cube) 

404 cube = _lfric_time_coord_fix_callback(cube) 

405 _normalise_var0_varname(cube) 

406 cube = _fix_no_spatial_coords_callback(cube) 

407 _fix_spatial_coords_callback(cube) 

408 _fix_pressure_coord_callback(cube) 

409 _fix_um_radtime(cube) 

410 _fix_cell_methods(cube) 

411 cube = _convert_cube_units_callback(cube) 

412 cube = _grid_longitude_fix_callback(cube) 

413 _fix_lfric_cloud_base_altitude(cube) 

414 _proleptic_gregorian_fix(cube) 

415 _lfric_time_callback(cube) 

416 _lfric_forecast_period_callback(cube) 

417 cube = _fix_no_time_coords_callback(cube) 

418 _normalise_ML_varname(cube) 

419 return cube 

420 

421 

422def _realization_callback(cube): 

423 """Add a realization coordinate initialised to 0 if missing. 

424 

425 This means deterministic and ensemble cubes can assume realization coordinate through the rest 

426 of the code. 

427 """ 

428 # Only add if realization coordinate does not exist. 

429 if not cube.coords("realization"): 

430 cube.add_aux_coord( 

431 iris.coords.DimCoord(0, standard_name="realization", units="1") 

432 ) 

433 

434 

435@functools.lru_cache(None) 

436def _log_once(msg, level=logging.WARNING): 

437 """Print a warning message, skipping recent duplicates.""" 

438 logging.log(level, msg) 

439 

440 

441def _um_normalise_callback(cube: iris.cube.Cube): 

442 """Normalise UM STASH variable long names to LFRic variable names. 

443 

444 Note standard names will remain associated with cubes where different. 

445 Long name will be used consistently in output filename and titles. 

446 """ 

447 # Convert STASH to LFRic variable name 

448 if "STASH" in cube.attributes: 

449 stash = cube.attributes["STASH"] 

450 try: 

451 (name, grid) = STASH_TO_LFRIC[str(stash)] 

452 cube.long_name = name 

453 except KeyError: 

454 # Don't change cubes with unknown stash codes. 

455 _log_once( 

456 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.", 

457 level=logging.WARNING, 

458 ) 

459 

460 

461def _lfric_normalise_callback(cube: iris.cube.Cube): 

462 """Normalise attributes that prevents LFRic cube from merging. 

463 

464 The uuid and timeStamp relate to the output file, as saved by XIOS, and has 

465 no relation to the data contained. These attributes are removed. 

466 

467 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

468 multiple UM fields, however it can be encoded in any order. This attribute 

469 is sorted to prevent this. This attribute is only present in LFRic data that 

470 has been converted to look like UM data. 

471 """ 

472 # Remove unwanted attributes. 

473 cube.attributes.pop("timeStamp", None) 

474 cube.attributes.pop("uuid", None) 

475 cube.attributes.pop("name", None) 

476 cube.attributes.pop("source", None) 

477 cube.attributes.pop("analysis_source", None) 

478 cube.attributes.pop("history", None) 

479 

480 # Sort STASH code list. 

481 stash_list = cube.attributes.get("um_stash_source") 

482 if stash_list: 

483 # Parse the string as a list, sort, then re-encode as a string. 

484 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

485 

486 

487def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

488 """Ensure the time coordinate is a DimCoord rather than an AuxCoord. 

489 

490 The coordinate is converted and replaced if not. SLAMed LFRic data has this 

491 issue, though the coordinate satisfies all the properties for a DimCoord. 

492 Scalar time values are left as AuxCoords. 

493 """ 

494 # This issue seems to come from iris's handling of NetCDF files where time 

495 # always ends up as an AuxCoord. 

496 if cube.coords("time"): 

497 time_coord = cube.coord("time") 

498 if ( 

499 not isinstance(time_coord, iris.coords.DimCoord) 

500 and len(cube.coord_dims(time_coord)) == 1 

501 ): 

502 # Fudge the bounds to foil checking for strict monotonicity. 

503 if time_coord.has_bounds(): 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true

504 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8: 

505 time_coord.bounds = [ 

506 [ 

507 time_coord.bounds[i][0] + 1.0e-8 * float(i), 

508 time_coord.bounds[i][1], 

509 ] 

510 for i in range(len(time_coord.bounds)) 

511 ] 

512 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord) 

513 return cube 

514 

515 

516def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

517 """Check grid_longitude coordinates are in the range -180 deg to 180 deg. 

518 

519 This is necessary if comparing two models with different conventions -- 

520 for example, models where the prime meridian is defined as 0 deg or 

521 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude 

522 so that it falls in this range. Checks are for near-180 bounds given 

523 model data bounds may not extend exactly to 0. or 360. 

524 Input cubes on non-rotated grid coordinates are not impacted. 

525 """ 

526 try: 

527 y, x = get_cube_yxcoordname(cube) 

528 except ValueError: 

529 # Don't modify non-spatial cubes. 

530 return cube 

531 

532 long_coord = cube.coord(x) 

533 # Wrap longitudes if rotated pole coordinates 

534 coord_system = long_coord.coord_system 

535 if x == "grid_longitude" and isinstance( 

536 coord_system, iris.coord_systems.RotatedGeogCS 

537 ): 

538 long_points = long_coord.points.copy() 

539 long_centre = np.median(long_points) 

540 while long_centre < -175.0: 

541 long_centre += 360.0 

542 long_points += 360.0 

543 while long_centre >= 175.0: 

544 long_centre -= 360.0 

545 long_points -= 360.0 

546 long_coord.points = long_points 

547 

548 # Update coord bounds to be consistent with wrapping. 

549 if long_coord.has_bounds(): 

550 long_coord.bounds = None 

551 long_coord.guess_bounds() 

552 

553 return cube 

554 

555 

556def _fix_no_spatial_coords_callback(cube: iris.cube.Cube): 

557 import CSET.operators._utils as utils 

558 

559 # Don't modify spatial cubes that already have spatial dimensions 

560 if utils.is_spatialdim(cube): 

561 return cube 

562 

563 else: 

564 # attempt to get lat/long from cube attributes 

565 try: 

566 lat_min = cube.attributes.get("geospatial_lat_min") 

567 lat_max = cube.attributes.get("geospatial_lat_max") 

568 lon_min = cube.attributes.get("geospatial_lon_min") 

569 lon_max = cube.attributes.get("geospatial_lon_max") 

570 

571 lon_val = (lon_min + lon_max) / 2.0 

572 lat_val = (lat_min + lat_max) / 2.0 

573 

574 lat_coord = iris.coords.DimCoord( 

575 lat_val, 

576 standard_name="latitude", 

577 units="degrees_north", 

578 var_name="latitude", 

579 coord_system=iris.coord_systems.GeogCS(6371229.0), 

580 circular=True, 

581 ) 

582 

583 lon_coord = iris.coords.DimCoord( 

584 lon_val, 

585 standard_name="longitude", 

586 units="degrees_east", 

587 var_name="longitude", 

588 coord_system=iris.coord_systems.GeogCS(6371229.0), 

589 circular=True, 

590 ) 

591 

592 cube.add_aux_coord(lat_coord) 

593 cube.add_aux_coord(lon_coord) 

594 return cube 

595 

596 # if lat/long are not in attributes, then return cube unchanged: 

597 except TypeError: 

598 return cube 

599 

600 

601def _fix_spatial_coords_callback(cube: iris.cube.Cube): 

602 """Check latitude and longitude coordinates name. 

603 

604 This is necessary as some models define their grid as on rotated 

605 'grid_latitude' and 'grid_longitude' coordinates while others define 

606 the grid on non-rotated 'latitude' and 'longitude'. 

607 Cube dimensions need to be made consistent to avoid recipe failures, 

608 particularly where comparing multiple input models with differing spatial 

609 coordinates. 

610 """ 

611 # Check if cube is spatial. 

612 if not is_spatialdim(cube): 

613 # Don't modify non-spatial cubes. 

614 return 

615 

616 # Get spatial coords and dimension index. 

617 y_name, x_name = get_cube_yxcoordname(cube) 

618 ny = get_cube_coordindex(cube, y_name) 

619 nx = get_cube_coordindex(cube, x_name) 

620 

621 # Remove spatial coords bounds if erroneous values detected. 

622 # Aims to catch some errors in input coord bounds by setting 

623 # invalid threshold of 10000.0 

624 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds(): 

625 bx_max = np.max(np.abs(cube.coord(x_name).bounds)) 

626 by_max = np.max(np.abs(cube.coord(y_name).bounds)) 

627 if bx_max > 10000.0 or by_max > 10000.0: 

628 cube.coord(x_name).bounds = None 

629 cube.coord(y_name).bounds = None 

630 

631 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord 

632 # [latitude, longitude] for instances where rotated_pole=90.0 

633 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

634 coord_system = cube.coord("grid_latitude").coord_system 

635 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None) 

636 if pole_lat == 90.0: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true

637 lats = cube.coord("grid_latitude").points 

638 lons = cube.coord("grid_longitude").points 

639 

640 cube.remove_coord("grid_latitude") 

641 cube.add_dim_coord( 

642 iris.coords.DimCoord( 

643 lats, 

644 standard_name="latitude", 

645 var_name="latitude", 

646 units="degrees", 

647 coord_system=iris.coord_systems.GeogCS(6371229.0), 

648 circular=True, 

649 ), 

650 ny, 

651 ) 

652 y_name = "latitude" 

653 cube.remove_coord("grid_longitude") 

654 cube.add_dim_coord( 

655 iris.coords.DimCoord( 

656 lons, 

657 standard_name="longitude", 

658 var_name="longitude", 

659 units="degrees", 

660 coord_system=iris.coord_systems.GeogCS(6371229.0), 

661 circular=True, 

662 ), 

663 nx, 

664 ) 

665 x_name = "longitude" 

666 

667 # Create additional AuxCoord [grid_latitude, grid_longitude] with 

668 # rotated pole attributes for cases with [lat, lon] inputs 

669 if y_name in ["latitude"] and cube.coord(y_name).units in [ 

670 "degrees", 

671 "degrees_north", 

672 "degrees_south", 

673 ]: 

674 # Add grid_latitude AuxCoord 

675 if "grid_latitude" not in [ 

676 coord.name() for coord in cube.coords(dim_coords=False) 

677 ]: 

678 cube.add_aux_coord( 

679 iris.coords.AuxCoord( 

680 cube.coord(y_name).points, 

681 var_name="grid_latitude", 

682 units="degrees", 

683 ), 

684 ny, 

685 ) 

686 # Ensure input latitude DimCoord has CoordSystem 

687 # This attribute is sometimes lost on iris.save 

688 if not cube.coord(y_name).coord_system: 

689 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

690 

691 if x_name in ["longitude"] and cube.coord(x_name).units in [ 

692 "degrees", 

693 "degrees_west", 

694 "degrees_east", 

695 ]: 

696 # Add grid_longitude AuxCoord 

697 if "grid_longitude" not in [ 

698 coord.name() for coord in cube.coords(dim_coords=False) 

699 ]: 

700 cube.add_aux_coord( 

701 iris.coords.AuxCoord( 

702 cube.coord(x_name).points, 

703 var_name="grid_longitude", 

704 units="degrees", 

705 ), 

706 nx, 

707 ) 

708 

709 # Ensure input longitude DimCoord has CoordSystem 

710 # This attribute is sometimes lost on iris.save 

711 if not cube.coord(x_name).coord_system: 

712 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

713 

714 

715def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

716 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

717 

718 This problem was raised because the AIFS model data from ECMWF 

719 defines the pressure coordinate with the name "pressure_level" rather 

720 than compliant CF coordinate names. 

721 

722 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

723 and approach the coordinates in a unified way. 

724 """ 

725 for coord in cube.dim_coords: 

726 if coord.name() in ["pressure_level", "pressure_levels"]: 

727 coord.rename("pressure") 

728 

729 if coord.name() == "pressure": 

730 if str(cube.coord("pressure").units) != "hPa": 

731 cube.coord("pressure").convert_units("hPa") 

732 

733 

734def _fix_um_radtime(cube: iris.cube.Cube): 

735 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

736 

737 This callback does not have any effect for output diagnostics with 

738 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

739 diagnostics are checked. 

740 Note this callback does not interpolate the data in time, only adjust 

741 timestamps to sit on the hour to enable time-to-time difference plotting 

742 with models which may output radiation data on the hour. 

743 """ 

744 try: 

745 if cube.attributes["STASH"] in [ 

746 "m01s01i207", 

747 "m01s01i208", 

748 "m01s02i205", 

749 "m01s02i201", 

750 "m01s01i207", 

751 "m01s02i207", 

752 "m01s01i235", 

753 ]: 

754 time_coord = cube.coord("time") 

755 

756 # Convert time points to datetime objects 

757 time_unit = time_coord.units 

758 time_points = time_unit.num2date(time_coord.points) 

759 # Skip if times don't need fixing. 

760 if time_points[0].minute == 0 and time_points[0].second == 0: 

761 return 

762 if time_points[0].minute == 30 and time_points[0].second == 0: 762 ↛ 763line 762 didn't jump to line 763 because the condition on line 762 was never true

763 return 

764 

765 # Subtract time difference from the hour from each time point 

766 n_minute = time_points[0].minute 

767 n_second = time_points[0].second 

768 # If times closer to next hour, compute difference to add on to following hour 

769 if n_minute > 30: 

770 n_minute = n_minute - 60 

771 # Compute new diagnostic time stamp 

772 new_time_points = ( 

773 time_points 

774 - datetime.timedelta(minutes=n_minute) 

775 - datetime.timedelta(seconds=n_second) 

776 ) 

777 

778 # Convert back to numeric values using the original time unit. 

779 new_time_values = time_unit.date2num(new_time_points) 

780 

781 # Replace the time coordinate with updated values. 

782 time_coord.points = new_time_values 

783 

784 # Recompute forecast_period with corrected values. 

785 if cube.coord("forecast_period"): 785 ↛ exitline 785 didn't return from function '_fix_um_radtime' because the condition on line 785 was always true

786 fcst_prd_points = cube.coord("forecast_period").points 

787 new_fcst_points = ( 

788 time_unit.num2date(fcst_prd_points) 

789 - datetime.timedelta(minutes=n_minute) 

790 - datetime.timedelta(seconds=n_second) 

791 ) 

792 cube.coord("forecast_period").points = time_unit.date2num( 

793 new_fcst_points 

794 ) 

795 except KeyError: 

796 pass 

797 

798 

799def _fix_cell_methods(cube: iris.cube.Cube): 

800 """To fix the assumed cell_methods in accumulation STASH from UM. 

801 

802 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

803 (m01s04i202, m01s05i202) in UM is being output as a time accumulation, 

804 over each hour (TAcc1hr), but input cubes show cell_methods as "mean". 

805 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

806 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

807 enabling the cell_method constraint on reading to select correct input. 

808 """ 

809 # Shift "mean" cell_method to "sum" for selected UM inputs. 

810 if cube.attributes.get("STASH") in [ 

811 "m01s21i104", 

812 "m01s04i201", 

813 "m01s04i202", 

814 "m01s05i201", 

815 "m01s05i202", 

816 ]: 

817 # Check if input cell_method contains "mean" time-processing. 

818 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 818 ↛ exitline 818 didn't return from function '_fix_cell_methods' because the condition on line 818 was always true

819 # Retrieve interval and any comment information. 

820 for cell_method in cube.cell_methods: 

821 interval_str = cell_method.intervals 

822 comment_str = cell_method.comments 

823 

824 # Remove input aggregation method. 

825 cube.cell_methods = () 

826 

827 # Replace "mean" with "sum" cell_method to indicate aggregation. 

828 cube.add_cell_method( 

829 iris.coords.CellMethod( 

830 method="sum", 

831 coords="time", 

832 intervals=interval_str, 

833 comments=comment_str, 

834 ) 

835 ) 

836 

837 

838def _convert_cube_units_callback(cube: iris.cube.Cube): 

839 """Adjust diagnostic units for specific variables. 

840 

841 Some precipitation diagnostics are output with unit kg m-2 s-1 and are 

842 converted here to mm hr-1. 

843 

844 Visibility diagnostics are converted here from m to km to improve output 

845 formatting. 

846 """ 

847 # Convert precipitation diagnostic units if required. 

848 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

849 if any("surface_microphysical" in name for name in varnames): 

850 if cube.units == "kg m-2 s-1": 

851 _log_once( 

852 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1", 

853 level=logging.DEBUG, 

854 ) 

855 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water. 

856 # This is a 1:1 conversion, so we just change the units. 

857 cube.units = "mm s-1" 

858 # Convert the units to per hour. 

859 cube.convert_units("mm hr-1") 

860 elif cube.units == "kg m-2": 860 ↛ 870line 860 didn't jump to line 870 because the condition on line 860 was always true

861 _log_once( 

862 "Converting precipitation amount units from kg m-2 to mm", 

863 level=logging.DEBUG, 

864 ) 

865 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water. 

866 # This is a 1:1 conversion, so we just change the units. 

867 cube.units = "mm" 

868 

869 # Convert visibility diagnostic units if required. 

870 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

871 if any("visibility" in name for name in varnames): 

872 if cube.units == "m": 872 ↛ 877line 872 didn't jump to line 877 because the condition on line 872 was always true

873 _log_once("Converting visibility units m to km.", level=logging.DEBUG) 

874 # Convert the units to km. 

875 cube.convert_units("km") 

876 

877 return cube 

878 

879 

880def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

881 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

882 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

883 if any("cloud_base_altitude" in name for name in varnames): 

884 # Mask cube where set > 144kft to catch default 144.35695538058164 

885 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0) 

886 

887 

888def _fix_um_winds(cubes: iris.cube.CubeList): 

889 """To make winds from the UM consistent with those from LFRic. 

890 

891 Diagnostics of wind are not always consistent between the UM 

892 and LFric. Here, winds from the UM are adjusted to make them i 

893 consistent with LFRic. 

894 """ 

895 # Check whether we have components of the wind identified by STASH, 

896 # (so this will apply only to cubes from the UM), but not the 

897 # wind speed and calculate it if it is missing. Note that 

898 # this will be biased low in general because the components will mostly 

899 # be time averages. For simplicity, we do this only if there is just one 

900 # cube of a component. A more complicated approach would be to consider 

901 # the cell methods, but it may not be warranted. 

902 u_constr = iris.AttributeConstraint(STASH="m01s03i225") 

903 v_constr = iris.AttributeConstraint(STASH="m01s03i226") 

904 speed_constr = iris.AttributeConstraint(STASH="m01s03i227") 

905 try: 

906 if cubes.extract(u_constr) and cubes.extract(v_constr): 906 ↛ 907line 906 didn't jump to line 907 because the condition on line 906 was never true

907 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr): 

908 _add_wind_speed_um(cubes) 

909 # Convert winds in the UM to be relative to true east and true north. 

910 _convert_wind_true_dirn_um(cubes) 

911 except (KeyError, AttributeError): 

912 pass 

913 

914 

915def _add_wind_speed_um(cubes: iris.cube.CubeList): 

916 """Add windspeeds to cubes from the UM.""" 

917 wspd10 = ( 

918 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2 

919 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2 

920 ) ** 0.5 

921 wspd10.attributes["STASH"] = "m01s03i227" 

922 wspd10.standard_name = "wind_speed" 

923 wspd10.long_name = "wind_speed_at_10m" 

924 cubes.append(wspd10) 

925 

926 

927def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList): 

928 """To convert winds to true directions. 

929 

930 Convert from the components relative to the grid to true directions. 

931 This functionality only handles the simplest case. 

932 """ 

933 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225")) 

934 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226")) 

935 for u, v in zip(u_grids, v_grids, strict=True): 

936 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0)) 

937 u.data = true_u.core_data() 

938 v.data = true_v.core_data() 

939 

940 

941def _normalise_var0_varname(cube: iris.cube.Cube): 

942 """Fix varnames for consistency to allow merging. 

943 

944 Some model data netCDF sometimes have a coordinate name end in 

945 "_0" etc, where duplicate coordinates of same name are defined but 

946 with different attributes. This can be inconsistently managed in 

947 different model inputs and can cause cubes to fail to merge. 

948 """ 

949 for coord in cube.coords(): 

950 if coord.var_name and coord.var_name.endswith("_0"): 

951 coord.var_name = coord.var_name.removesuffix("_0") 

952 if coord.var_name and coord.var_name.endswith("_1"): 

953 coord.var_name = coord.var_name.removesuffix("_1") 

954 if coord.var_name and coord.var_name.endswith("_2"): 954 ↛ 955line 954 didn't jump to line 955 because the condition on line 954 was never true

955 coord.var_name = coord.var_name.removesuffix("_2") 

956 if coord.var_name and coord.var_name.endswith("_3"): 956 ↛ 957line 956 didn't jump to line 957 because the condition on line 956 was never true

957 coord.var_name = coord.var_name.removesuffix("_3") 

958 

959 if cube.var_name and cube.var_name.endswith("_0"): 

960 cube.var_name = cube.var_name.removesuffix("_0") 

961 

962 

963def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

964 """Convert the calendars of time units to use a standard calendar.""" 

965 try: 

966 time_coord = cube.coord("time") 

967 if time_coord.units.calendar == "proleptic_gregorian": 

968 logging.debug( 

969 "Changing proleptic Gregorian calendar to standard calendar for %s", 

970 repr(time_coord.units), 

971 ) 

972 time_coord.units = time_coord.units.change_calendar("standard") 

973 except iris.exceptions.CoordinateNotFoundError: 

974 pass 

975 

976 

977def _lfric_time_callback(cube: iris.cube.Cube): 

978 """Fix time coordinate metadata if missing dimensions. 

979 

980 Some model data does not contain forecast_reference_time or forecast_period as 

981 expected coordinates, and so we cannot aggregate over case studies without this 

982 metadata. This callback fixes these issues. 

983 

984 This callback also ensures all time coordinates are referenced as hours since 

985 1970-01-01 00:00:00 for consistency across different model inputs. 

986 

987 Notes 

988 ----- 

989 Some parts of the code have been adapted from Paul Earnshaw's scripts. 

990 """ 

991 # Construct forecast_reference time if it doesn't exist. 

992 try: 

993 tcoord = cube.coord("time") 

994 # Set time coordinate to common basis "hours since 1970" 

995 try: 

996 tcoord.convert_units("hours since 1970-01-01 00:00:00") 

997 except ValueError: 

998 logging.warning("Unrecognised base time unit: %s", tcoord.units) 

999 

1000 if not cube.coords("forecast_reference_time"): 

1001 try: 

1002 init_time = datetime.datetime.fromisoformat( 

1003 tcoord.attributes["time_origin"] 

1004 ) 

1005 frt_point = tcoord.units.date2num(init_time) 

1006 frt_coord = iris.coords.AuxCoord( 

1007 frt_point, 

1008 units=tcoord.units, 

1009 standard_name="forecast_reference_time", 

1010 long_name="forecast_reference_time", 

1011 ) 

1012 cube.add_aux_coord(frt_coord) 

1013 except KeyError: 

1014 logging.warning( 

1015 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from." 

1016 ) 

1017 

1018 # Remove time_origin to allow multiple case studies to merge. 

1019 tcoord.attributes.pop("time_origin", None) 

1020 

1021 # Construct forecast_period axis (forecast lead time) if it doesn't exist. 

1022 if not cube.coords("forecast_period"): 

1023 try: 

1024 # Create array of forecast lead times. 

1025 init_coord = cube.coord("forecast_reference_time") 

1026 init_time_points_in_tcoord_units = tcoord.units.date2num( 

1027 init_coord.units.num2date(init_coord.points) 

1028 ) 

1029 lead_times = tcoord.points - init_time_points_in_tcoord_units 

1030 

1031 # Get unit for lead time from time coordinate's unit. 

1032 # Convert all lead time to hours for consistency between models. 

1033 if "seconds" in str(tcoord.units): 1033 ↛ 1034line 1033 didn't jump to line 1034 because the condition on line 1033 was never true

1034 lead_times = lead_times / 3600.0 

1035 units = "hours" 

1036 elif "hours" in str(tcoord.units): 1036 ↛ 1039line 1036 didn't jump to line 1039 because the condition on line 1036 was always true

1037 units = "hours" 

1038 else: 

1039 raise ValueError(f"Unrecognised base time unit: {tcoord.units}") 

1040 

1041 # Create lead time coordinate. 

1042 lead_time_coord = iris.coords.AuxCoord( 

1043 lead_times, 

1044 standard_name="forecast_period", 

1045 long_name="forecast_period", 

1046 units=units, 

1047 ) 

1048 

1049 # Associate lead time coordinate with time dimension. 

1050 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time")) 

1051 except iris.exceptions.CoordinateNotFoundError: 

1052 logging.warning( 

1053 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period" 

1054 ) 

1055 except iris.exceptions.CoordinateNotFoundError: 

1056 logging.warning("No time coordinate on cube.") 

1057 

1058 

1059def _lfric_forecast_period_callback(cube: iris.cube.Cube): 

1060 """Check forecast_period name and units.""" 

1061 try: 

1062 coord = cube.coord("forecast_period") 

1063 if coord.units != "hours": 

1064 cube.coord("forecast_period").convert_units("hours") 

1065 if not coord.standard_name: 

1066 coord.standard_name = "forecast_period" 

1067 except iris.exceptions.CoordinateNotFoundError: 

1068 pass 

1069 

1070 

1071def _fix_no_time_coords_callback(cube: iris.cube.Cube): 

1072 """Add dummy time coord to process cubes that don't have sequence coord.""" 

1073 # Only add if time coordinate does not exist. 

1074 if not cube.coords("time"): 

1075 cube.add_aux_coord( 

1076 iris.coords.DimCoord( 

1077 0, standard_name="time", units="hours since 0001-01-01 00:00:00" 

1078 ) 

1079 ) 

1080 

1081 return cube 

1082 

1083 

1084def _normalise_ML_varname(cube: iris.cube.Cube): 

1085 """Fix plev variable names to standard names.""" 

1086 if cube.coords("pressure"): 

1087 if cube.name() == "x_wind": 

1088 cube.long_name = "zonal_wind_at_pressure_levels" 

1089 if cube.name() == "y_wind": 

1090 cube.long_name = "meridional_wind_at_pressure_levels" 

1091 if cube.name() == "air_temperature": 

1092 cube.long_name = "temperature_at_pressure_levels" 

1093 if cube.name() == "specific_humidity": 1093 ↛ 1094line 1093 didn't jump to line 1094 because the condition on line 1093 was never true

1094 cube.long_name = ( 

1095 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging" 

1096 )