Coverage for src/CSET/operators/read.py: 91%

409 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-22 10:06 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43from CSET.operators.regrid import restructure_wrapper 

44 

45 

46class NoDataError(FileNotFoundError): 

47 """Error that no data has been loaded.""" 

48 

49 

50def read_cube( 

51 file_paths: list[str] | str, 

52 constraint: iris.Constraint = None, 

53 model_names: list[str] | str | None = None, 

54 subarea_type: str = None, 

55 subarea_extent: list[float] = None, 

56 **kwargs, 

57) -> iris.cube.Cube: 

58 """Read a single cube from files. 

59 

60 Read operator that takes a path string (can include shell-style glob 

61 patterns), and loads the cube matching the constraint. If any paths point to 

62 directory, all the files contained within are loaded. 

63 

64 Ensemble data can also be loaded. If it has a realization coordinate 

65 already, it will be directly used. If not, it will have its member number 

66 guessed from the filename, based on one of several common patterns. For 

67 example the pattern *emXX*, where XX is the realization. 

68 

69 Deterministic data will be loaded with a realization of 0, allowing it to be 

70 processed in the same way as ensemble data. 

71 

72 Arguments 

73 --------- 

74 file_paths: str | list[str] 

75 Path or paths to where .pp/.nc files are located 

76 constraint: iris.Constraint | iris.ConstraintCombination, optional 

77 Constraints to filter data by. Defaults to unconstrained. 

78 model_names: str | list[str], optional 

79 Names of the models that correspond to respective paths in file_paths. 

80 subarea_type: "gridcells" | "modelrelative" | "realworld", optional 

81 Whether to constrain data by model relative coordinates or real world 

82 coordinates. 

83 subarea_extent: list, optional 

84 List of coordinates to constraint data by, in order lower latitude, 

85 upper latitude, lower longitude, upper longitude. 

86 

87 Returns 

88 ------- 

89 cubes: iris.cube.Cube 

90 Cube loaded 

91 

92 Raises 

93 ------ 

94 FileNotFoundError 

95 If the provided path does not exist 

96 ValueError 

97 If the constraint doesn't produce a single cube. 

98 """ 

99 cubes = read_cubes( 

100 file_paths=file_paths, 

101 constraint=constraint, 

102 model_names=model_names, 

103 subarea_type=subarea_type, 

104 subarea_extent=subarea_extent, 

105 ) 

106 # Check filtered cubes is a CubeList containing one cube. 

107 if len(cubes) == 1: 

108 return cubes[0] 

109 else: 

110 raise ValueError( 

111 f"Constraint doesn't produce single cube: {constraint}\n{cubes}" 

112 ) 

113 

114 

115def read_cubes( 

116 file_paths: list[str] | str, 

117 constraint: iris.Constraint | None = None, 

118 model_names: str | list[str] | None = None, 

119 subarea_type: str = None, 

120 subarea_extent: list = None, 

121 **kwargs, 

122) -> iris.cube.CubeList: 

123 """Read cubes from files. 

124 

125 Read operator that takes a path string (can include shell-style glob 

126 patterns), and loads the cubes matching the constraint. If any paths point 

127 to directory, all the files contained within are loaded. 

128 

129 Ensemble data can also be loaded. If it has a realization coordinate 

130 already, it will be directly used. If not, it will have its member number 

131 guessed from the filename, based on one of several common patterns. For 

132 example the pattern *emXX*, where XX is the realization. 

133 

134 Deterministic data will be loaded with a realization of 0, allowing it to be 

135 processed in the same way as ensemble data. 

136 

137 Data output by XIOS (such as LFRic) has its per-file metadata removed so 

138 that the cubes merge across files. 

139 

140 Arguments 

141 --------- 

142 file_paths: str | list[str] 

143 Path or paths to where .pp/.nc files are located. Can include globs. 

144 constraint: iris.Constraint | iris.ConstraintCombination, optional 

145 Constraints to filter data by. Defaults to unconstrained. 

146 model_names: str | list[str], optional 

147 Names of the models that correspond to respective paths in file_paths. 

148 subarea_type: str, optional 

149 Whether to constrain data by model relative coordinates or real world 

150 coordinates. 

151 subarea_extent: list[float], optional 

152 List of coordinates to constraint data by, in order lower latitude, 

153 upper latitude, lower longitude, upper longitude. 

154 

155 Returns 

156 ------- 

157 cubes: iris.cube.CubeList 

158 Cubes loaded after being merged and concatenated. 

159 

160 Raises 

161 ------ 

162 FileNotFoundError 

163 If the provided path does not exist 

164 """ 

165 # Get iterable of paths. Each path corresponds to 1 model. 

166 paths = iter_maybe(file_paths) 

167 model_names = iter_maybe(model_names) 

168 

169 # Check we have appropriate number of model names. 

170 if model_names != (None,) and len(model_names) != len(paths): 

171 raise ValueError( 

172 f"The number of model names ({len(model_names)}) should equal " 

173 f"the number of paths given ({len(paths)})." 

174 ) 

175 

176 # Load the data for each model into a CubeList per model. 

177 model_cubes = ( 

178 _load_model(path, name, constraint) 

179 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None) 

