Coverage for src / CSET / operators / read.py: 90%

392 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 15:04 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43 

44 

class NoDataError(FileNotFoundError):
    """Error that no data has been loaded.

    Subclasses FileNotFoundError so callers that already handle missing
    input files also handle the empty-load case.
    """

47 

48 

def read_cube(
    file_paths: list[str] | str,
    constraint: iris.Constraint = None,
    model_names: list[str] | str | None = None,
    subarea_type: str = None,
    subarea_extent: list[float] = None,
    **kwargs,
) -> iris.cube.Cube:
    """Read a single cube from files.

    Thin wrapper around read_cubes that additionally insists exactly one
    cube matches the constraint. Paths may contain shell-style glob
    patterns, and any path pointing at a directory loads every file it
    contains.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: "gridcells" | "modelrelative" | "realworld", optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list, optional
        List of coordinates to constraint data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.Cube
        Cube loaded

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    ValueError
        If the constraint doesn't produce a single cube.
    """
    loaded = read_cubes(
        file_paths=file_paths,
        constraint=constraint,
        model_names=model_names,
        subarea_type=subarea_type,
        subarea_extent=subarea_extent,
    )
    # Exactly one cube must survive the constraint filtering.
    if len(loaded) != 1:
        raise ValueError(
            f"Constraint doesn't produce single cube: {constraint}\n{loaded}"
        )
    return loaded[0]

112 

113 

def read_cubes(
    file_paths: list[str] | str,
    constraint: iris.Constraint | None = None,
    model_names: str | list[str] | None = None,
    subarea_type: str | None = None,
    subarea_extent: list | None = None,
    **kwargs,
) -> iris.cube.CubeList:
    """Read cubes from files.

    Read operator that takes a path string (can include shell-style glob
    patterns), and loads the cubes matching the constraint. If any paths point
    to directory, all the files contained within are loaded.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Data output by XIOS (such as LFRic) has its per-file metadata removed so
    that the cubes merge across files.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located. Can include globs.
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: str, optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list[float], optional
        List of coordinates to constraint data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.CubeList
        Cubes loaded after being merged and concatenated.

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    NoDataError
        If no cubes remain after loading and filtering.
    ValueError
        If the number of model names does not match the number of paths.
    """
    # Get iterable of paths. Each path corresponds to 1 model.
    paths = iter_maybe(file_paths)
    model_names = iter_maybe(model_names)

    # Check we have appropriate number of model names. iter_maybe turns a
    # None model_names argument into the (None,) tuple checked here.
    if model_names != (None,) and len(model_names) != len(paths):
        raise ValueError(
            f"The number of model names ({len(model_names)}) should equal "
            f"the number of paths given ({len(paths)})."
        )

    # Load the data for each model into a CubeList per model. Lazy generator
    # so each model is only read when consumed below.
    model_cubes = (
        _load_model(path, name, constraint)
        for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
    )

    # Split out first model's cubes and mark it as the base for comparisons.
    cubes = next(model_cubes)
    for cube in cubes:
        # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
        cube.attributes["cset_comparison_base"] = 1

    # Load the rest of the models.
    cubes.extend(itertools.chain.from_iterable(model_cubes))

    # Unify time units so different case studies can merge.
    iris.util.unify_time_units(cubes)

    # Select sub region.
    cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)

    # Merge and concatenate cubes now metadata has been fixed.
    cubes = cubes.merge()
    cubes = cubes.concatenate()

    # Squeeze single valued coordinates into scalar coordinates.
    cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)

    # Ensure dimension coordinates are bounded.
    for cube in cubes:
        for dim_coord in cube.coords(dim_coords=True):
            # Iris can't guess the bounds of a scalar coordinate.
            if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
                dim_coord.guess_bounds()

    logging.info("Loaded cubes: %s", cubes)
    if len(cubes) == 0:
        raise NoDataError("No cubes loaded, check your constraints!")
    return cubes

214 

215 

def _load_model(
    paths: str | list[str],
    model_name: str | None,
    constraint: iris.Constraint | None,
) -> iris.cube.CubeList:
    """Load one model's files into a CubeList."""
    files = _check_input_files(paths)
    # A constraint of None lets everything be loaded.
    logging.debug("Constraint: %s", constraint)
    loaded = iris.load(files, constraint, callback=_loading_callback)

    # Make the UM's winds consistent with LFRic.
    _fix_um_winds(loaded)

    # Record the model name on every cube, so later processing steps can
    # retrieve it without it being threaded through as a parameter.
    if model_name is not None:
        for cube in loaded:
            cube.attributes["model_name"] = model_name
    return loaded

