Coverage for src/CSET/operators/read.py: 89%

411 statements  

coverage.py v7.13.5, created at 2026-05-08 17:20 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43 

44 

45class NoDataError(FileNotFoundError): 

46 """Error that no data has been loaded.""" 

47 

48 

49def read_cube( 

50 file_paths: list[str] | str, 

51 constraint: iris.Constraint = None, 

52 model_names: list[str] | str | None = None, 

53 subarea_type: str = None, 

54 subarea_extent: list[float] = None, 

55 **kwargs, 

56) -> iris.cube.Cube: 

57 """Read a single cube from files. 

58 

59 Read operator that takes a path string (can include shell-style glob 

60 patterns), and loads the cube matching the constraint. If any paths point to 

61 a directory, all the files contained within are loaded. 

62 

63 Ensemble data can also be loaded. If it has a realization coordinate 

64 already, it will be directly used. If not, it will have its member number 

65 guessed from the filename, based on one of several common patterns. For 

66 example the pattern *emXX*, where XX is the realization. 

67 

68 Deterministic data will be loaded with a realization of 0, allowing it to be 

69 processed in the same way as ensemble data. 

70 

71 Arguments 

72 --------- 

73 file_paths: str | list[str] 

74 Path or paths to where .pp/.nc files are located 

75 constraint: iris.Constraint | iris.ConstraintCombination, optional 

76 Constraints to filter data by. Defaults to unconstrained. 

77 model_names: str | list[str], optional 

78 Names of the models that correspond to respective paths in file_paths. 

79 subarea_type: "gridcells" | "modelrelative" | "realworld", optional 

80 Whether to constrain data by model relative coordinates or real world 

81 coordinates. 

82 subarea_extent: list, optional 

83 List of coordinates to constrain data by, in order lower latitude, 

84 upper latitude, lower longitude, upper longitude. 

85 

86 Returns 

87 ------- 

88 cubes: iris.cube.Cube 

89 Cube loaded 

90 

91 Raises 

92 ------ 

93 FileNotFoundError 

94 If the provided path does not exist 

95 ValueError 

96 If the constraint doesn't produce a single cube. 

97 """ 

98 cubes = read_cubes( 

99 file_paths=file_paths, 

100 constraint=constraint, 

101 model_names=model_names, 

102 subarea_type=subarea_type, 

103 subarea_extent=subarea_extent, 

104 ) 

105 # Check the filtered CubeList contains exactly one cube. 

106 if len(cubes) == 1: 

107 return cubes[0] 

108 else: 

109 raise ValueError( 

110 f"Constraint doesn't produce single cube: {constraint}\n{cubes}" 

111 ) 

112 
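A minimal usage sketch of read_cube, assuming a hypothetical glob pattern and variable name:

import iris

from CSET.operators.read import read_cube

cube = read_cube(
    "/data/model/forecast_*.nc",  # placeholder glob
    constraint=iris.Constraint(name="air_temperature"),  # placeholder variable
    subarea_type="realworld",
    subarea_extent=[50.0, 60.0, -10.0, 5.0],  # lower lat, upper lat, lower lon, upper lon
)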

113 

114def read_cubes( 

115 file_paths: list[str] | str, 

116 constraint: iris.Constraint | None = None, 

117 model_names: str | list[str] | None = None, 

118 subarea_type: str = None, 

119 subarea_extent: list = None, 

120 **kwargs, 

121) -> iris.cube.CubeList: 

122 """Read cubes from files. 

123 

124 Read operator that takes a path string (can include shell-style glob 

125 patterns), and loads the cubes matching the constraint. If any paths point 

126 to a directory, all the files contained within are loaded. 

127 

128 Ensemble data can also be loaded. If it has a realization coordinate 

129 already, it will be directly used. If not, it will have its member number 

130 guessed from the filename, based on one of several common patterns. For 

131 example the pattern *emXX*, where XX is the realization. 

132 

133 Deterministic data will be loaded with a realization of 0, allowing it to be 

134 processed in the same way as ensemble data. 

135 

136 Data output by XIOS (such as LFRic) has its per-file metadata removed so 

137 that the cubes merge across files. 

138 

139 Arguments 

140 --------- 

141 file_paths: str | list[str] 

142 Path or paths to where .pp/.nc files are located. Can include globs. 

143 constraint: iris.Constraint | iris.ConstraintCombination, optional 

144 Constraints to filter data by. Defaults to unconstrained. 

145 model_names: str | list[str], optional 

146 Names of the models that correspond to respective paths in file_paths. 

147 subarea_type: str, optional 

148 Whether to constrain data by model relative coordinates or real world 

149 coordinates. 

150 subarea_extent: list[float], optional 

151 List of coordinates to constrain data by, in order lower latitude, 

152 upper latitude, lower longitude, upper longitude. 

153 

154 Returns 

155 ------- 

156 cubes: iris.cube.CubeList 

157 Cubes loaded after being merged and concatenated. 

158 

159 Raises 

160 ------ 

161 FileNotFoundError 

162 If the provided path does not exist 

163 """ 

164 # Get iterable of paths. Each path corresponds to 1 model. 

165 paths = iter_maybe(file_paths) 

166 model_names = iter_maybe(model_names) 

167 

168 # Check we have appropriate number of model names. 

169 if model_names != (None,) and len(model_names) != len(paths): 

170 raise ValueError( 

171 f"The number of model names ({len(model_names)}) should equal " 

172 f"the number of paths given ({len(paths)})." 

173 ) 

174 

175 # Load the data for each model into a CubeList per model. 

176 model_cubes = ( 

177 _load_model(path, name, constraint) 

178 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None) 

179 ) 

180 

181 # Split out first model's cubes and mark it as the base for comparisons. 

182 cubes = next(model_cubes) 

183 for cube in cubes: 

184 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes. 

185 cube.attributes["cset_comparison_base"] = 1 

186 

187 # Load the rest of the models. 

188 cubes.extend(itertools.chain.from_iterable(model_cubes)) 

189 

190 # Enable different point-based observation sources to be concatenated. 

191 cubes = _check_combine_point_observations(cubes) 

192 

193 # Unify time units so different case studies can merge. 

194 iris.util.unify_time_units(cubes) 

195 

196 # Select sub region. 

197 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent) 

198 

199 # Merge and concatenate cubes now metadata has been fixed. 

200 cubes = _merge_cubes_check_ensemble(cubes) 

201 cubes = cubes.concatenate() 

202 

203 # Squeeze single valued coordinates into scalar coordinates. 

204 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes) 

205 

206 # Ensure dimension coordinates are bounded. 

207 for cube in cubes: 

208 for dim_coord in cube.coords(dim_coords=True): 

209 # Iris can't guess the bounds of a scalar coordinate. 

210 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1: 

211 dim_coord.guess_bounds() 

212 

213 logging.info("Loaded cubes: %s", cubes) 

214 if len(cubes) == 0: 

215 raise NoDataError("No cubes loaded, check your constraints!") 

216 return cubes 

217 
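A sketch of a two-model comparison load, with hypothetical paths and model names; as noted above, the first model's cubes are tagged with the cset_comparison_base attribute:

from CSET.operators.read import read_cubes

cubes = read_cubes(
    file_paths=["/data/um/*.pp", "/data/lfric/*.nc"],  # placeholders, one entry per model
    model_names=["UM", "LFRic"],
)
base_cubes = [c for c in cubes if c.attributes.get("cset_comparison_base") == 1]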

218 

219def _load_model( 

220 paths: str | list[str], 

221 model_name: str | None, 

222 constraint: iris.Constraint | None, 

223) -> iris.cube.CubeList: 

224 """Load a single model's data into a CubeList.""" 

225 input_files = _check_input_files(paths) 

226 # If unset, a constraint of None lets everything be loaded. 

227 logging.debug("Constraint: %s", constraint) 

228 cubes = iris.load(input_files, constraint, callback=_loading_callback) 

229 # Make the UM's winds consistent with LFRic. 

230 _fix_um_winds(cubes) 

231 

232 # Add model_name attribute to each cube to make it available at any further 

233 # step without needing to pass it as function parameter. 

234 if model_name is not None: 

235 for cube in cubes: 

236 cube.attributes["model_name"] = model_name 

237 return cubes 

238 

239 

240def _check_input_files(input_paths: str | list[str]) -> list[Path]: 

241 """Get an iterable of files to load, and check that they all exist. 

242 

243 Arguments 

244 --------- 

245 input_paths: list[str] 

246 List of paths to input files or directories. The path may itself contain 

247 glob patterns, but unlike in a shell an exact match is tried first. 

248 

249 Returns 

250 ------- 

251 list[Path] 

252 A list of files to load. 

253 

254 Raises 

255 ------ 

256 FileNotFoundError: 

257 If the provided arguments don't resolve to at least one existing file. 

258 """ 

259 files = [] 

260 for raw_filename in iter_maybe(input_paths): 

261 # Use the path directly if it is an existing file, even if it looks like a glob. 

262 raw_path = Path(raw_filename) 

263 if raw_path.is_file(): 

264 files.append(raw_path) 

265 else: 

266 for input_path in glob.glob(raw_filename): 

267 # Convert string paths into Path objects. 

268 input_path = Path(input_path) 

269 # Get the list of files in the directory, or use it directly. 

270 if input_path.is_dir(): 

271 logging.debug("Checking directory '%s' for files", input_path) 

272 files.extend(p for p in input_path.iterdir() if p.is_file()) 

273 else: 

274 files.append(input_path) 

275 

276 files.sort() 

277 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files)) 

278 if len(files) == 0: 

279 raise FileNotFoundError(f"No files found for {input_paths}") 

280 return files 

281 
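For illustration, _check_input_files accepts a mixture of plain files, directories and glob patterns (the paths below are placeholders) and raises FileNotFoundError if nothing resolves to an existing file:

from CSET.operators.read import _check_input_files

files = _check_input_files(["/data/run1/", "/data/run2/forecast_*.pp"])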

282 

283def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList): 

284 """Merge CubeList, renumbering realizations of 0 if required. 

285 

286 An unsuccessful merge usually indicates cubes with identical metadata, most 

287 commonly ensemble members missing an explicit realization 

288 coordinate. The members are therefore renumbered before being merged 

289 again. 

290 """ 

291 try: 

292 cubes = cubes.merge() 

293 except iris.exceptions.MergeError: 

294 _log_once( 

295 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.", 

296 level=logging.WARNING, 

297 ) 

298 for ir, cube in enumerate(cubes): 

299 if cube.coord("realization").points == 0: 

300 cube.coord("realization").points = ir + 1 

301 cubes = cubes.merge() 

302 return cubes 

303 

304 

305def _cutout_cubes( 

306 cubes: iris.cube.CubeList, 

307 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

308 subarea_extent: list[float], 

309): 

310 """Cut out a subarea from a CubeList.""" 

311 if subarea_type is None: 

312 logging.debug("Subarea selection is disabled.") 

313 return cubes 

314 

315 # If selected, cutout according to number of grid cells to trim from each edge. 

316 cutout_cubes = iris.cube.CubeList() 

317 # Find spatial coordinates 

318 for cube in cubes: 

319 # Find dimension coordinates. 

320 lat_name, lon_name = get_cube_yxcoordname(cube) 

321 

322 # Compute cutout based on number of cells to trim from edges. 

323 if subarea_type == "gridcells": 

324 logging.debug( 

325 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

326 subarea_extent[0], 

327 subarea_extent[1], 

328 subarea_extent[2], 

329 subarea_extent[3], 

330 ) 

331 lat_points = np.sort(cube.coord(lat_name).points) 

332 lon_points = np.sort(cube.coord(lon_name).points) 

333 # Define cutout region using user provided cell points. 

334 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

335 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

336 

337 # Compute cutout based on specified coordinate values. 

338 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

339 # If not gridcells, cut out the requested geographic area. 

340 logging.debug( 

341 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

342 subarea_extent[0], 

343 subarea_extent[1], 

344 subarea_extent[2], 

345 subarea_extent[3], 

346 ) 

347 # Define cutout region using user provided coordinates. 

348 lats = np.array(subarea_extent[0:2]) 

349 lons = np.array(subarea_extent[2:4]) 

350 # Ensure cutout longitudes are within +/- 180.0 bounds. 

351 while lons[0] < -180.0: 

352 lons += 360.0 

353 while lons[1] > 180.0: 

354 lons -= 360.0 

355 # If the coordinate system is rotated we convert coordinates into 

356 # model-relative coordinates to extract the appropriate cutout. 

357 coord_system = cube.coord(lat_name).coord_system 

358 if subarea_type == "realworld" and isinstance( 

359 coord_system, iris.coord_systems.RotatedGeogCS 

360 ): 

361 lons, lats = rotate_pole( 

362 lons, 

363 lats, 

364 pole_lon=coord_system.grid_north_pole_longitude, 

365 pole_lat=coord_system.grid_north_pole_latitude, 

366 ) 

367 else: 

368 raise ValueError("Unknown subarea_type:", subarea_type) 

369 

370 # Do cutout and add to cutout_cubes. 

371 intersection_args = {lat_name: lats, lon_name: lons} 

372 logging.debug("Cutting out coords: %s", intersection_args) 

373 try: 

374 cutout_cubes.append(cube.intersection(**intersection_args)) 

375 except IndexError as err: 

376 raise ValueError( 

377 "Region cutout error. Check and update SUBAREA_EXTENT." 

378 "Cutout region requested should be contained within data area. " 

379 "Also check if cutout region requested is smaller than input grid spacing." 

380 ) from err 

381 

382 return cutout_cubes 

383 

384 

385def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube: 

386 """Compose together the needed callbacks into a single function.""" 

387 # Most callbacks operate in-place; where one returns a cube, keep the returned cube. 

388 _realization_callback(cube) 

389 _um_normalise_callback(cube) 

390 _lfric_normalise_callback(cube) 

391 cube = _lfric_time_coord_fix_callback(cube) 

392 _normalise_var0_varname(cube) 

393 cube = _fix_no_spatial_coords_callback(cube) 

394 _fix_spatial_coords_callback(cube) 

395 _fix_pressure_coord_callback(cube) 

396 _fix_um_radtime(cube) 

397 _fix_cell_methods(cube) 

398 cube = _convert_cube_units_callback(cube) 

399 cube = _grid_longitude_fix_callback(cube) 

400 _fix_lfric_cloud_base_altitude(cube) 

401 _proleptic_gregorian_fix(cube) 

402 _lfric_time_callback(cube) 

403 _lfric_forecast_period_callback(cube) 

404 cube = _fix_no_time_coords_callback(cube) 

405 _normalise_ML_varname(cube) 

406 return cube 

407 

408 

409def _realization_callback(cube): 

410 """Add a realization coordinate initialised to 0 if missing. 

411 

412 This means deterministic and ensemble cubes can both assume a realization 

413 coordinate through the rest of the code. 

414 """ 

415 # Only add if realization coordinate does not exist. 

416 if not cube.coords("realization"): 

417 cube.add_aux_coord( 

418 iris.coords.DimCoord(0, standard_name="realization", units="1") 

419 ) 

420 

421 

422@functools.lru_cache(None) 

423def _log_once(msg, level=logging.WARNING): 

424 """Print a warning message, skipping recent duplicates.""" 

425 logging.log(level, msg) 

426 

427 

428def _um_normalise_callback(cube: iris.cube.Cube): 

429 """Normalise UM STASH variable long names to LFRic variable names. 

430 

431 Note standard names will remain associated with cubes where different. 

432 Long name will be used consistently in output filename and titles. 

433 """ 

434 # Convert STASH to LFRic variable name 

435 if "STASH" in cube.attributes: 

436 stash = cube.attributes["STASH"] 

437 try: 

438 (name, grid) = STASH_TO_LFRIC[str(stash)] 

439 cube.long_name = name 

440 except KeyError: 

441 # Don't change cubes with unknown stash codes. 

442 _log_once( 

443 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.", 

444 level=logging.WARNING, 

445 ) 

446 

447 

448def _lfric_normalise_callback(cube: iris.cube.Cube): 

449 """Normalise attributes that prevents LFRic cube from merging. 

450 

451 The uuid and timeStamp relate to the output file, as saved by XIOS, and have 

452 no relation to the data contained. These attributes are removed. 

453 

454 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

455 multiple UM fields; however, it can be encoded in any order, so it is sorted 

456 here to allow merging. This attribute is only present in LFRic data that 

457 has been converted to look like UM data. 

458 """ 

459 # Remove unwanted attributes. 

460 cube.attributes.pop("timeStamp", None) 

461 cube.attributes.pop("uuid", None) 

462 cube.attributes.pop("name", None) 

463 cube.attributes.pop("source", None) 

464 cube.attributes.pop("analysis_source", None) 

465 cube.attributes.pop("history", None) 

466 

467 # Sort STASH code list. 

468 stash_list = cube.attributes.get("um_stash_source") 

469 if stash_list: 

470 # Parse the string as a list, sort, then re-encode as a string. 

471 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

472 

473 

474def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

475 """Ensure the time coordinate is a DimCoord rather than an AuxCoord. 

476 

477 The coordinate is converted and replaced if not. SLAMed LFRic data has this 

478 issue, though the coordinate satisfies all the properties for a DimCoord. 

479 Scalar time values are left as AuxCoords. 

480 """ 

481 # This issue seems to come from iris's handling of NetCDF files where time 

482 # always ends up as an AuxCoord. 

483 if cube.coords("time"): 

484 time_coord = cube.coord("time") 

485 if ( 

486 not isinstance(time_coord, iris.coords.DimCoord) 

487 and len(cube.coord_dims(time_coord)) == 1 

488 ): 

489 # Fudge the bounds so the strict monotonicity check passes. 

490 if time_coord.has_bounds(): 

491 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8: 

492 time_coord.bounds = [ 

493 [ 

494 time_coord.bounds[i][0] + 1.0e-8 * float(i), 

495 time_coord.bounds[i][1], 

496 ] 

497 for i in range(len(time_coord.bounds)) 

498 ] 

499 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord) 

500 return cube 

501 

502 

503def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

504 """Check grid_longitude coordinates are in the range -180 deg to 180 deg. 

505 

506 This is necessary if comparing two models with different conventions -- 

507 for example, models where the prime meridian is defined as 0 deg or 

508 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude 

509 so that it falls in this range. Checks use near-180 degree thresholds since 

510 model data bounds may not extend exactly to 0 or 360 degrees. 

511 Input cubes on non-rotated grid coordinates are not impacted. 

512 """ 

513 try: 

514 y, x = get_cube_yxcoordname(cube) 

515 except ValueError: 

516 # Don't modify non-spatial cubes. 

517 return cube 

518 

519 long_coord = cube.coord(x) 

520 # Wrap longitudes if rotated pole coordinates 

521 coord_system = long_coord.coord_system 

522 if x == "grid_longitude" and isinstance( 

523 coord_system, iris.coord_systems.RotatedGeogCS 

524 ): 

525 long_points = long_coord.points.copy() 

526 long_centre = np.median(long_points) 

527 while long_centre < -175.0: 

528 long_centre += 360.0 

529 long_points += 360.0 

530 while long_centre >= 175.0: 

531 long_centre -= 360.0 

532 long_points -= 360.0 

533 long_coord.points = long_points 

534 

535 # Update coord bounds to be consistent with wrapping. 

536 if long_coord.has_bounds(): 

537 long_coord.bounds = None 

538 long_coord.guess_bounds() 

539 

540 return cube 

541 
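A numerical sketch of the wrapping above, using assumed grid_longitude points centred near 360 degrees:

import numpy as np

long_points = np.array([350.0, 355.0, 360.0, 365.0])  # assumed points
long_centre = np.median(long_points)  # 357.5
while long_centre >= 175.0:
    long_centre -= 360.0
    long_points -= 360.0
# long_points is now [-10., -5., 0., 5.], within the -180 to 180 degree range.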

542 

543def _fix_no_spatial_coords_callback(cube: iris.cube.Cube): 

544 import CSET.operators._utils as utils 

545 

546 # Don't modify spatial cubes that already have spatial dimensions 

547 if utils.is_spatialdim(cube): 

548 return cube 

549 

550 else: 

551 # attempt to get lat/long from cube attributes 

552 try: 

553 lat_min = cube.attributes.get("geospatial_lat_min") 

554 lat_max = cube.attributes.get("geospatial_lat_max") 

555 lon_min = cube.attributes.get("geospatial_lon_min") 

556 lon_max = cube.attributes.get("geospatial_lon_max") 

557 

558 lon_val = (lon_min + lon_max) / 2.0 

559 lat_val = (lat_min + lat_max) / 2.0 

560 

561 lat_coord = iris.coords.DimCoord( 

562 lat_val, 

563 standard_name="latitude", 

564 units="degrees_north", 

565 var_name="latitude", 

566 coord_system=iris.coord_systems.GeogCS(6371229.0), 

567 circular=True, 

568 ) 

569 

570 lon_coord = iris.coords.DimCoord( 

571 lon_val, 

572 standard_name="longitude", 

573 units="degrees_east", 

574 var_name="longitude", 

575 coord_system=iris.coord_systems.GeogCS(6371229.0), 

576 circular=True, 

577 ) 

578 

579 cube.add_aux_coord(lat_coord) 

580 cube.add_aux_coord(lon_coord) 

581 return cube 

582 

583 # if lat/long are not in attributes, then return cube unchanged: 

584 except TypeError: 

585 return cube 

586 

587 

588def _fix_spatial_coords_callback(cube: iris.cube.Cube): 

589 """Check latitude and longitude coordinates name. 

590 

591 This is necessary as some models define their grid as on rotated 

592 'grid_latitude' and 'grid_longitude' coordinates while others define 

593 the grid on non-rotated 'latitude' and 'longitude'. 

594 Cube dimensions need to be made consistent to avoid recipe failures, 

595 particularly where comparing multiple input models with differing spatial 

596 coordinates. 

597 """ 

598 # Check if cube is spatial. 

599 if not is_spatialdim(cube): 

600 # Don't modify non-spatial cubes. 

601 return 

602 

603 # Get spatial coords and dimension index. 

604 y_name, x_name = get_cube_yxcoordname(cube) 

605 ny = get_cube_coordindex(cube, y_name) 

606 nx = get_cube_coordindex(cube, x_name) 

607 

608 # Remove spatial coords bounds if erroneous values detected. 

609 # Aims to catch some errors in input coord bounds by setting 

610 # an invalid-value threshold of 10000.0. 

611 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds(): 

612 bx_max = np.max(np.abs(cube.coord(x_name).bounds)) 

613 by_max = np.max(np.abs(cube.coord(y_name).bounds)) 

614 if bx_max > 10000.0 or by_max > 10000.0: 

615 cube.coord(x_name).bounds = None 

616 cube.coord(y_name).bounds = None 

617 

618 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord 

619 # [latitude, longitude] for instances where rotated_pole=90.0 

620 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

621 coord_system = cube.coord("grid_latitude").coord_system 

622 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None) 

623 if pole_lat == 90.0: 

624 lats = cube.coord("grid_latitude").points 

625 lons = cube.coord("grid_longitude").points 

626 

627 cube.remove_coord("grid_latitude") 

628 cube.add_dim_coord( 

629 iris.coords.DimCoord( 

630 lats, 

631 standard_name="latitude", 

632 var_name="latitude", 

633 units="degrees", 

634 coord_system=iris.coord_systems.GeogCS(6371229.0), 

635 circular=True, 

636 ), 

637 ny, 

638 ) 

639 y_name = "latitude" 

640 cube.remove_coord("grid_longitude") 

641 cube.add_dim_coord( 

642 iris.coords.DimCoord( 

643 lons, 

644 standard_name="longitude", 

645 var_name="longitude", 

646 units="degrees", 

647 coord_system=iris.coord_systems.GeogCS(6371229.0), 

648 circular=True, 

649 ), 

650 nx, 

651 ) 

652 x_name = "longitude" 

653 

654 # Create additional AuxCoord [grid_latitude, grid_longitude] with 

655 # rotated pole attributes for cases with [lat, lon] inputs 

656 if y_name in ["latitude"] and cube.coord(y_name).units in [ 

657 "degrees", 

658 "degrees_north", 

659 "degrees_south", 

660 ]: 

661 # Add grid_latitude AuxCoord 

662 if "grid_latitude" not in [ 662 ↛ 675line 662 didn't jump to line 675 because the condition on line 662 was always true

663 coord.name() for coord in cube.coords(dim_coords=False) 

664 ]: 

665 cube.add_aux_coord( 

666 iris.coords.AuxCoord( 

667 cube.coord(y_name).points, 

668 var_name="grid_latitude", 

669 units="degrees", 

670 ), 

671 ny, 

672 ) 

673 # Ensure input latitude DimCoord has CoordSystem 

674 # This attribute is sometimes lost on iris.save 

675 if not cube.coord(y_name).coord_system: 

676 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

677 

678 if x_name in ["longitude"] and cube.coord(x_name).units in [ 

679 "degrees", 

680 "degrees_west", 

681 "degrees_east", 

682 ]: 

683 # Add grid_longitude AuxCoord 

684 if "grid_longitude" not in [ 684 ↛ 698line 684 didn't jump to line 698 because the condition on line 684 was always true

685 coord.name() for coord in cube.coords(dim_coords=False) 

686 ]: 

687 cube.add_aux_coord( 

688 iris.coords.AuxCoord( 

689 cube.coord(x_name).points, 

690 var_name="grid_longitude", 

691 units="degrees", 

692 ), 

693 nx, 

694 ) 

695 

696 # Ensure input longitude DimCoord has CoordSystem 

697 # This attribute is sometimes lost on iris.save 

698 if not cube.coord(x_name).coord_system: 

699 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

700 

701 

702def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

703 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

704 

705 This problem was raised because the AIFS model data from ECMWF 

706 defines the pressure coordinate with the name "pressure_level" rather 

707 than compliant CF coordinate names. 

708 

709 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

710 and approach the coordinates in a unified way. 

711 """ 

712 for coord in cube.dim_coords: 

713 if coord.name() in ["pressure_level", "pressure_levels"]: 

714 coord.rename("pressure") 

715 

716 if coord.name() == "pressure": 

717 if str(cube.coord("pressure").units) != "hPa": 

718 cube.coord("pressure").convert_units("hPa") 

719 

720 

721def _fix_um_radtime(cube: iris.cube.Cube): 

722 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

723 

724 This callback does not have any effect for output diagnostics with 

725 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

726 diagnostics are checked. 

727 Note this callback does not interpolate the data in time; it only adjusts 

728 timestamps to sit on the hour to enable time-to-time difference plotting 

729 with models which may output radiation data on the hour. 

730 """ 

731 try: 

732 if cube.attributes["STASH"] in [ 

733 "m01s01i207", 

734 "m01s01i208", 

735 "m01s02i205", 

736 "m01s02i201", 

737 "m01s01i207", 

738 "m01s02i207", 

739 "m01s01i235", 

740 ]: 

741 time_coord = cube.coord("time") 

742 

743 # Convert time points to datetime objects 

744 time_unit = time_coord.units 

745 time_points = time_unit.num2date(time_coord.points) 

746 # Skip if times don't need fixing. 

747 if time_points[0].minute == 0 and time_points[0].second == 0: 

748 return 

749 if time_points[0].minute == 30 and time_points[0].second == 0: 

750 return 

751 

752 # Subtract time difference from the hour from each time point 

753 n_minute = time_points[0].minute 

754 n_second = time_points[0].second 

755 # If times closer to next hour, compute difference to add on to following hour 

756 if n_minute > 30: 

757 n_minute = n_minute - 60 

758 # Compute new diagnostic time stamp 

759 new_time_points = ( 

760 time_points 

761 - datetime.timedelta(minutes=n_minute) 

762 - datetime.timedelta(seconds=n_second) 

763 ) 

764 

765 # Convert back to numeric values using the original time unit. 

766 new_time_values = time_unit.date2num(new_time_points) 

767 

768 # Replace the time coordinate with updated values. 

769 time_coord.points = new_time_values 

770 

771 # Recompute forecast_period with corrected values. 

772 if cube.coord("forecast_period"): 

773 fcst_prd_points = cube.coord("forecast_period").points 

774 new_fcst_points = ( 

775 time_unit.num2date(fcst_prd_points) 

776 - datetime.timedelta(minutes=n_minute) 

777 - datetime.timedelta(seconds=n_second) 

778 ) 

779 cube.coord("forecast_period").points = time_unit.date2num( 

780 new_fcst_points 

781 ) 

782 except KeyError: 

783 pass 

784 
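A sketch of the timestamp adjustment for the sub-30-minute case, using an assumed output time:

import datetime

t = datetime.datetime(2025, 1, 1, 12, 1)  # assumed radiation output time, 1 minute past the hour
t_fixed = t - datetime.timedelta(minutes=t.minute, seconds=t.second)
# t_fixed is 2025-01-01 12:00, so differences against on-the-hour model output line up.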

785 

786def _fix_cell_methods(cube: iris.cube.Cube): 

787 """To fix the assumed cell_methods in accumulation STASH from UM. 

788 

789 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

790 (m01s04i202, m01s05i202) in the UM are output as a time accumulation 

791 over each hour (TAcc1hr), but input cubes show cell_methods as "mean". 

792 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

793 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

794 enabling the cell_method constraint on reading to select correct input. 

795 """ 

796 # Shift "mean" cell_method to "sum" for selected UM inputs. 

797 if cube.attributes.get("STASH") in [ 

798 "m01s21i104", 

799 "m01s04i201", 

800 "m01s04i202", 

801 "m01s05i201", 

802 "m01s05i202", 

803 ]: 

804 # Check if input cell_method contains "mean" time-processing. 

805 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 

806 # Retrieve interval and any comment information. 

807 for cell_method in cube.cell_methods: 

808 interval_str = cell_method.intervals 

809 comment_str = cell_method.comments 

810 

811 # Remove input aggregation method. 

812 cube.cell_methods = () 

813 

814 # Replace "mean" with "sum" cell_method to indicate aggregation. 

815 cube.add_cell_method( 

816 iris.coords.CellMethod( 

817 method="sum", 

818 coords="time", 

819 intervals=interval_str, 

820 comments=comment_str, 

821 ) 

822 ) 

823 

824 

825def _convert_cube_units_callback(cube: iris.cube.Cube): 

826 """Adjust diagnostic units for specific variables. 

827 

828 Some precipitation diagnostics are output with unit kg m-2 s-1 and are 

829 converted here to mm hr-1. 

830 

831 Visibility diagnostics are converted here from m to km to improve output 

832 formatting. 

833 """ 

834 # Convert precipitation diagnostic units if required. 

835 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

836 if any("surface_microphysical" in name for name in varnames): 

837 if cube.units == "kg m-2 s-1": 

838 _log_once( 

839 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1", 

840 level=logging.DEBUG, 

841 ) 

842 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water. 

843 # This is a 1:1 conversion, so we just change the units. 

844 cube.units = "mm s-1" 

845 # Convert the units to per hour. 

846 cube.convert_units("mm hr-1") 

847 elif cube.units == "kg m-2": 

848 _log_once( 

849 "Converting precipitation amount units from kg m-2 to mm", 

850 level=logging.DEBUG, 

851 ) 

852 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water. 

853 # This is a 1:1 conversion, so we just change the units. 

854 cube.units = "mm" 

855 

856 # Convert visibility diagnostic units if required. 

857 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

858 if any("visibility" in name for name in varnames): 

859 if cube.units == "m": 

860 _log_once("Converting visibility units m to km.", level=logging.DEBUG) 

861 # Convert the units to km. 

862 cube.convert_units("km") 

863 

864 return cube 

865 
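The precipitation conversion above relies on 1 kg of water over 1 m^2 forming a 1 mm deep layer, so kg m-2 s-1 and mm s-1 are numerically identical and only the time unit changes. A small check with cf-units (a dependency of iris):

from cf_units import Unit

Unit("mm s-1").convert(1.0, "mm hr-1")  # 3600.0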

866 

867def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

868 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

869 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

870 if any("cloud_base_altitude" in name for name in varnames): 

871 # Mask cube values greater than 144 kft to catch the default 144.35695538058164 

872 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0) 

873 

874 

875def _fix_um_winds(cubes: iris.cube.CubeList): 

876 """To make winds from the UM consistent with those from LFRic. 

877 

878 Diagnostics of wind are not always consistent between the UM 

879 and LFric. Here, winds from the UM are adjusted to make them i 

880 consistent with LFRic. 

881 """ 

882 # Check whether we have components of the wind identified by STASH, 

883 # (so this will apply only to cubes from the UM), but not the 

884 # wind speed and calculate it if it is missing. Note that 

885 # this will be biased low in general because the components will mostly 

886 # be time averages. For simplicity, we do this only if there is just one 

887 # cube of a component. A more complicated approach would be to consider 

888 # the cell methods, but it may not be warranted. 

889 u_constr = iris.AttributeConstraint(STASH="m01s03i225") 

890 v_constr = iris.AttributeConstraint(STASH="m01s03i226") 

891 speed_constr = iris.AttributeConstraint(STASH="m01s03i227") 

892 try: 

893 if cubes.extract(u_constr) and cubes.extract(v_constr): 

894 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr): 

895 _add_wind_speed_um(cubes) 

896 # Convert winds in the UM to be relative to true east and true north. 

897 _convert_wind_true_dirn_um(cubes) 

898 except (KeyError, AttributeError): 

899 pass 

900 

901 

902def _add_wind_speed_um(cubes: iris.cube.CubeList): 

903 """Add windspeeds to cubes from the UM.""" 

904 wspd10 = ( 

905 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2 

906 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2 

907 ) ** 0.5 

908 wspd10.attributes["STASH"] = "m01s03i227" 

909 wspd10.standard_name = "wind_speed" 

910 wspd10.long_name = "wind_speed_at_10m" 

911 cubes.append(wspd10) 

912 
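The derived speed is the magnitude of the components; an illustrative calculation with assumed 10 m wind components:

u, v = 3.0, 4.0  # assumed eastward and northward components in m s-1
wind_speed = (u**2 + v**2) ** 0.5  # 5.0, matching the cube arithmetic above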

913 

914def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList): 

915 """To convert winds to true directions. 

916 

917 Convert from the components relative to the grid to true directions. 

918 This functionality only handles the simplest case. 

919 """ 

920 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225")) 

921 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226")) 

922 for u, v in zip(u_grids, v_grids, strict=True): 

923 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0)) 

924 u.data = true_u.core_data() 

925 v.data = true_v.core_data() 

926 

927 

928def _normalise_var0_varname(cube: iris.cube.Cube): 

929 """Fix varnames for consistency to allow merging. 

930 

931 Some netCDF model data has coordinate names ending in 

932 "_0" etc., where duplicate coordinates of the same name are defined but 

933 with different attributes. This can be inconsistently managed in 

934 different model inputs and can cause cubes to fail to merge. 

935 """ 

936 for coord in cube.coords(): 

937 if coord.var_name and coord.var_name.endswith("_0"): 

938 coord.var_name = coord.var_name.removesuffix("_0") 

939 if coord.var_name and coord.var_name.endswith("_1"): 

940 coord.var_name = coord.var_name.removesuffix("_1") 

941 if coord.var_name and coord.var_name.endswith("_2"): 

942 coord.var_name = coord.var_name.removesuffix("_2") 

943 if coord.var_name and coord.var_name.endswith("_3"): 

944 coord.var_name = coord.var_name.removesuffix("_3") 

945 

946 if cube.var_name and cube.var_name.endswith("_0"): 

947 cube.var_name = cube.var_name.removesuffix("_0") 

948 

949 

950def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

951 """Convert the calendars of time units to use a standard calendar.""" 

952 try: 

953 time_coord = cube.coord("time") 

954 if time_coord.units.calendar == "proleptic_gregorian": 

955 logging.debug( 

956 "Changing proleptic Gregorian calendar to standard calendar for %s", 

957 repr(time_coord.units), 

958 ) 

959 time_coord.units = time_coord.units.change_calendar("standard") 

960 except iris.exceptions.CoordinateNotFoundError: 

961 pass 

962 

963 

964def _lfric_time_callback(cube: iris.cube.Cube): 

965 """Fix time coordinate metadata if missing dimensions. 

966 

967 Some model data does not contain forecast_reference_time or forecast_period as 

968 expected coordinates, and so we cannot aggregate over case studies without this 

969 metadata. This callback fixes these issues. 

970 

971 This callback also ensures all time coordinates are referenced as hours since 

972 1970-01-01 00:00:00 for consistency across different model inputs. 

973 

974 Notes 

975 ----- 

976 Some parts of the code have been adapted from Paul Earnshaw's scripts. 

977 """ 

978 # Construct forecast_reference time if it doesn't exist. 

979 try: 

980 tcoord = cube.coord("time") 

981 # Set time coordinate to common basis "hours since 1970" 

982 try: 

983 tcoord.convert_units("hours since 1970-01-01 00:00:00") 

984 except ValueError: 

985 logging.warning("Unrecognised base time unit: %s", tcoord.units) 

986 

987 if not cube.coords("forecast_reference_time"): 

988 try: 

989 init_time = datetime.datetime.fromisoformat( 

990 tcoord.attributes["time_origin"] 

991 ) 

992 frt_point = tcoord.units.date2num(init_time) 

993 frt_coord = iris.coords.AuxCoord( 

994 frt_point, 

995 units=tcoord.units, 

996 standard_name="forecast_reference_time", 

997 long_name="forecast_reference_time", 

998 ) 

999 cube.add_aux_coord(frt_coord) 

1000 except KeyError: 

1001 logging.warning( 

1002 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from." 

1003 ) 

1004 

1005 # Remove time_origin to allow multiple case studies to merge. 

1006 tcoord.attributes.pop("time_origin", None) 

1007 

1008 # Construct forecast_period axis (forecast lead time) if it doesn't exist. 

1009 if not cube.coords("forecast_period"): 

1010 try: 

1011 # Create array of forecast lead times. 

1012 init_coord = cube.coord("forecast_reference_time") 

1013 init_time_points_in_tcoord_units = tcoord.units.date2num( 

1014 init_coord.units.num2date(init_coord.points) 

1015 ) 

1016 lead_times = tcoord.points - init_time_points_in_tcoord_units 

1017 

1018 # Get unit for lead time from time coordinate's unit. 

1019 # Convert all lead time to hours for consistency between models. 

1020 if "seconds" in str(tcoord.units): 1020 ↛ 1021line 1020 didn't jump to line 1021 because the condition on line 1020 was never true

1021 lead_times = lead_times / 3600.0 

1022 units = "hours" 

1023 elif "hours" in str(tcoord.units): 1023 ↛ 1026line 1023 didn't jump to line 1026 because the condition on line 1023 was always true

1024 units = "hours" 

1025 else: 

1026 raise ValueError(f"Unrecognised base time unit: {tcoord.units}") 

1027 

1028 # Create lead time coordinate. 

1029 lead_time_coord = iris.coords.AuxCoord( 

1030 lead_times, 

1031 standard_name="forecast_period", 

1032 long_name="forecast_period", 

1033 units=units, 

1034 ) 

1035 

1036 # Associate lead time coordinate with time dimension. 

1037 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time")) 

1038 except iris.exceptions.CoordinateNotFoundError: 

1039 logging.warning( 

1040 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period" 

1041 ) 

1042 except iris.exceptions.CoordinateNotFoundError: 

1043 logging.warning("No time coordinate on cube.") 

1044 
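A sketch of the forecast_period construction on the common "hours since 1970" basis, using assumed initialisation and validity times:

import datetime

from cf_units import Unit

tunit = Unit("hours since 1970-01-01 00:00:00", calendar="standard")
frt = tunit.date2num(datetime.datetime(2025, 1, 1, 0))  # forecast_reference_time
t = tunit.date2num(datetime.datetime(2025, 1, 1, 6))  # validity time
lead_time = t - frt  # 6.0 hours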

1045 

1046def _lfric_forecast_period_callback(cube: iris.cube.Cube): 

1047 """Check forecast_period name and units.""" 

1048 try: 

1049 coord = cube.coord("forecast_period") 

1050 if coord.units != "hours": 

1051 cube.coord("forecast_period").convert_units("hours") 

1052 if not coord.standard_name: 

1053 coord.standard_name = "forecast_period" 

1054 except iris.exceptions.CoordinateNotFoundError: 

1055 pass 

1056 

1057 

1058def _fix_no_time_coords_callback(cube: iris.cube.Cube): 

1059 """Add dummy time coord to process cubes that don't have sequence coord.""" 

1060 # Only add if time coordinate does not exist. 

1061 if not cube.coords("time"): 

1062 cube.add_aux_coord( 

1063 iris.coords.DimCoord( 

1064 0, standard_name="time", units="hours since 0001-01-01 00:00:00" 

1065 ) 

1066 ) 

1067 

1068 return cube 

1069 

1070 

1071def _normalise_ML_varname(cube: iris.cube.Cube): 

1072 """Fix plev variable names to standard names.""" 

1073 if cube.coords("pressure"): 

1074 if cube.name() == "x_wind": 

1075 cube.long_name = "zonal_wind_at_pressure_levels" 

1076 if cube.name() == "y_wind": 

1077 cube.long_name = "meridional_wind_at_pressure_levels" 

1078 if cube.name() == "air_temperature": 

1079 cube.long_name = "temperature_at_pressure_levels" 

1080 if cube.name() == "specific_humidity": 

1081 cube.long_name = ( 

1082 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging" 

1083 ) 

1084 

1085 

1086def _check_combine_point_observations(cubes: iris.cube.CubeList): 

1087 """Enable cubes containing different point observation sources to be concatenated.""" 

1088 nstation = 0 

1089 for cube in cubes: 

1090 if "station" in [coord.name() for coord in cube.coords(dim_coords=True)]: 1090 ↛ 1091line 1090 didn't jump to line 1091 because the condition on line 1090 was never true

1091 if "obs_source" in [coord.name() for coord in cube.coords()]: 

1092 cube.remove_coord("obs_source") 

1093 cube.coord("station").points = cube.coord("station").points + nstation 

1094 nstation = nstation + len(cube.coord("station").points) 

1095 

1096 return cubes