180 ) 

181 

182 # Split out first model's cubes and mark it as the base for comparisons. 

183 cubes = next(model_cubes) 

184 for cube in cubes: 

185 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes. 

186 cube.attributes["cset_comparison_base"] = 1 

187 

188 # Load the rest of the models. 

189 cubes.extend(itertools.chain.from_iterable(model_cubes)) 

190 

191 # Unify time units so different case studies can merge. 

192 iris.util.unify_time_units(cubes) 

193 

194 # Select sub region. 

195 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent) 

196 

197 # Merge and concatenate cubes now metadata has been fixed. 

198 cubes = _merge_cubes_check_ensemble(cubes) 

199 cubes = cubes.concatenate() 

200 

201 # Squeeze single valued coordinates into scalar coordinates. 

202 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes) 

203 

204 # Ensure dimension coordinates are bounded. 

205 for cube in cubes: 

206 for dim_coord in cube.coords(dim_coords=True): 

207 if (dim_coord.standard_name == "time") and ( 

208 dim_coord.name() 

209 not in itertools.chain.from_iterable( 

210 m.coord_names for m in cube.cell_methods if m.method != "point" 

211 ) 

212 ): 

213 # Instantaneous time coordinate 

214 continue 

215 # Iris can't guess the bounds of a scalar coordinate. 

216 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1: 

217 dim_coord.guess_bounds() 

218 

219 logging.info("Loaded cubes: %s", cubes) 

220 if len(cubes) == 0: 

221 raise NoDataError("No cubes loaded, check your constraints!") 

222 return cubes 

223 

224 

225def _load_model( 

226 paths: str | list[str], 

227 model_name: str | None, 

228 constraint: iris.Constraint | None, 

229) -> iris.cube.CubeList: 

230 """Load a single model's data into a CubeList.""" 

231 input_files = _check_input_files(paths) 

232 # If unset, a constraint of None lets everything be loaded. 

233 logging.debug("Constraint: %s", constraint) 

234 

235 cubes = iris.load(input_files) 

236 

237 # If a cube called latitude exists, chances are its unstructured. 

238 # Using extract, not extract_cubes in a try as there might be more 

239 # than one cube called latitude if we are aggregating. 

240 if len(cubes.extract("latitude")) > 0: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 cubes = restructure_wrapper(cubes) 

242 

243 for cube in cubes: 

244 _loading_callback(cube, None, None) 

245 

246 # Extract required cubes based on constraint 

247 cubes = cubes.extract(constraint) 

248 

249 # cubes = iris.load(input_files, constraint, callback=_loading_callback) 

250 # Make the UM's winds consistent with LFRic. 

251 _fix_um_winds(cubes) 

252 

253 # Add model_name attribute to each cube to make it available at any further 

254 # step without needing to pass it as function parameter. 

255 if model_name is not None: 

256 for cube in cubes: 

257 cube.attributes["model_name"] = model_name 

258 return cubes 

259 

260 

261def _check_input_files(input_paths: str | list[str]) -> list[Path]: 

262 """Get an iterable of files to load, and check that they all exist. 

263 

264 Arguments 

265 --------- 

266 input_paths: list[str] 

267 List of paths to input files or directories. The path may itself contain 

268 glob patterns, but unlike in shells it will match directly first. 

269 

270 Returns 

271 ------- 

272 list[Path] 

273 A list of files to load. 

274 

275 Raises 

276 ------ 

277 FileNotFoundError: 

278 If the provided arguments don't resolve to at least one existing file. 

279 """ 

280 files = [] 

281 for raw_filename in iter_maybe(input_paths): 

282 # Match glob-like files first, if they exist. 

283 raw_path = Path(raw_filename) 

284 if raw_path.is_file(): 

285 files.append(raw_path) 

286 else: 

287 for input_path in glob.glob(raw_filename): 

288 # Convert string paths into Path objects. 

289 input_path = Path(input_path) 

290 # Get the list of files in the directory, or use it directly. 

291 if input_path.is_dir(): 

292 logging.debug("Checking directory '%s' for files", input_path) 

293 files.extend(p for p in input_path.iterdir() if p.is_file()) 

294 else: 

295 files.append(input_path) 

296 

297 files.sort() 

298 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files)) 

299 if len(files) == 0: 

300 raise FileNotFoundError(f"No files found for {input_paths}") 

301 return files 

302 

303 

304def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList): 

305 """Merge CubeList, renumbering realizations of 0 if required. 

306 

307 An unsuccessful merge indicates common input cube attributes, most 

308 commonly from ensemble members missing an explicit realization 

309 coordinate. Therefore the members are renumbered before being merged 

310 again. 

311 """ 

312 try: 

313 cubes = cubes.merge() 

314 except iris.exceptions.MergeError: 

315 _log_once( 

316 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.", 

317 level=logging.WARNING, 

318 ) 

319 for ir, cube in enumerate(cubes): 

320 if cube.coord("realization").points == 0: 320 ↛ 319line 320 didn't jump to line 319 because the condition on line 320 was always true

321 cube.coord("realization").points = ir + 1 

322 cubes = cubes.merge() 