235 

236 

def _check_input_files(input_paths: str | list[str]) -> list[Path]:
    """Resolve the input paths into a sorted list of existing files.

    Arguments
    ---------
    input_paths: list[str]
        List of paths to input files or directories. The path may itself
        contain glob patterns, but a literal filename match takes priority
        over glob expansion.

    Returns
    -------
    list[Path]
        A sorted list of files to load.

    Raises
    ------
    FileNotFoundError:
        If the provided arguments don't resolve to at least one existing file.
    """
    found: list[Path] = []
    for pattern in iter_maybe(input_paths):
        # A path naming an actual file wins over glob interpretation.
        direct = Path(pattern)
        if direct.is_file():
            found.append(direct)
            continue
        for match in map(Path, glob.glob(pattern)):
            if match.is_dir():
                # Directories contribute all their immediate files.
                logging.debug("Checking directory '%s' for files", match)
                found.extend(entry for entry in match.iterdir() if entry.is_file())
            else:
                found.append(match)

    found.sort()
    logging.info("Loading files:\n%s", "\n".join(str(path) for path in found))
    if not found:
        raise FileNotFoundError(f"No files found for {input_paths}")
    return found

278 

279 

def _cutout_cubes(
    cubes: iris.cube.CubeList,
    subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
    subarea_extent: list[float],
) -> iris.cube.CubeList:
    """Cut out a subarea from a CubeList.

    Note the interpretation of subarea_extent depends on subarea_type: for
    "gridcells" it is the number of cells to trim from the [lower, left,
    upper, right] edges; for "realworld"/"modelrelative" it is coordinate
    values in order [lower lat, upper lat, lower lon, upper lon].
    """
    if subarea_type is None:
        # No subarea requested; pass the cubes through untouched.
        logging.debug("Subarea selection is disabled.")
        return cubes

    # If selected, cutout according to number of grid cells to trim from each edge.
    cutout_cubes = iris.cube.CubeList()
    # Find spatial coordinates
    for cube in cubes:
        # Find dimension coordinates.
        lat_name, lon_name = get_cube_yxcoordname(cube)

        # Compute cutout based on number of cells to trim from edges.
        if subarea_type == "gridcells":
            logging.debug(
                "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            # Sort so indexing from either end is well defined regardless of
            # the coordinate's storage order.
            lat_points = np.sort(cube.coord(lat_name).points)
            lon_points = np.sort(cube.coord(lon_name).points)
            # Define cutout region using user provided cell points. Negative
            # indexing selects the trim counted in from the upper/right edge.
            lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
            lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]

        # Compute cutout based on specified coordinate values.
        elif subarea_type == "realworld" or subarea_type == "modelrelative":
            # If not gridcells, cutout by requested geographic area,
            logging.debug(
                "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            # Define cutout region using user provided coordinates.
            lats = np.array(subarea_extent[0:2])
            lons = np.array(subarea_extent[2:4])
            # Ensure cutout longitudes are within +/- 180.0 bounds.
            while lons[0] < -180.0:
                lons += 360.0
            while lons[1] > 180.0:
                lons -= 360.0
            # If the coordinate system is rotated we convert coordinates into
            # model-relative coordinates to extract the appropriate cutout.
            coord_system = cube.coord(lat_name).coord_system
            if subarea_type == "realworld" and isinstance(
                coord_system, iris.coord_systems.RotatedGeogCS
            ):
                lons, lats = rotate_pole(
                    lons,
                    lats,
                    pole_lon=coord_system.grid_north_pole_longitude,
                    pole_lat=coord_system.grid_north_pole_latitude,
                )
        else:
            raise ValueError("Unknown subarea_type:", subarea_type)

        # Do cutout and add to cutout_cubes.
        intersection_args = {lat_name: lats, lon_name: lons}
        logging.debug("Cutting out coords: %s", intersection_args)
        try:
            cutout_cubes.append(cube.intersection(**intersection_args))
        except IndexError as err:
            # iris raises IndexError when the requested region doesn't
            # overlap the data; re-raise with user-actionable advice.
            raise ValueError(
                "Region cutout error. Check and update SUBAREA_EXTENT."
                "Cutout region requested should be contained within data area. "
                "Also check if cutout region requested is smaller than input grid spacing."
            ) from err

    return cutout_cubes

358 

359 

def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
    """Compose together the needed callbacks into a single function.

    The fixes are applied in a deliberate order; most mutate the cube in
    place, while a few return a replacement cube that must be kept.
    """
    # These callbacks only mutate the cube in place.
    for fix_in_place in (
        _realization_callback,
        _um_normalise_callback,
        _lfric_normalise_callback,
        _nimrod_normalise_callback,
    ):
        fix_in_place(cube)
    # The remaining fixes are interleaved with ones that return a new cube;
    # keep the returned object each time.
    cube = _lfric_time_coord_fix_callback(cube)
    _normalise_var0_varname(cube)
    cube = _fix_no_spatial_coords_callback(cube)
    _fix_spatial_coords_callback(cube)
    _fix_pressure_coord_callback(cube)
    _fix_um_radtime(cube)
    _fix_cell_methods(cube)
    cube = _convert_cube_units_callback(cube)
    cube = _grid_longitude_fix_callback(cube)
    _fix_lfric_cloud_base_altitude(cube)
    _proleptic_gregorian_fix(cube)
    _lfric_time_callback(cube)
    _lfric_forecast_period_callback(cube)
    _normalise_ML_varname(cube)
    return cube

382 

383 

def _realization_callback(cube):
    """Give deterministic cubes a realization of 0.

    This means they can be handled in the same way as ensembles through the
    rest of the code.
    """
    # Cubes that already carry a realization coordinate are left alone.
    if cube.coords("realization"):
        return
    realization = iris.coords.DimCoord(0, standard_name="realization", units="1")
    cube.add_aux_coord(realization)

395 

396 

397@functools.lru_cache(None) 

398def _log_once(msg, level=logging.WARNING): 

399 """Print a warning message, skipping recent duplicates.""" 

400 logging.log(level, msg) 

401 

402 

def _um_normalise_callback(cube: iris.cube.Cube):
    """Normalise UM STASH variable long names to LFRic variable names.

    Note standard names will remain associated with cubes where different.
    Long name will be used consistently in output filename and titles.
    """
    # Only cubes identified by a STASH code are touched.
    if "STASH" not in cube.attributes:
        return
    stash = cube.attributes["STASH"]
    try:
        name, _grid = STASH_TO_LFRIC[str(stash)]
    except KeyError:
        # Don't change cubes with unknown stash codes.
        _log_once(
            f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.",
            level=logging.WARNING,
        )
    else:
        cube.long_name = name

421 

422 

423def _lfric_normalise_callback(cube: iris.cube.Cube): 

424 """Normalise attributes that prevents LFRic cube from merging. 

425 

426 The uuid and timeStamp relate to the output file, as saved by XIOS, and has 

427 no relation to the data contained. These attributes are removed. 

428 

429 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

430 multiple UM fields, however it can be encoded in any order. This attribute 

431 is sorted to prevent this. This attribute is only present in LFRic data that 

432 has been converted to look like UM data. 

433 """ 

434 # Remove unwanted attributes. 

435 cube.attributes.pop("timeStamp", None) 

436 cube.attributes.pop("uuid", None) 

437 cube.attributes.pop("name", None) 

438 cube.attributes.pop("source", None) 

439 cube.attributes.pop("analysis_source", None) 

440 cube.attributes.pop("history", None) 

441 

442 # Sort STASH code list. 

443 stash_list = cube.attributes.get("um_stash_source") 

444 if stash_list: 

445 # Parse the string as a list, sort, then re-encode as a string. 

446 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

447 

448 

449def _nimrod_normalise_callback(cube: iris.cube.Cube): 

450 """Normalise attributes that prevents NIMROD radar cubes from merging.""" 

451 # Remove unwanted attributes. 

452 cube.attributes.pop("radar_sites", None) 

453 cube.attributes.pop("additional_radar_sites", None) 

454 cube.attributes.pop("recursive_filter_iterations", None) 

455 cube.attributes.pop("Probability methods", None) 

456 

457 

def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Ensure the time coordinate is a DimCoord rather than an AuxCoord.

    The coordinate is converted and replaced if not. SLAMed LFRic data has this
    issue, though the coordinate satisfies all the properties for a DimCoord.
    Scalar time values are left as AuxCoords.
    """
    # This issue seems to come from iris's handling of NetCDF files where time
    # always ends up as an AuxCoord.
    if cube.coords("time"):
        time_coord = cube.coord("time")
        # Only promote 1-dimensional AuxCoords; scalar time stays as-is.
        if (
            not isinstance(time_coord, iris.coords.DimCoord)
            and len(cube.coord_dims(time_coord)) == 1
        ):
            # Fudge the bounds to foil checking for strict monotonicity.
            if time_coord.has_bounds():
                # Only when the bounds' lower edges are (near-)identical
                # across the coordinate; nudge each lower bound by a tiny
                # index-dependent epsilon so promotion's monotonicity check
                # passes.
                if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
                    time_coord.bounds = [
                        [
                            time_coord.bounds[i][0] + 1.0e-8 * float(i),
                            time_coord.bounds[i][1],
                        ]
                        for i in range(len(time_coord.bounds))
                    ]
            iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
    return cube

485 

486 

def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Check grid_longitude coordinates are in the range -180 deg to 180 deg.

    This is necessary if comparing two models with different conventions --
    for example, models where the prime meridian is defined as 0 deg or
    360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
    so that it falls in this range. Checks are for near-180 bounds given
    model data bounds may not extend exactly to 0. or 360.
    Input cubes on non-rotated grid coordinates are not impacted.
    """
    try:
        _, x_name = get_cube_yxcoordname(cube)
    except ValueError:
        # Don't modify non-spatial cubes.
        return cube

    lon_coord = cube.coord(x_name)
    # Only rotated pole grid_longitude coordinates are wrapped.
    if x_name != "grid_longitude" or not isinstance(
        lon_coord.coord_system, iris.coord_systems.RotatedGeogCS
    ):
        return cube

    points = lon_coord.points.copy()
    # Shift the whole coordinate by multiples of 360 deg until its median
    # lies within the near-180 window.
    centre = np.median(points)
    while centre < -175.0:
        centre += 360.0
        points += 360.0
    while centre >= 175.0:
        centre -= 360.0
        points -= 360.0
    lon_coord.points = points

    # Regenerate bounds so they are consistent with the wrapped points.
    if lon_coord.has_bounds():
        lon_coord.bounds = None
        lon_coord.guess_bounds()

    return cube

525 

526 

def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):
    """Attach scalar lat/lon coordinates to cubes with no spatial dimension.

    Some inputs carry their location only in the ``geospatial_*`` bounding
    box attributes. The centre of that box is added as scalar latitude and
    longitude AuxCoords so downstream spatial handling works. Cubes that
    already have spatial dimensions, or lack the attributes, are returned
    unchanged.
    """
    # Don't modify cubes that already have spatial dimensions. Uses the
    # module-level import (consistent with _fix_spatial_coords_callback)
    # instead of re-importing CSET.operators._utils locally.
    if is_spatialdim(cube):
        return cube

    try:
        # Attempt to get lat/lon bounding box from cube attributes. `.get`
        # yields None for missing keys; the arithmetic below then raises
        # TypeError, which is handled as "no location available".
        lat_min = cube.attributes.get("geospatial_lat_min")
        lat_max = cube.attributes.get("geospatial_lat_max")
        lon_min = cube.attributes.get("geospatial_lon_min")
        lon_max = cube.attributes.get("geospatial_lon_max")

        # Midpoint of the bounding box.
        lon_val = (lon_min + lon_max) / 2.0
        lat_val = (lat_min + lat_max) / 2.0

        lat_coord = iris.coords.DimCoord(
            lat_val,
            standard_name="latitude",
            units="degrees_north",
            var_name="latitude",
            coord_system=iris.coord_systems.GeogCS(6371229.0),
            circular=True,
        )

        lon_coord = iris.coords.DimCoord(
            lon_val,
            standard_name="longitude",
            units="degrees_east",
            var_name="longitude",
            coord_system=iris.coord_systems.GeogCS(6371229.0),
            circular=True,
        )

        cube.add_aux_coord(lat_coord)
        cube.add_aux_coord(lon_coord)
        return cube

    # If lat/lon are not in attributes, then return cube unchanged.
    except TypeError:
        return cube