323 return cubes 

324 

325 

326def _cutout_cubes( 

327 cubes: iris.cube.CubeList, 

328 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

329 subarea_extent: list[float], 

330): 

331 """Cut out a subarea from a CubeList.""" 

332 if subarea_type is None: 

333 logging.debug("Subarea selection is disabled.") 

334 return cubes 

335 

336 # If selected, cutout according to number of grid cells to trim from each edge. 

337 cutout_cubes = iris.cube.CubeList() 

338 # Find spatial coordinates 

339 for cube in cubes: 

340 # Find dimension coordinates. 

341 lat_name, lon_name = get_cube_yxcoordname(cube) 

342 

343 # Compute cutout based on number of cells to trim from edges. 

344 if subarea_type == "gridcells": 

345 logging.debug( 

346 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

347 subarea_extent[0], 

348 subarea_extent[1], 

349 subarea_extent[2], 

350 subarea_extent[3], 

351 ) 

352 lat_points = np.sort(cube.coord(lat_name).points) 

353 lon_points = np.sort(cube.coord(lon_name).points) 

354 # Define cutout region using user provided cell points. 

355 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

356 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

357 

358 # Compute cutout based on specified coordinate values. 

359 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

360 # If not gridcells, cutout by requested geographic area, 

361 logging.debug( 

362 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

363 subarea_extent[0], 

364 subarea_extent[1], 

365 subarea_extent[2], 

366 subarea_extent[3], 

367 ) 

368 # Define cutout region using user provided coordinates. 

369 lats = np.array(subarea_extent[0:2]) 

370 lons = np.array(subarea_extent[2:4]) 

371 # Ensure cutout longitudes are within +/- 180.0 bounds. 

372 while lons[0] < -180.0: 

373 lons += 360.0 

374 while lons[1] > 180.0: 

375 lons -= 360.0 

376 # If the coordinate system is rotated we convert coordinates into 

377 # model-relative coordinates to extract the appropriate cutout. 

378 coord_system = cube.coord(lat_name).coord_system 

379 if subarea_type == "realworld" and isinstance( 

380 coord_system, iris.coord_systems.RotatedGeogCS 

381 ): 

382 lons, lats = rotate_pole( 

383 lons, 

384 lats, 

385 pole_lon=coord_system.grid_north_pole_longitude, 

386 pole_lat=coord_system.grid_north_pole_latitude, 

387 ) 

388 else: 

389 raise ValueError("Unknown subarea_type:", subarea_type) 

390 

391 # Do cutout and add to cutout_cubes. 

392 intersection_args = {lat_name: lats, lon_name: lons} 

393 logging.debug("Cutting out coords: %s", intersection_args) 

394 try: 

395 cutout_cubes.append(cube.intersection(**intersection_args)) 

396 except IndexError as err: 

397 raise ValueError( 

398 "Region cutout error. Check and update SUBAREA_EXTENT." 

399 "Cutout region requested should be contained within data area. " 

400 "Also check if cutout region requested is smaller than input grid spacing." 

401 ) from err 

402 

403 return cutout_cubes 

404 

405 

406def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube: 

407 """Compose together the needed callbacks into a single function.""" 

408 # Most callbacks operate in-place, but save the cube when returned! 

409 _realization_callback(cube) 

410 _um_normalise_callback(cube) 

411 _lfric_normalise_callback(cube) 

412 cube = _lfric_time_coord_fix_callback(cube) 

413 _normalise_var0_varname(cube) 

414 cube = _fix_no_spatial_coords_callback(cube) 

415 _fix_spatial_coords_callback(cube) 

416 _fix_pressure_coord_callback(cube) 

417 _fix_um_radtime(cube) 

418 _fix_cell_methods(cube) 

419 cube = _convert_cube_units_callback(cube) 

420 cube = _grid_longitude_fix_callback(cube) 

421 _fix_lfric_cloud_base_altitude(cube) 

422 _proleptic_gregorian_fix(cube) 

423 _lfric_time_callback(cube) 

424 _lfric_forecast_period_callback(cube) 

425 cube = _fix_no_time_coords_callback(cube) 

426 _normalise_ML_varname(cube) 

427 return cube 

428 

429 

430def _realization_callback(cube): 

431 """Add a realization coordinate initialised to 0 if missing. 

432 

433 This means deterministic and ensemble cubes can assume realization coordinate through the rest 

434 of the code. 

435 """ 

436 # Only add if realization coordinate does not exist. 

437 if not cube.coords("realization"): 

438 cube.add_aux_coord( 

439 iris.coords.DimCoord(0, standard_name="realization", units="1") 

440 ) 

441 

442 

443@functools.lru_cache(None) 

444def _log_once(msg, level=logging.WARNING): 

445 """Print a warning message, skipping recent duplicates.""" 

446 logging.log(level, msg) 

447 

448 

449def _um_normalise_callback(cube: iris.cube.Cube): 

450 """Normalise UM STASH variable long names to LFRic variable names. 

451 

452 Note standard names will remain associated with cubes where different. 

453 Long name will be used consistently in output filename and titles. 

454 """ 

455 # Convert STASH to LFRic variable name 

456 if "STASH" in cube.attributes: 

457 stash = cube.attributes["STASH"] 

458 try: 

459 (name, grid) = STASH_TO_LFRIC[str(stash)] 

460 cube.long_name = name 

461 except KeyError: 

462 # Don't change cubes with unknown stash codes. 

463 _log_once( 

464 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.", 

465 level=logging.WARNING, 

466 ) 

467 

468 

469def _lfric_normalise_callback(cube: iris.cube.Cube): 

470 """Normalise attributes that prevents LFRic cube from merging. 

471 

472 The uuid and timeStamp relate to the output file, as saved by XIOS, and has 

473 no relation to the data contained. These attributes are removed. 

474 

475 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

476 multiple UM fields, however it can be encoded in any order. This attribute 

477 is sorted to prevent this. This attribute is only present in LFRic data that 

478 has been converted to look like UM data. 

479 """ 

480 # Remove unwanted attributes. 

481 cube.attributes.pop("timeStamp", None) 

482 cube.attributes.pop("uuid", None) 

483 cube.attributes.pop("name", None) 

484 cube.attributes.pop("source", None) 

485 cube.attributes.pop("analysis_source", None) 

486 cube.attributes.pop("history", None) 

487 

488 # Sort STASH code list. 

489 stash_list = cube.attributes.get("um_stash_source") 

490 if stash_list: 

491 # Parse the string as a list, sort, then re-encode as a string. 

492 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

493 

494 

495def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

496 """Ensure the time coordinate is a DimCoord rather than an AuxCoord. 

497 

498 The coordinate is converted and replaced if not. SLAMed LFRic data has this 

499 issue, though the coordinate satisfies all the properties for a DimCoord. 

500 Scalar time values are left as AuxCoords. 

501 """ 

502 # This issue seems to come from iris's handling of NetCDF files where time 

503 # always ends up as an AuxCoord. 

504 if cube.coords("time"): 

505 time_coord = cube.coord("time") 

506 if ( 

507 not isinstance(time_coord, iris.coords.DimCoord) 

508 and len(cube.coord_dims(time_coord)) == 1 

509 ): 

510 # Fudge the bounds to foil checking for strict monotonicity. 

511 if time_coord.has_bounds(): 511 ↛ 512line 511 didn't jump to line 512 because the condition on line 511 was never true

512 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8: 

513 time_coord.bounds = [ 

514 [ 

515 time_coord.bounds[i][0] + 1.0e-8 * float(i), 

516 time_coord.bounds[i][1], 

517 ] 

518 for i in range(len(time_coord.bounds)) 

519 ] 

520 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord) 

521 return cube 

522 

523 

524def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

525 """Check grid_longitude coordinates are in the range -180 deg to 180 deg. 

526 

527 This is necessary if comparing two models with different conventions -- 

528 for example, models where the prime meridian is defined as 0 deg or 

529 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude 

530 so that it falls in this range. Checks are for near-180 bounds given 

531 model data bounds may not extend exactly to 0. or 360. 

532 Input cubes on non-rotated grid coordinates are not impacted. 

533 """ 

534 try: 

535 y, x = get_cube_yxcoordname(cube) 

536 except ValueError: 

537 # Don't modify non-spatial cubes. 

538 return cube 

539 

540 long_coord = cube.coord(x) 

541 # Wrap longitudes if rotated pole coordinates 

542 coord_system = long_coord.coord_system 

543 if x == "grid_longitude" and isinstance( 

544 coord_system, iris.coord_systems.RotatedGeogCS 

545 ): 

546 long_points = long_coord.points.copy() 

547 long_centre = np.median(long_points) 

548 while long_centre < -175.0: 

549 long_centre += 360.0 

550 long_points += 360.0 

551 while long_centre >= 175.0: 

552 long_centre -= 360.0 

553 long_points -= 360.0 

554 long_coord.points = long_points 

555 

556 # Update coord bounds to be consistent with wrapping. 

557 if long_coord.has_bounds(): 

558 long_coord.bounds = None 

559 long_coord.guess_bounds() 

560 

561 return cube 

562 

563 

564def _fix_no_spatial_coords_callback(cube: iris.cube.Cube): 

565 import CSET.operators._utils as utils 

566 

567 # Don't modify spatial cubes that already have spatial dimensions 

568 if utils.is_spatialdim(cube): 

569 return cube 

570 

571 else: 

572 # attempt to get lat/long from cube attributes 

573 try: 

574 lat_min = cube.attributes.get("geospatial_lat_min") 

575 lat_max = cube.attributes.get("geospatial_lat_max") 

576 lon_min = cube.attributes.get("geospatial_lon_min") 

577 lon_max = cube.attributes.get("geospatial_lon_max") 

578 

579 lon_val = (lon_min + lon_max) / 2.0 

580 lat_val = (lat_min + lat_max) / 2.0 

581 

582 lat_coord = iris.coords.DimCoord( 

583 lat_val, 

584 standard_name="latitude", 

585 units="degrees_north", 

586 var_name="latitude", 

587 coord_system=iris.coord_systems.GeogCS(6371229.0), 

588 circular=True, 

589 ) 