570 

571 

def _fix_spatial_coords_callback(cube: iris.cube.Cube):
    """Check latitude and longitude coordinates name.

    This is necessary as some models define their grid as on rotated
    'grid_latitude' and 'grid_longitude' coordinates while others define
    the grid on non-rotated 'latitude' and 'longitude'.
    Cube dimensions need to be made consistent to avoid recipe failures,
    particularly where comparing multiple input models with differing spatial
    coordinates.
    """
    # Check if cube is spatial.
    if not is_spatialdim(cube):
        # Don't modify non-spatial cubes.
        return

    # Get spatial coords and dimension index.
    y_name, x_name = get_cube_yxcoordname(cube)
    ny = get_cube_coordindex(cube, y_name)
    nx = get_cube_coordindex(cube, x_name)

    # Remove spatial coords bounds if erroneous values detected.
    # Aims to catch some errors in input coord bounds by setting
    # invalid threshold of 10000.0
    if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
        bx_max = np.max(np.abs(cube.coord(x_name).bounds))
        by_max = np.max(np.abs(cube.coord(y_name).bounds))
        if bx_max > 10000.0 or by_max > 10000.0:
            cube.coord(x_name).bounds = None
            cube.coord(y_name).bounds = None

    # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
    # [latitude, longitude] for instances where rotated_pole=90.0
    if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
        coord_system = cube.coord("grid_latitude").coord_system
        # getattr guards against coordinate systems without a rotated pole.
        pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
        if pole_lat == 90.0:
            # A pole at 90N means the "rotation" is the identity, so the
            # points can be reused directly as unrotated lat/lon.
            lats = cube.coord("grid_latitude").points
            lons = cube.coord("grid_longitude").points

            cube.remove_coord("grid_latitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lats,
                    standard_name="latitude",
                    var_name="latitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                ny,
            )
            y_name = "latitude"
            cube.remove_coord("grid_longitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lons,
                    standard_name="longitude",
                    var_name="longitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                nx,
            )
            x_name = "longitude"

    # Create additional AuxCoord [grid_latitude, grid_longitude] with
    # rotated pole attributes for cases with [lat, lon] inputs
    if y_name in ["latitude"] and cube.coord(y_name).units in [
        "degrees",
        "degrees_north",
        "degrees_south",
    ]:
        # Add grid_latitude AuxCoord
        if "grid_latitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(y_name).points,
                    var_name="grid_latitude",
                    units="degrees",
                ),
                ny,
            )
        # Ensure input latitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(y_name).coord_system:
            cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)

    if x_name in ["longitude"] and cube.coord(x_name).units in [
        "degrees",
        "degrees_west",
        "degrees_east",
    ]:
        # Add grid_longitude AuxCoord
        if "grid_longitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(x_name).points,
                    var_name="grid_longitude",
                    units="degrees",
                ),
                nx,
            )

        # Ensure input longitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(x_name).coord_system:
            cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)