590 

591 lon_coord = iris.coords.DimCoord( 

592 lon_val, 

593 standard_name="longitude", 

594 units="degrees_east", 

595 var_name="longitude", 

596 coord_system=iris.coord_systems.GeogCS(6371229.0), 

597 circular=True, 

598 ) 

599 

600 cube.add_aux_coord(lat_coord) 

601 cube.add_aux_coord(lon_coord) 

602 return cube 

603 

604 # if lat/long are not in attributes, then return cube unchanged: 

605 except TypeError: 

606 return cube 

607 

608 

609def _fix_spatial_coords_callback(cube: iris.cube.Cube): 

610 """Check latitude and longitude coordinates name. 

611 

612 This is necessary as some models define their grid as on rotated 

613 'grid_latitude' and 'grid_longitude' coordinates while others define 

614 the grid on non-rotated 'latitude' and 'longitude'. 

615 Cube dimensions need to be made consistent to avoid recipe failures, 

616 particularly where comparing multiple input models with differing spatial 

617 coordinates. 

618 """ 

619 # Check if cube is spatial. 

620 if not is_spatialdim(cube): 

621 # Don't modify non-spatial cubes. 

622 return 

623 

624 # Get spatial coords and dimension index. 

625 y_name, x_name = get_cube_yxcoordname(cube) 

626 ny = get_cube_coordindex(cube, y_name) 

627 nx = get_cube_coordindex(cube, x_name) 

628 

629 # Remove spatial coords bounds if erroneous values detected. 

630 # Aims to catch some errors in input coord bounds by setting 

631 # invalid threshold of 10000.0 

632 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds(): 

633 bx_max = np.max(np.abs(cube.coord(x_name).bounds)) 

634 by_max = np.max(np.abs(cube.coord(y_name).bounds)) 

635 if bx_max > 10000.0 or by_max > 10000.0: 

636 cube.coord(x_name).bounds = None 

637 cube.coord(y_name).bounds = None 

638 

639 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord 

640 # [latitude, longitude] for instances where rotated_pole=90.0 

641 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

642 coord_system = cube.coord("grid_latitude").coord_system 

643 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None) 

644 if pole_lat == 90.0: 644 ↛ 645line 644 didn't jump to line 645 because the condition on line 644 was never true

645 lats = cube.coord("grid_latitude").points 

646 lons = cube.coord("grid_longitude").points 

647 

648 cube.remove_coord("grid_latitude") 

649 cube.add_dim_coord( 

650 iris.coords.DimCoord( 

651 lats, 

652 standard_name="latitude", 

653 var_name="latitude", 

654 units="degrees", 

655 coord_system=iris.coord_systems.GeogCS(6371229.0), 

656 circular=True, 

657 ), 

658 ny, 

659 ) 

660 y_name = "latitude" 

661 cube.remove_coord("grid_longitude") 

662 cube.add_dim_coord( 

663 iris.coords.DimCoord( 

664 lons, 

665 standard_name="longitude", 

666 var_name="longitude", 

667 units="degrees", 

668 coord_system=iris.coord_systems.GeogCS(6371229.0), 

669 circular=True, 

670 ), 

671 nx, 

672 ) 

673 x_name = "longitude" 

674 

675 # Create additional AuxCoord [grid_latitude, grid_longitude] with 

676 # rotated pole attributes for cases with [lat, lon] inputs 

677 if y_name in ["latitude"] and cube.coord(y_name).units in [ 

678 "degrees", 

679 "degrees_north", 

680 "degrees_south", 

681 ]: 

682 # Add grid_latitude AuxCoord 

683 if "grid_latitude" not in [ 

684 coord.name() for coord in cube.coords(dim_coords=False) 

685 ]: 

686 cube.add_aux_coord( 

687 iris.coords.AuxCoord( 

688 cube.coord(y_name).points, 

689 var_name="grid_latitude", 

690 units="degrees", 

691 ), 

692 ny, 

693 ) 

694 # Ensure input latitude DimCoord has CoordSystem 

695 # This attribute is sometimes lost on iris.save 

696 if not cube.coord(y_name).coord_system: 

697 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

698 

699 if x_name in ["longitude"] and cube.coord(x_name).units in [ 

700 "degrees", 

701 "degrees_west", 

702 "degrees_east", 

703 ]: 

704 # Add grid_longitude AuxCoord 

705 if "grid_longitude" not in [ 

706 coord.name() for coord in cube.coords(dim_coords=False) 

707 ]: 

708 cube.add_aux_coord( 

709 iris.coords.AuxCoord( 

710 cube.coord(x_name).points, 

711 var_name="grid_longitude", 

712 units="degrees", 

713 ), 

714 nx, 

715 ) 

716 

717 # Ensure input longitude DimCoord has CoordSystem 

718 # This attribute is sometimes lost on iris.save 

719 if not cube.coord(x_name).coord_system: 

720 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

721 

722 

723def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

724 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

725 

726 This problem was raised because the AIFS model data from ECMWF 

727 defines the pressure coordinate with the name "pressure_level" rather 

728 than compliant CF coordinate names. 

729 

730 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

731 and approach the coordinates in a unified way. 

732 """ 

733 for coord in cube.dim_coords: 

734 if coord.name() in ["pressure_level", "pressure_levels"]: 

735 coord.rename("pressure") 

736 

737 if coord.name() == "pressure": 

738 if str(cube.coord("pressure").units) != "hPa": 

739 cube.coord("pressure").convert_units("hPa") 

740 

741 

742def _fix_um_radtime(cube: iris.cube.Cube): 

743 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

744 

745 This callback does not have any effect for output diagnostics with 

746 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

747 diagnostics are checked. 

748 Note this callback does not interpolate the data in time, only adjust 

749 timestamps to sit on the hour to enable time-to-time difference plotting 

750 with models which may output radiation data on the hour. 

751 """ 

752 try: 

753 if cube.attributes["STASH"] in [ 

754 "m01s01i207", 

755 "m01s01i208", 

756 "m01s02i205", 

757 "m01s02i201", 

758 "m01s01i207", 

759 "m01s02i207", 

760 "m01s01i235", 

761 ]: 

762 time_coord = cube.coord("time") 

763 

764 # Convert time points to datetime objects 

765 time_unit = time_coord.units 

766 time_points = time_unit.num2date(time_coord.points) 

767 # Skip if times don't need fixing. 

768 if time_points[0].minute == 0 and time_points[0].second == 0: 

769 return 

770 if time_points[0].minute == 30 and time_points[0].second == 0: 770 ↛ 771line 770 didn't jump to line 771 because the condition on line 770 was never true

771 return 

772 

773 # Subtract time difference from the hour from each time point 

774 n_minute = time_points[0].minute 

775 n_second = time_points[0].second 

776 # If times closer to next hour, compute difference to add on to following hour 

777 if n_minute > 30: 

778 n_minute = n_minute - 60 

779 # Compute new diagnostic time stamp 

780 new_time_points = ( 

781 time_points 

782 - datetime.timedelta(minutes=n_minute) 

783 - datetime.timedelta(seconds=n_second) 

784 ) 

785 

786 # Convert back to numeric values using the original time unit. 

787 new_time_values = time_unit.date2num(new_time_points) 

788 

789 # Replace the time coordinate with updated values. 

790 time_coord.points = new_time_values 

791 

792 # Recompute forecast_period with corrected values. 

793 if cube.coord("forecast_period"): 793 ↛ exitline 793 didn't return from function '_fix_um_radtime' because the condition on line 793 was always true

794 fcst_prd_points = cube.coord("forecast_period").points 

795 new_fcst_points = ( 

796 time_unit.num2date(fcst_prd_points) 

797 - datetime.timedelta(minutes=n_minute) 

798 - datetime.timedelta(seconds=n_second) 

799 ) 

800 cube.coord("forecast_period").points = time_unit.date2num( 

801 new_fcst_points 

802 ) 

803 except KeyError: 

804 pass 

805 

806 

807def _fix_cell_methods(cube: iris.cube.Cube): 

808 """To fix the assumed cell_methods in accumulation STASH from UM. 

809 

810 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

811 (m01s04i202, m01s05i202) in UM is being output as a time accumulation, 

812 over each hour (TAcc1hr), but input cubes show cell_methods as "mean". 

813 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

814 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

815 enabling the cell_method constraint on reading to select correct input. 

816 """ 

817 # Shift "mean" cell_method to "sum" for selected UM inputs. 

818 if cube.attributes.get("STASH") in [ 

819 "m01s21i104", 

820 "m01s04i201", 

821 "m01s04i202", 

822 "m01s05i201", 

823 "m01s05i202", 

824 ]: 

825 # Check if input cell_method contains "mean" time-processing. 

826 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 826 ↛ exitline 826 didn't return from function '_fix_cell_methods' because the condition on line 826 was always true

827 # Retrieve interval and any comment information. 

828 for cell_method in cube.cell_methods: 

829 interval_str = cell_method.intervals 

830 comment_str = cell_method.comments 

831 

832 # Remove input aggregation method. 

833 cube.cell_methods = () 

834 

835 # Replace "mean" with "sum" cell_method to indicate aggregation. 

836 cube.add_cell_method( 

837 iris.coords.CellMethod( 

838 method="sum", 

839 coords="time", 

840 intervals=interval_str, 

841 comments=comment_str, 

842 ) 

843 ) 

844 

845 

846def _convert_cube_units_callback(cube: iris.cube.Cube): 

847 """Adjust diagnostic units for specific variables. 

848 

849 Some precipitation diagnostics are output with unit kg m-2 s-1 and are 

850 converted here to mm hr-1. 

851 

852 Visibility diagnostics are converted here from m to km to improve output 

853 formatting. 

854 """ 

855 # Convert precipitation diagnostic units if required. 

856 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

857 if any("surface_microphysical" in name for name in varnames): 

858 if cube.units == "kg m-2 s-1": 

859 _log_once( 

860 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1", 

861 level=logging.DEBUG, 

862 ) 

863 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water. 

864 # This is a 1:1 conversion, so we just change the units. 

865 cube.units = "mm s-1" 

866 # Convert the units to per hour. 