684 

685 

def _fix_pressure_coord_callback(cube: iris.cube.Cube):
    """Rename pressure coordinate to "pressure" if it exists and ensure hPa units.

    This problem was raised because the AIFS model data from ECMWF
    defines the pressure coordinate with the name "pressure_level" rather
    than compliant CF coordinate names.

    Additionally, set the units of pressure to be hPa to be consistent with the UM,
    and approach the coordinates in a unified way.
    """
    for coord in cube.dim_coords:
        # Normalise non-CF names first, then fix units on the result.
        if coord.name() in ("pressure_level", "pressure_levels"):
            coord.rename("pressure")

        if coord.name() == "pressure":
            pressure_coord = cube.coord("pressure")
            if str(pressure_coord.units) != "hPa":
                pressure_coord.convert_units("hPa")

703 

704 

def _fix_um_radtime(cube: iris.cube.Cube):
    """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.

    This callback does not have any effect for output diagnostics with
    timestamps exactly 00 or 30 minutes past the hour. Only radiation
    diagnostics are checked.
    Note this callback does not interpolate the data in time, only adjust
    timestamps to sit on the hour to enable time-to-time difference plotting
    with models which may output radiation data on the hour.
    """
    try:
        # Radiation diagnostics, identified by STASH code.
        # (Previously contained a duplicate "m01s01i207" entry.)
        if cube.attributes["STASH"] in [
            "m01s01i207",
            "m01s01i208",
            "m01s02i205",
            "m01s02i201",
            "m01s02i207",
            "m01s01i235",
        ]:
            time_coord = cube.coord("time")

            # Convert time points to datetime objects
            time_unit = time_coord.units
            time_points = time_unit.num2date(time_coord.points)
            # Skip if times don't need fixing (already on the hour or
            # half hour, judged from the first point).
            if time_points[0].minute == 0 and time_points[0].second == 0:
                return
            if time_points[0].minute == 30 and time_points[0].second == 0:
                return

            # Subtract time difference from the hour from each time point
            n_minute = time_points[0].minute
            n_second = time_points[0].second
            # If times closer to next hour, compute difference to add on to following hour
            if n_minute > 30:
                n_minute = n_minute - 60
            # Compute new diagnostic time stamp
            new_time_points = (
                time_points
                - datetime.timedelta(minutes=n_minute)
                - datetime.timedelta(seconds=n_second)
            )

            # Convert back to numeric values using the original time unit.
            new_time_values = time_unit.date2num(new_time_points)

            # Replace the time coordinate with updated values.
            time_coord.points = new_time_values

            # Recompute forecast_period with corrected values. Use coords()
            # (a list, empty when absent) as the presence test; cube.coord()
            # raises CoordinateNotFoundError when the coordinate is missing,
            # so the previous truthiness test could never be False.
            if cube.coords("forecast_period"):
                fcst_prd_points = cube.coord("forecast_period").points
                new_fcst_points = (
                    time_unit.num2date(fcst_prd_points)
                    - datetime.timedelta(minutes=n_minute)
                    - datetime.timedelta(seconds=n_second)
                )
                cube.coord("forecast_period").points = time_unit.date2num(
                    new_fcst_points
                )
    except KeyError:
        # No STASH attribute: not UM data, leave untouched.
        pass