867 cube.convert_units("mm hr-1") 

868 elif cube.units == "kg m-2": 868 ↛ 878line 868 didn't jump to line 878 because the condition on line 868 was always true

869 _log_once( 

870 "Converting precipitation amount units from kg m-2 to mm", 

871 level=logging.DEBUG, 

872 ) 

873 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water. 

874 # This is a 1:1 conversion, so we just change the units. 

875 cube.units = "mm" 

876 

877 # Convert visibility diagnostic units if required. 

878 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

879 if any("visibility" in name for name in varnames): 

880 if cube.units == "m": 880 ↛ 885line 880 didn't jump to line 885 because the condition on line 880 was always true

881 _log_once("Converting visibility units m to km.", level=logging.DEBUG) 

882 # Convert the units to km. 

883 cube.convert_units("km") 

884 

885 return cube 

886 

887 

888def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

889 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

890 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

891 if any("cloud_base_altitude" in name for name in varnames): 

892 # Mask cube where set > 144kft to catch default 144.35695538058164 

893 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0) 

894 

895 

896def _fix_um_winds(cubes: iris.cube.CubeList): 

897 """To make winds from the UM consistent with those from LFRic. 

898 

899 Diagnostics of wind are not always consistent between the UM 

900 and LFric. Here, winds from the UM are adjusted to make them i 

901 consistent with LFRic. 

902 """ 

903 # Check whether we have components of the wind identified by STASH, 

904 # (so this will apply only to cubes from the UM), but not the 

905 # wind speed and calculate it if it is missing. Note that 

906 # this will be biased low in general because the components will mostly 

907 # be time averages. For simplicity, we do this only if there is just one 

908 # cube of a component. A more complicated approach would be to consider 

909 # the cell methods, but it may not be warranted. 

910 u_constr = iris.AttributeConstraint(STASH="m01s03i225") 

911 v_constr = iris.AttributeConstraint(STASH="m01s03i226") 

912 speed_constr = iris.AttributeConstraint(STASH="m01s03i227") 

913 try: 

914 if cubes.extract(u_constr) and cubes.extract(v_constr): 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true

915 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr): 

916 _add_wind_speed_um(cubes) 

917 # Convert winds in the UM to be relative to true east and true north. 

918 _convert_wind_true_dirn_um(cubes) 

919 except (KeyError, AttributeError): 

920 pass 

921 

922 

923def _add_wind_speed_um(cubes: iris.cube.CubeList): 

924 """Add windspeeds to cubes from the UM.""" 

925 wspd10 = ( 

926 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2 

927 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2 

928 ) ** 0.5 

929 wspd10.attributes["STASH"] = "m01s03i227" 

930 wspd10.standard_name = "wind_speed" 

931 wspd10.long_name = "wind_speed_at_10m" 

932 cubes.append(wspd10) 

933 

934 

935def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList): 

936 """To convert winds to true directions. 

937 

938 Convert from the components relative to the grid to true directions. 

939 This functionality only handles the simplest case. 

940 """ 

941 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225")) 

942 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226")) 

943 for u, v in zip(u_grids, v_grids, strict=True): 

944 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0)) 

945 u.data = true_u.core_data() 

946 v.data = true_v.core_data() 

947 

948 

949def _normalise_var0_varname(cube: iris.cube.Cube): 

950 """Fix varnames for consistency to allow merging. 

951 

952 Some model data netCDF sometimes have a coordinate name end in 

953 "_0" etc, where duplicate coordinates of same name are defined but 

954 with different attributes. This can be inconsistently managed in 

955 different model inputs and can cause cubes to fail to merge. 

956 """ 

957 for coord in cube.coords(): 

958 if coord.var_name and coord.var_name.endswith("_0"): 

959 coord.var_name = coord.var_name.removesuffix("_0") 

960 if coord.var_name and coord.var_name.endswith("_1"): 

961 coord.var_name = coord.var_name.removesuffix("_1") 

962 if coord.var_name and coord.var_name.endswith("_2"): 962 ↛ 963line 962 didn't jump to line 963 because the condition on line 962 was never true

963 coord.var_name = coord.var_name.removesuffix("_2") 

964 if coord.var_name and coord.var_name.endswith("_3"): 964 ↛ 965line 964 didn't jump to line 965 because the condition on line 964 was never true

965 coord.var_name = coord.var_name.removesuffix("_3") 

966 

967 if cube.var_name and cube.var_name.endswith("_0"): 

968 cube.var_name = cube.var_name.removesuffix("_0") 

969 

970 

971def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

972 """Convert the calendars of time units to use a standard calendar.""" 

973 try: 

974 time_coord = cube.coord("time") 

975 if time_coord.units.calendar == "proleptic_gregorian": 

976 logging.debug( 

977 "Changing proleptic Gregorian calendar to standard calendar for %s", 

978 repr(time_coord.units), 

979 ) 

980 time_coord.units = time_coord.units.change_calendar("standard") 

981 except iris.exceptions.CoordinateNotFoundError: 

982 pass 

983 

984 

985def _lfric_time_callback(cube: iris.cube.Cube): 