768 

769 

def _fix_cell_methods(cube: iris.cube.Cube):
    """To fix the assumed cell_methods in accumulation STASH from UM.

    Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
    (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
    over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
    For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
    "sum". This callback changes "mean" cube attribute cell_method to "sum",
    enabling the cell_method constraint on reading to select correct input.
    """
    # Shift "mean" cell_method to "sum" for selected UM inputs.
    accumulation_stash = (
        "m01s21i104",
        "m01s04i201",
        "m01s04i202",
        "m01s05i201",
        "m01s05i202",
    )
    if cube.attributes.get("STASH") not in accumulation_stash:
        return
    # Only act when every cell method is "mean" time-processing.
    if {cm.method for cm in cube.cell_methods} != {"mean"}:
        return

    # Carry over interval/comment metadata (last cell method wins, matching
    # the original loop behaviour).
    for cell_method in cube.cell_methods:
        interval_str = cell_method.intervals
        comment_str = cell_method.comments

    # Remove input aggregation method.
    cube.cell_methods = ()

    # Replace "mean" with "sum" cell_method to indicate aggregation.
    cube.add_cell_method(
        iris.coords.CellMethod(
            method="sum",
            coords="time",
            intervals=interval_str,
            comments=comment_str,
        )
    )

807 

808 

def _convert_cube_units_callback(cube: iris.cube.Cube):
    """Adjust diagnostic units for specific variables.

    Some precipitation diagnostics are output with unit kg m-2 s-1 and are
    converted here to mm hr-1.

    Visibility diagnostics are converted here from m to km to improve output
    formatting.
    """
    # Collect whichever of the cube's names are set; unit conversion does
    # not alter them, so one pass suffices for both checks below.
    names = [
        name
        for name in (cube.long_name, cube.standard_name, cube.var_name)
        if name
    ]

    # Convert precipitation diagnostic units if required.
    if any("surface_microphysical" in name for name in names):
        if cube.units == "kg m-2 s-1":
            _log_once(
                "Converting precipitation rate units from kg m-2 s-1 to mm hr-1",
                level=logging.DEBUG,
            )
            # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water.
            # This is a 1:1 conversion, so we just change the units.
            cube.units = "mm s-1"
            # Convert the units to per hour.
            cube.convert_units("mm hr-1")
        elif cube.units == "kg m-2":
            _log_once(
                "Converting precipitation amount units from kg m-2 to mm",
                level=logging.DEBUG,
            )
            # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water.
            # This is a 1:1 conversion, so we just change the units.
            cube.units = "mm"

    # Convert visibility diagnostic units if required.
    if any("visibility" in name for name in names):
        if cube.units == "m":
            _log_once("Converting visibility units m to km.", level=logging.DEBUG)
            # Convert the units to km.
            cube.convert_units("km")

    return cube

849 

850 

def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
    """Mask cloud_base_altitude diagnostic in regions with no cloud."""
    names = (cube.long_name, cube.standard_name, cube.var_name)
    if any("cloud_base_altitude" in name for name in names if name):
        # Mask values above 144 kft to catch the no-cloud fill value of
        # 144.35695538058164, working lazily on the cube's dask data.
        cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0)

857 

858 

def _fix_um_winds(cubes: iris.cube.CubeList):
    """Make winds from the UM consistent with those from LFRic.

    Diagnostics of wind are not always consistent between the UM and LFRic.
    Here, winds from the UM are adjusted to make them consistent with LFRic.
    """
    # STASH codes identify UM output: 10 m zonal wind, 10 m meridional wind
    # and 10 m wind speed respectively, so this only affects UM cubes.
    u_constraint = iris.AttributeConstraint(STASH="m01s03i225")
    v_constraint = iris.AttributeConstraint(STASH="m01s03i226")
    speed_constraint = iris.AttributeConstraint(STASH="m01s03i227")
    try:
        u_cubes = cubes.extract(u_constraint)
        v_cubes = cubes.extract(v_constraint)
        if u_cubes and v_cubes:
            # Calculate the wind speed if it is missing. Note that this will
            # be biased low in general because the components will mostly be
            # time averages. For simplicity, only do this when there is just
            # one cube of a component; a more complicated approach would
            # consider the cell methods, but it may not be warranted.
            if len(u_cubes) == 1 and not cubes.extract(speed_constraint):
                _add_wind_speed_um(cubes)
            # Convert winds in the UM to be relative to true east and true north.
            _convert_wind_true_dirn_um(cubes)
    except (KeyError, AttributeError):
        # Best effort: leave the cubes untouched if expected metadata is absent.
        pass