986 """Fix time coordinate metadata if missing dimensions. 

987 

988 Some model data does not contain forecast_reference_time or forecast_period as 

989 expected coordinates, and so we cannot aggregate over case studies without this 

990 metadata. This callback fixes these issues. 

991 

992 This callback also ensures all time coordinates are referenced as hours since 

993 1970-01-01 00:00:00 for consistency across different model inputs. 

994 

995 Notes 

996 ----- 

997 Some parts of the code have been adapted from Paul Earnshaw's scripts. 

998 """ 

999 # Construct forecast_reference time if it doesn't exist. 

1000 try: 

1001 tcoord = cube.coord("time") 

1002 # Set time coordinate to common basis "hours since 1970" 

1003 try: 

1004 tcoord.convert_units("hours since 1970-01-01 00:00:00") 

1005 except ValueError: 

1006 logging.warning("Unrecognised base time unit: %s", tcoord.units) 

1007 

1008 if not cube.coords("forecast_reference_time"): 

1009 try: 

1010 init_time = datetime.datetime.fromisoformat( 

1011 tcoord.attributes["time_origin"] 

1012 ) 

1013 frt_point = tcoord.units.date2num(init_time) 

1014 frt_coord = iris.coords.AuxCoord( 

1015 frt_point, 

1016 units=tcoord.units, 

1017 standard_name="forecast_reference_time", 

1018 long_name="forecast_reference_time", 

1019 ) 

1020 cube.add_aux_coord(frt_coord) 

1021 except KeyError: 

1022 logging.warning( 

1023 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from." 

1024 ) 

1025 

1026 # Remove time_origin to allow multiple case studies to merge. 

1027 tcoord.attributes.pop("time_origin", None) 

1028 

1029 # Construct forecast_period axis (forecast lead time) if it doesn't exist. 

1030 if not cube.coords("forecast_period"): 

1031 try: 

1032 # Create array of forecast lead times. 

1033 init_coord = cube.coord("forecast_reference_time") 

1034 init_time_points_in_tcoord_units = tcoord.units.date2num( 

1035 init_coord.units.num2date(init_coord.points) 

1036 ) 

1037 lead_times = tcoord.points - init_time_points_in_tcoord_units 

1038 

1039 # Get unit for lead time from time coordinate's unit. 

1040 # Convert all lead time to hours for consistency between models. 

1041 if "seconds" in str(tcoord.units): 1041 ↛ 1042line 1041 didn't jump to line 1042 because the condition on line 1041 was never true

1042 lead_times = lead_times / 3600.0 

1043 units = "hours" 

1044 elif "hours" in str(tcoord.units): 1044 ↛ 1047line 1044 didn't jump to line 1047 because the condition on line 1044 was always true

1045 units = "hours" 

1046 else: 

1047 raise ValueError(f"Unrecognised base time unit: {tcoord.units}") 

1048 

1049 # Create lead time coordinate. 

1050 lead_time_coord = iris.coords.AuxCoord( 

1051 lead_times, 

1052 standard_name="forecast_period", 

1053 long_name="forecast_period", 

1054 units=units, 

1055 ) 

1056 

1057 # Associate lead time coordinate with time dimension. 

1058 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time")) 

1059 except iris.exceptions.CoordinateNotFoundError: 

1060 logging.warning( 

1061 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period" 

1062 ) 

1063 except iris.exceptions.CoordinateNotFoundError: 

1064 logging.warning("No time coordinate on cube.") 

1065 

1066 

1067def _lfric_forecast_period_callback(cube: iris.cube.Cube): 

1068 """Check forecast_period name and units.""" 

1069 try: 

1070 coord = cube.coord("forecast_period") 

1071 if coord.units != "hours": 

1072 cube.coord("forecast_period").convert_units("hours") 

1073 if not coord.standard_name: 

1074 coord.standard_name = "forecast_period" 

1075 except iris.exceptions.CoordinateNotFoundError: 

1076 pass 

1077 

1078 

1079def _fix_no_time_coords_callback(cube: iris.cube.Cube): 

1080 """Add dummy time coord to process cubes that don't have sequence coord.""" 

1081 # Only add if time coordinate does not exist. 

1082 if not cube.coords("time"): 

1083 cube.add_aux_coord( 

1084 iris.coords.DimCoord( 

1085 0, standard_name="time", units="hours since 0001-01-01 00:00:00" 

1086 ) 

1087 ) 

1088 

1089 return cube 

1090 

1091 

1092def _normalise_ML_varname(cube: iris.cube.Cube): 

1093 """Fix plev variable names to standard names.""" 

1094 if cube.coords("pressure"): 

1095 if cube.name() == "x_wind": 

1096 cube.long_name = "zonal_wind_at_pressure_levels" 

1097 if cube.name() == "y_wind": 

1098 cube.long_name = "meridional_wind_at_pressure_levels" 

1099 if cube.name() == "air_temperature": 

1100 cube.long_name = "temperature_at_pressure_levels" 

1101 if cube.name() == "specific_humidity": 1101 ↛ 1102line 1101 didn't jump to line 1102 because the condition on line 1101 was never true

1102 cube.long_name = ( 

1103 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging" 

1104 )