884 

885 

def _add_wind_speed_um(cubes: iris.cube.CubeList):
    """Add windspeeds to cubes from the UM."""
    u_cube = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))
    v_cube = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))
    # Magnitude of the horizontal wind from the first field of each component.
    speed = (u_cube[0] ** 2 + v_cube[0] ** 2) ** 0.5
    speed.attributes["STASH"] = "m01s03i227"
    speed.standard_name = "wind_speed"
    speed.long_name = "wind_speed_at_10m"
    cubes.append(speed)

896 

897 

def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
    """To convert winds to true directions.

    Convert from the components relative to the grid to true directions.
    This functionality only handles the simplest case.
    """
    grid_u_cubes = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
    grid_v_cubes = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
    # Target coordinate system: a sphere of Earth radius 6371229 m.
    sphere = iris.coord_systems.GeogCS(6371229.0)
    for u_cube, v_cube in zip(grid_u_cubes, grid_v_cubes, strict=True):
        rotated_u, rotated_v = rotate_winds(u_cube, v_cube, sphere)
        # Overwrite the grid-relative data in place with the true-direction
        # components, keeping all other cube metadata unchanged.
        u_cube.data = rotated_u.core_data()
        v_cube.data = rotated_v.core_data()

910 

911 

def _normalise_var0_varname(cube: iris.cube.Cube):
    """Fix varnames for consistency to allow merging.

    Some model data netCDF sometimes have a coordinate name end in
    "_0" etc, where duplicate coordinates of same name are defined but
    with different attributes. This can be inconsistently managed in
    different model inputs and can cause cubes to fail to merge.
    """
    # Strip numbered suffixes from coordinate var_names. Checking the
    # suffixes in ascending order matches the previous chained behaviour,
    # e.g. "x_1_0" -> "x_1" -> "x".
    for coord in cube.coords():
        for suffix in ("_0", "_1", "_2", "_3"):
            if coord.var_name and coord.var_name.endswith(suffix):
                coord.var_name = coord.var_name.removesuffix(suffix)

    # The cube itself only ever needs a "_0" suffix removing.
    if cube.var_name and cube.var_name.endswith("_0"):
        cube.var_name = cube.var_name.removesuffix("_0")

932 

933 

def _proleptic_gregorian_fix(cube: iris.cube.Cube):
    """Convert the calendars of time units to use a standard calendar."""
    try:
        tcoord = cube.coord("time")
    except iris.exceptions.CoordinateNotFoundError:
        # No time coordinate, so nothing to convert.
        return
    if tcoord.units.calendar == "proleptic_gregorian":
        logging.debug(
            "Changing proleptic Gregorian calendar to standard calendar for %s",
            repr(tcoord.units),
        )
        tcoord.units = tcoord.units.change_calendar("standard")

946 

947 

def _lfric_time_callback(cube: iris.cube.Cube):
    """Fix time coordinate metadata if missing dimensions.

    Some model data does not contain forecast_reference_time or forecast_period as
    expected coordinates, and so we cannot aggregate over case studies without this
    metadata. This callback fixes these issues.

    This callback also ensures all time coordinates are referenced as hours since
    1970-01-01 00:00:00 for consistency across different model inputs.

    Notes
    -----
    Some parts of the code have been adapted from Paul Earnshaw's scripts.
    """
    # Construct forecast_reference time if it doesn't exist.
    try:
        tcoord = cube.coord("time")
        # Set time coordinate to common basis "hours since 1970".
        try:
            tcoord.convert_units("hours since 1970-01-01 00:00:00")
        except ValueError:
            # Leave the original units in place; later steps still run.
            logging.warning("Unrecognised base time unit: %s", tcoord.units)

        if not cube.coords("forecast_reference_time"):
            try:
                # The "time_origin" attribute holds the forecast
                # initialisation time as an ISO 8601 string; encode it in
                # the same units as the time coordinate.
                init_time = datetime.datetime.fromisoformat(
                    tcoord.attributes["time_origin"]
                )
                frt_point = tcoord.units.date2num(init_time)
                frt_coord = iris.coords.AuxCoord(
                    frt_point,
                    units=tcoord.units,
                    standard_name="forecast_reference_time",
                    long_name="forecast_reference_time",
                )
                # Scalar coordinate: no dimension mapping needed.
                cube.add_aux_coord(frt_coord)
            except KeyError:
                # No "time_origin" attribute to build the coordinate from.
                logging.warning(
                    "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
                )

        # Remove time_origin to allow multiple case studies to merge.
        tcoord.attributes.pop("time_origin", None)

        # Construct forecast_period axis (forecast lead time) if it doesn't exist.
        if not cube.coords("forecast_period"):
            try:
                # Create array of forecast lead times: time minus the
                # forecast reference time, expressed in the time
                # coordinate's units.
                init_coord = cube.coord("forecast_reference_time")
                init_time_points_in_tcoord_units = tcoord.units.date2num(
                    init_coord.units.num2date(init_coord.points)
                )
                lead_times = tcoord.points - init_time_points_in_tcoord_units

                # Get unit for lead time from time coordinate's unit.
                # Convert all lead time to hours for consistency between models.
                if "seconds" in str(tcoord.units):
                    lead_times = lead_times / 3600.0
                    units = "hours"
                elif "hours" in str(tcoord.units):
                    units = "hours"
                else:
                    raise ValueError(f"Unrecognised base time unit: {tcoord.units}")

                # Create lead time coordinate.
                lead_time_coord = iris.coords.AuxCoord(
                    lead_times,
                    standard_name="forecast_period",
                    long_name="forecast_period",
                    units=units,
                )

                # Associate lead time coordinate with time dimension.
                cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
            except iris.exceptions.CoordinateNotFoundError:
                # forecast_reference_time was not found (and could not be
                # constructed above), so forecast_period cannot be derived.
                logging.warning(
                    "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
                )
    except iris.exceptions.CoordinateNotFoundError:
        logging.warning("No time coordinate on cube.")

1028 

1029 

def _lfric_forecast_period_callback(cube: iris.cube.Cube):
    """Check forecast_period name and units.

    Ensures any forecast_period coordinate uses hours and has its
    standard_name set, for consistency between model inputs. Cubes
    without a forecast_period coordinate are left unchanged.
    """
    try:
        coord = cube.coord("forecast_period")
    except iris.exceptions.CoordinateNotFoundError:
        # Nothing to fix if the coordinate is absent.
        return
    if coord.units != "hours":
        # Reuse the already-retrieved coordinate instead of looking it up again.
        coord.convert_units("hours")
    if not coord.standard_name:
        coord.standard_name = "forecast_period"

1040 

1041 

1042def _normalise_ML_varname(cube: iris.cube.Cube): 

1043 """Fix plev variable names to standard names.""" 

1044 if cube.coords("pressure"): 

1045 if cube.name() == "x_wind": 

1046 cube.long_name = "zonal_wind_at_pressure_levels" 

1047 if cube.name() == "y_wind": 

1048 cube.long_name = "meridional_wind_at_pressure_levels" 

1049 if cube.name() == "air_temperature": 

1050 cube.long_name = "temperature_at_pressure_levels" 

1051 if cube.name() == "specific_humidity": 1051 ↛ 1052line 1051 didn't jump to line 1052 because the condition on line 1051 was never true

1052 cube.long_name = ( 

1053 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging" 

1054 )