Coverage for src/CSET/operators/read.py: 90%

417 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-19 11:18 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import dask 

27import iris 

28import iris.coord_systems 

29import iris.coords 

30import iris.cube 

31import iris.exceptions 

32import iris.util 

33import numpy as np 

34from iris.analysis.cartography import rotate_pole, rotate_winds 

35 

36from CSET._common import iter_maybe 

37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

38from CSET.operators._utils import ( 

39 get_cube_coordindex, 

40 get_cube_yxcoordname, 

41 is_spatialdim, 

42) 

43 

44 

45class NoDataError(FileNotFoundError): 

46 """Error that no data has been loaded.""" 

47 

48 

49def read_cube( 

50 file_paths: list[str] | str, 

51 constraint: iris.Constraint = None, 

52 model_names: list[str] | str | None = None, 

53 subarea_type: str = None, 

54 subarea_extent: list[float] = None, 

55 **kwargs, 

56) -> iris.cube.Cube: 

57 """Read a single cube from files. 

58 

59 Read operator that takes a path string (can include shell-style glob 

60 patterns), and loads the cube matching the constraint. If any paths point to 

61 directory, all the files contained within are loaded. 

62 

63 Ensemble data can also be loaded. If it has a realization coordinate 

64 already, it will be directly used. If not, it will have its member number 

65 guessed from the filename, based on one of several common patterns. For 

66 example the pattern *emXX*, where XX is the realization. 

67 

68 Deterministic data will be loaded with a realization of 0, allowing it to be 

69 processed in the same way as ensemble data. 

70 

71 Arguments 

72 --------- 

73 file_paths: str | list[str] 

74 Path or paths to where .pp/.nc files are located 

75 constraint: iris.Constraint | iris.ConstraintCombination, optional 

76 Constraints to filter data by. Defaults to unconstrained. 

77 model_names: str | list[str], optional 

78 Names of the models that correspond to respective paths in file_paths. 

79 subarea_type: "gridcells" | "modelrelative" | "realworld", optional 

80 Whether to constrain data by model relative coordinates or real world 

81 coordinates. 

82 subarea_extent: list, optional 

83 List of coordinates to constraint data by, in order lower latitude, 

84 upper latitude, lower longitude, upper longitude. 

85 

86 Returns 

87 ------- 

88 cubes: iris.cube.Cube 

89 Cube loaded 

90 

91 Raises 

92 ------ 

93 FileNotFoundError 

94 If the provided path does not exist 

95 ValueError 

96 If the constraint doesn't produce a single cube. 

97 """ 

98 cubes = read_cubes( 

99 file_paths=file_paths, 

100 constraint=constraint, 

101 model_names=model_names, 

102 subarea_type=subarea_type, 

103 subarea_extent=subarea_extent, 

104 ) 

105 # Check filtered cubes is a CubeList containing one cube. 

106 if len(cubes) == 1: 

107 return cubes[0] 

108 else: 

109 raise ValueError( 

110 f"Constraint doesn't produce single cube: {constraint}\n{cubes}" 

111 ) 

112 

113 

114def read_cubes( 

115 file_paths: list[str] | str, 

116 constraint: iris.Constraint | None = None, 

117 model_names: str | list[str] | None = None, 

118 subarea_type: str = None, 

119 subarea_extent: list = None, 

120 **kwargs, 

121) -> iris.cube.CubeList: 

122 """Read cubes from files. 

123 

124 Read operator that takes a path string (can include shell-style glob 

125 patterns), and loads the cubes matching the constraint. If any paths point 

126 to directory, all the files contained within are loaded. 

127 

128 Ensemble data can also be loaded. If it has a realization coordinate 

129 already, it will be directly used. If not, it will have its member number 

130 guessed from the filename, based on one of several common patterns. For 

131 example the pattern *emXX*, where XX is the realization. 

132 

133 Deterministic data will be loaded with a realization of 0, allowing it to be 

134 processed in the same way as ensemble data. 

135 

136 Data output by XIOS (such as LFRic) has its per-file metadata removed so 

137 that the cubes merge across files. 

138 

139 Arguments 

140 --------- 

141 file_paths: str | list[str] 

142 Path or paths to where .pp/.nc files are located. Can include globs. 

143 constraint: iris.Constraint | iris.ConstraintCombination, optional 

144 Constraints to filter data by. Defaults to unconstrained. 

145 model_names: str | list[str], optional 

146 Names of the models that correspond to respective paths in file_paths. 

147 subarea_type: str, optional 

148 Whether to constrain data by model relative coordinates or real world 

149 coordinates. 

150 subarea_extent: list[float], optional 

151 List of coordinates to constraint data by, in order lower latitude, 

152 upper latitude, lower longitude, upper longitude. 

153 

154 Returns 

155 ------- 

156 cubes: iris.cube.CubeList 

157 Cubes loaded after being merged and concatenated. 

158 

159 Raises 

160 ------ 

161 FileNotFoundError 

162 If the provided path does not exist 

163 """ 

164 # Get iterable of paths. Each path corresponds to 1 model. 

165 paths = iter_maybe(file_paths) 

166 model_names = iter_maybe(model_names) 

167 

168 # Check we have appropriate number of model names. 

169 if model_names != (None,) and len(model_names) != len(paths): 

170 raise ValueError( 

171 f"The number of model names ({len(model_names)}) should equal " 

172 f"the number of paths given ({len(paths)})." 

173 ) 

174 

175 # Load the data for each model into a CubeList per model. 

176 model_cubes = ( 

177 _load_model(path, name, constraint) 

178 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None) 

179 ) 

180 

181 # Split out first model's cubes and mark it as the base for comparisons. 

182 cubes = next(model_cubes) 

183 for cube in cubes: 

184 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes. 

185 cube.attributes["cset_comparison_base"] = 1 

186 

187 # Load the rest of the models. 

188 cubes.extend(itertools.chain.from_iterable(model_cubes)) 

189 

190 # Enable different point-based observation sources to be concatenated. 

191 cubes = _check_combine_point_observations(cubes) 

192 

193 # Unify time units so different case studies can merge. 

194 iris.util.unify_time_units(cubes) 

195 

196 # Select sub region. 

197 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent) 

198 

199 # Merge and concatenate cubes now metadata has been fixed. 

200 cubes = _merge_cubes_check_ensemble(cubes) 

201 cubes = cubes.concatenate() 

202 

203 # Squeeze single valued coordinates into scalar coordinates. 

204 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes) 

205 

206 # Ensure dimension coordinates are bounded. 

207 for cube in cubes: 

208 for dim_coord in cube.coords(dim_coords=True): 

209 if (dim_coord.standard_name == "time") and ( 

210 dim_coord.name() 

211 not in itertools.chain.from_iterable( 

212 m.coord_names for m in cube.cell_methods if m.method != "point" 

213 ) 

214 ): 

215 # Instantaneous time coordinate 

216 continue 

217 # Iris can't guess the bounds of a scalar coordinate. 

218 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1: 

219 dim_coord.guess_bounds() 

220 

221 logging.info("Loaded cubes: %s", cubes) 

222 if len(cubes) == 0: 

223 raise NoDataError("No cubes loaded, check your constraints!") 

224 return cubes 

225 

226 

227def _load_model( 

228 paths: str | list[str], 

229 model_name: str | None, 

230 constraint: iris.Constraint | None, 

231) -> iris.cube.CubeList: 

232 """Load a single model's data into a CubeList.""" 

233 input_files = _check_input_files(paths) 

234 # If unset, a constraint of None lets everything be loaded. 

235 logging.debug("Constraint: %s", constraint) 

236 cubes = iris.load(input_files, constraint, callback=_loading_callback) 

237 # Make the UM's winds consistent with LFRic. 

238 _fix_um_winds(cubes) 

239 

240 # Add model_name attribute to each cube to make it available at any further 

241 # step without needing to pass it as function parameter. 

242 if model_name is not None: 

243 for cube in cubes: 

244 cube.attributes["model_name"] = model_name 

245 return cubes 

246 

247 

248def _check_input_files(input_paths: str | list[str]) -> list[Path]: 

249 """Get an iterable of files to load, and check that they all exist. 

250 

251 Arguments 

252 --------- 

253 input_paths: list[str] 

254 List of paths to input files or directories. The path may itself contain 

255 glob patterns, but unlike in shells it will match directly first. 

256 

257 Returns 

258 ------- 

259 list[Path] 

260 A list of files to load. 

261 

262 Raises 

263 ------ 

264 FileNotFoundError: 

265 If the provided arguments don't resolve to at least one existing file. 

266 """ 

267 files = [] 

268 for raw_filename in iter_maybe(input_paths): 

269 # Match glob-like files first, if they exist. 

270 raw_path = Path(raw_filename) 

271 if raw_path.is_file(): 

272 files.append(raw_path) 

273 else: 

274 for input_path in glob.glob(raw_filename): 

275 # Convert string paths into Path objects. 

276 input_path = Path(input_path) 

277 # Get the list of files in the directory, or use it directly. 

278 if input_path.is_dir(): 

279 logging.debug("Checking directory '%s' for files", input_path) 

280 files.extend(p for p in input_path.iterdir() if p.is_file()) 

281 else: 

282 files.append(input_path) 

283 

284 files.sort() 

285 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files)) 

286 if len(files) == 0: 

287 raise FileNotFoundError(f"No files found for {input_paths}") 

288 return files 

289 

290 

291def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList): 

292 """Merge CubeList, renumbering realizations of 0 if required. 

293 

294 An unsuccessful merge indicates common input cube attributes, most 

295 commonly from ensemble members missing an explicit realization 

296 coordinate. Therefore the members are renumbered before being merged 

297 again. 

298 """ 

299 try: 

300 cubes = cubes.merge() 

301 except iris.exceptions.MergeError: 

302 _log_once( 

303 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.", 

304 level=logging.WARNING, 

305 ) 

306 for ir, cube in enumerate(cubes): 

307 if cube.coord("realization").points == 0: 307 ↛ 306line 307 didn't jump to line 306 because the condition on line 307 was always true

308 cube.coord("realization").points = ir + 1 

309 cubes = cubes.merge() 

310 return cubes 

311 

312 

313def _cutout_cubes( 

314 cubes: iris.cube.CubeList, 

315 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

316 subarea_extent: list[float], 

317): 

318 """Cut out a subarea from a CubeList.""" 

319 if subarea_type is None: 

320 logging.debug("Subarea selection is disabled.") 

321 return cubes 

322 

323 # If selected, cutout according to number of grid cells to trim from each edge. 

324 cutout_cubes = iris.cube.CubeList() 

325 # Find spatial coordinates 

326 for cube in cubes: 

327 # Find dimension coordinates. 

328 lat_name, lon_name = get_cube_yxcoordname(cube) 

329 

330 # Compute cutout based on number of cells to trim from edges. 

331 if subarea_type == "gridcells": 

332 logging.debug( 

333 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

334 subarea_extent[0], 

335 subarea_extent[1], 

336 subarea_extent[2], 

337 subarea_extent[3], 

338 ) 

339 lat_points = np.sort(cube.coord(lat_name).points) 

340 lon_points = np.sort(cube.coord(lon_name).points) 

341 # Define cutout region using user provided cell points. 

342 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

343 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

344 

345 # Compute cutout based on specified coordinate values. 

346 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

347 # If not gridcells, cutout by requested geographic area, 

348 logging.debug( 

349 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

350 subarea_extent[0], 

351 subarea_extent[1], 

352 subarea_extent[2], 

353 subarea_extent[3], 

354 ) 

355 # Define cutout region using user provided coordinates. 

356 lats = np.array(subarea_extent[0:2]) 

357 lons = np.array(subarea_extent[2:4]) 

358 # Ensure cutout longitudes are within +/- 180.0 bounds. 

359 while lons[0] < -180.0: 

360 lons += 360.0 

361 while lons[1] > 180.0: 

362 lons -= 360.0 

363 # If the coordinate system is rotated we convert coordinates into 

364 # model-relative coordinates to extract the appropriate cutout. 

365 coord_system = cube.coord(lat_name).coord_system 

366 if subarea_type == "realworld" and isinstance( 

367 coord_system, iris.coord_systems.RotatedGeogCS 

368 ): 

369 lons, lats = rotate_pole( 

370 lons, 

371 lats, 

372 pole_lon=coord_system.grid_north_pole_longitude, 

373 pole_lat=coord_system.grid_north_pole_latitude, 

374 ) 

375 else: 

376 raise ValueError("Unknown subarea_type:", subarea_type) 

377 

378 # Do cutout and add to cutout_cubes. 

379 intersection_args = {lat_name: lats, lon_name: lons} 

380 logging.debug("Cutting out coords: %s", intersection_args) 

381 try: 

382 cutout_cubes.append(cube.intersection(**intersection_args)) 

383 except IndexError as err: 

384 raise ValueError( 

385 "Region cutout error. Check and update SUBAREA_EXTENT." 

386 "Cutout region requested should be contained within data area. " 

387 "Also check if cutout region requested is smaller than input grid spacing." 

388 ) from err 

389 

390 return cutout_cubes 

391 

392 

393def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube: 

394 """Compose together the needed callbacks into a single function.""" 

395 # Most callbacks operate in-place, but save the cube when returned! 

396 _realization_callback(cube) 

397 _um_normalise_callback(cube) 

398 _lfric_normalise_callback(cube) 

399 cube = _lfric_time_coord_fix_callback(cube) 

400 _normalise_var0_varname(cube) 

401 cube = _fix_no_spatial_coords_callback(cube) 

402 _fix_spatial_coords_callback(cube) 

403 _fix_pressure_coord_callback(cube) 

404 _fix_um_radtime(cube) 

405 _fix_cell_methods(cube) 

406 cube = _convert_cube_units_callback(cube) 

407 cube = _grid_longitude_fix_callback(cube) 

408 _fix_lfric_cloud_base_altitude(cube) 

409 _proleptic_gregorian_fix(cube) 

410 _lfric_time_callback(cube) 

411 _lfric_forecast_period_callback(cube) 

412 cube = _fix_no_time_coords_callback(cube) 

413 _normalise_ML_varname(cube) 

414 return cube 

415 

416 

417def _realization_callback(cube): 

418 """Add a realization coordinate initialised to 0 if missing. 

419 

420 This means deterministic and ensemble cubes can assume realization coordinate through the rest 

421 of the code. 

422 """ 

423 # Only add if realization coordinate does not exist. 

424 if not cube.coords("realization"): 

425 cube.add_aux_coord( 

426 iris.coords.DimCoord(0, standard_name="realization", units="1") 

427 ) 

428 

429 

430@functools.lru_cache(None) 

431def _log_once(msg, level=logging.WARNING): 

432 """Print a warning message, skipping recent duplicates.""" 

433 logging.log(level, msg) 

434 

435 

436def _um_normalise_callback(cube: iris.cube.Cube): 

437 """Normalise UM STASH variable long names to LFRic variable names. 

438 

439 Note standard names will remain associated with cubes where different. 

440 Long name will be used consistently in output filename and titles. 

441 """ 

442 # Convert STASH to LFRic variable name 

443 if "STASH" in cube.attributes: 

444 stash = cube.attributes["STASH"] 

445 try: 

446 (name, grid) = STASH_TO_LFRIC[str(stash)] 

447 cube.long_name = name 

448 except KeyError: 

449 # Don't change cubes with unknown stash codes. 

450 _log_once( 

451 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.", 

452 level=logging.WARNING, 

453 ) 

454 

455 

456def _lfric_normalise_callback(cube: iris.cube.Cube): 

457 """Normalise attributes that prevents LFRic cube from merging. 

458 

459 The uuid and timeStamp relate to the output file, as saved by XIOS, and has 

460 no relation to the data contained. These attributes are removed. 

461 

462 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

463 multiple UM fields, however it can be encoded in any order. This attribute 

464 is sorted to prevent this. This attribute is only present in LFRic data that 

465 has been converted to look like UM data. 

466 """ 

467 # Remove unwanted attributes. 

468 cube.attributes.pop("timeStamp", None) 

469 cube.attributes.pop("uuid", None) 

470 cube.attributes.pop("name", None) 

471 cube.attributes.pop("source", None) 

472 cube.attributes.pop("analysis_source", None) 

473 cube.attributes.pop("history", None) 

474 

475 # Sort STASH code list. 

476 stash_list = cube.attributes.get("um_stash_source") 

477 if stash_list: 

478 # Parse the string as a list, sort, then re-encode as a string. 

479 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

480 

481 

482def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

483 """Ensure the time coordinate is a DimCoord rather than an AuxCoord. 

484 

485 The coordinate is converted and replaced if not. SLAMed LFRic data has this 

486 issue, though the coordinate satisfies all the properties for a DimCoord. 

487 Scalar time values are left as AuxCoords. 

488 """ 

489 # This issue seems to come from iris's handling of NetCDF files where time 

490 # always ends up as an AuxCoord. 

491 if cube.coords("time"): 

492 time_coord = cube.coord("time") 

493 if ( 

494 not isinstance(time_coord, iris.coords.DimCoord) 

495 and len(cube.coord_dims(time_coord)) == 1 

496 ): 

497 # Fudge the bounds to foil checking for strict monotonicity. 

498 if time_coord.has_bounds(): 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8: 

500 time_coord.bounds = [ 

501 [ 

502 time_coord.bounds[i][0] + 1.0e-8 * float(i), 

503 time_coord.bounds[i][1], 

504 ] 

505 for i in range(len(time_coord.bounds)) 

506 ] 

507 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord) 

508 return cube 

509 

510 

511def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube: 

512 """Check grid_longitude coordinates are in the range -180 deg to 180 deg. 

513 

514 This is necessary if comparing two models with different conventions -- 

515 for example, models where the prime meridian is defined as 0 deg or 

516 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude 

517 so that it falls in this range. Checks are for near-180 bounds given 

518 model data bounds may not extend exactly to 0. or 360. 

519 Input cubes on non-rotated grid coordinates are not impacted. 

520 """ 

521 try: 

522 y, x = get_cube_yxcoordname(cube) 

523 except ValueError: 

524 # Don't modify non-spatial cubes. 

525 return cube 

526 

527 long_coord = cube.coord(x) 

528 # Wrap longitudes if rotated pole coordinates 

529 coord_system = long_coord.coord_system 

530 if x == "grid_longitude" and isinstance( 

531 coord_system, iris.coord_systems.RotatedGeogCS 

532 ): 

533 long_points = long_coord.points.copy() 

534 long_centre = np.median(long_points) 

535 while long_centre < -175.0: 

536 long_centre += 360.0 

537 long_points += 360.0 

538 while long_centre >= 175.0: 

539 long_centre -= 360.0 

540 long_points -= 360.0 

541 long_coord.points = long_points 

542 

543 # Update coord bounds to be consistent with wrapping. 

544 if long_coord.has_bounds(): 

545 long_coord.bounds = None 

546 long_coord.guess_bounds() 

547 

548 return cube 

549 

550 

551def _fix_no_spatial_coords_callback(cube: iris.cube.Cube): 

552 import CSET.operators._utils as utils 

553 

554 # Don't modify spatial cubes that already have spatial dimensions 

555 if utils.is_spatialdim(cube): 

556 return cube 

557 

558 else: 

559 # attempt to get lat/long from cube attributes 

560 try: 

561 lat_min = cube.attributes.get("geospatial_lat_min") 

562 lat_max = cube.attributes.get("geospatial_lat_max") 

563 lon_min = cube.attributes.get("geospatial_lon_min") 

564 lon_max = cube.attributes.get("geospatial_lon_max") 

565 

566 lon_val = (lon_min + lon_max) / 2.0 

567 lat_val = (lat_min + lat_max) / 2.0 

568 

569 lat_coord = iris.coords.DimCoord( 

570 lat_val, 

571 standard_name="latitude", 

572 units="degrees_north", 

573 var_name="latitude", 

574 coord_system=iris.coord_systems.GeogCS(6371229.0), 

575 circular=True, 

576 ) 

577 

578 lon_coord = iris.coords.DimCoord( 

579 lon_val, 

580 standard_name="longitude", 

581 units="degrees_east", 

582 var_name="longitude", 

583 coord_system=iris.coord_systems.GeogCS(6371229.0), 

584 circular=True, 

585 ) 

586 

587 cube.add_aux_coord(lat_coord) 

588 cube.add_aux_coord(lon_coord) 

589 return cube 

590 

591 # if lat/long are not in attributes, then return cube unchanged: 

592 except TypeError: 

593 return cube 

594 

595 

596def _fix_spatial_coords_callback(cube: iris.cube.Cube): 

597 """Check latitude and longitude coordinates name. 

598 

599 This is necessary as some models define their grid as on rotated 

600 'grid_latitude' and 'grid_longitude' coordinates while others define 

601 the grid on non-rotated 'latitude' and 'longitude'. 

602 Cube dimensions need to be made consistent to avoid recipe failures, 

603 particularly where comparing multiple input models with differing spatial 

604 coordinates. 

605 """ 

606 # Check if cube is spatial. 

607 if not is_spatialdim(cube): 

608 # Don't modify non-spatial cubes. 

609 return 

610 

611 # Get spatial coords and dimension index. 

612 y_name, x_name = get_cube_yxcoordname(cube) 

613 ny = get_cube_coordindex(cube, y_name) 

614 nx = get_cube_coordindex(cube, x_name) 

615 

616 # Remove spatial coords bounds if erroneous values detected. 

617 # Aims to catch some errors in input coord bounds by setting 

618 # invalid threshold of 10000.0 

619 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds(): 

620 bx_max = np.max(np.abs(cube.coord(x_name).bounds)) 

621 by_max = np.max(np.abs(cube.coord(y_name).bounds)) 

622 if bx_max > 10000.0 or by_max > 10000.0: 

623 cube.coord(x_name).bounds = None 

624 cube.coord(y_name).bounds = None 

625 

626 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord 

627 # [latitude, longitude] for instances where rotated_pole=90.0 

628 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

629 coord_system = cube.coord("grid_latitude").coord_system 

630 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None) 

631 if pole_lat == 90.0: 631 ↛ 632line 631 didn't jump to line 632 because the condition on line 631 was never true

632 lats = cube.coord("grid_latitude").points 

633 lons = cube.coord("grid_longitude").points 

634 

635 cube.remove_coord("grid_latitude") 

636 cube.add_dim_coord( 

637 iris.coords.DimCoord( 

638 lats, 

639 standard_name="latitude", 

640 var_name="latitude", 

641 units="degrees", 

642 coord_system=iris.coord_systems.GeogCS(6371229.0), 

643 circular=True, 

644 ), 

645 ny, 

646 ) 

647 y_name = "latitude" 

648 cube.remove_coord("grid_longitude") 

649 cube.add_dim_coord( 

650 iris.coords.DimCoord( 

651 lons, 

652 standard_name="longitude", 

653 var_name="longitude", 

654 units="degrees", 

655 coord_system=iris.coord_systems.GeogCS(6371229.0), 

656 circular=True, 

657 ), 

658 nx, 

659 ) 

660 x_name = "longitude" 

661 

662 # Create additional AuxCoord [grid_latitude, grid_longitude] with 

663 # rotated pole attributes for cases with [lat, lon] inputs 

664 if y_name in ["latitude"] and cube.coord(y_name).units in [ 

665 "degrees", 

666 "degrees_north", 

667 "degrees_south", 

668 ]: 

669 # Add grid_latitude AuxCoord 

670 if "grid_latitude" not in [ 

671 coord.name() for coord in cube.coords(dim_coords=False) 

672 ]: 

673 cube.add_aux_coord( 

674 iris.coords.AuxCoord( 

675 cube.coord(y_name).points, 

676 var_name="grid_latitude", 

677 units="degrees", 

678 ), 

679 ny, 

680 ) 

681 # Ensure input latitude DimCoord has CoordSystem 

682 # This attribute is sometimes lost on iris.save 

683 if not cube.coord(y_name).coord_system: 

684 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

685 

686 if x_name in ["longitude"] and cube.coord(x_name).units in [ 

687 "degrees", 

688 "degrees_west", 

689 "degrees_east", 

690 ]: 

691 # Add grid_longitude AuxCoord 

692 if "grid_longitude" not in [ 

693 coord.name() for coord in cube.coords(dim_coords=False) 

694 ]: 

695 cube.add_aux_coord( 

696 iris.coords.AuxCoord( 

697 cube.coord(x_name).points, 

698 var_name="grid_longitude", 

699 units="degrees", 

700 ), 

701 nx, 

702 ) 

703 

704 # Ensure input longitude DimCoord has CoordSystem 

705 # This attribute is sometimes lost on iris.save 

706 if not cube.coord(x_name).coord_system: 

707 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

708 

709 

710def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

711 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

712 

713 This problem was raised because the AIFS model data from ECMWF 

714 defines the pressure coordinate with the name "pressure_level" rather 

715 than compliant CF coordinate names. 

716 

717 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

718 and approach the coordinates in a unified way. 

719 """ 

720 for coord in cube.dim_coords: 

721 if coord.name() in ["pressure_level", "pressure_levels"]: 

722 coord.rename("pressure") 

723 

724 if coord.name() == "pressure": 

725 if str(cube.coord("pressure").units) != "hPa": 

726 cube.coord("pressure").convert_units("hPa") 

727 

728 

729def _fix_um_radtime(cube: iris.cube.Cube): 

730 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

731 

732 This callback does not have any effect for output diagnostics with 

733 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

734 diagnostics are checked. 

735 Note this callback does not interpolate the data in time, only adjust 

736 timestamps to sit on the hour to enable time-to-time difference plotting 

737 with models which may output radiation data on the hour. 

738 """ 

739 try: 

740 if cube.attributes["STASH"] in [ 

741 "m01s01i207", 

742 "m01s01i208", 

743 "m01s02i205", 

744 "m01s02i201", 

745 "m01s01i207", 

746 "m01s02i207", 

747 "m01s01i235", 

748 ]: 

749 time_coord = cube.coord("time") 

750 

751 # Convert time points to datetime objects 

752 time_unit = time_coord.units 

753 time_points = time_unit.num2date(time_coord.points) 

754 # Skip if times don't need fixing. 

755 if time_points[0].minute == 0 and time_points[0].second == 0: 

756 return 

757 if time_points[0].minute == 30 and time_points[0].second == 0: 757 ↛ 758line 757 didn't jump to line 758 because the condition on line 757 was never true

758 return 

759 

760 # Subtract time difference from the hour from each time point 

761 n_minute = time_points[0].minute 

762 n_second = time_points[0].second 

763 # If times closer to next hour, compute difference to add on to following hour 

764 if n_minute > 30: 

765 n_minute = n_minute - 60 

766 # Compute new diagnostic time stamp 

767 new_time_points = ( 

768 time_points 

769 - datetime.timedelta(minutes=n_minute) 

770 - datetime.timedelta(seconds=n_second) 

771 ) 

772 

773 # Convert back to numeric values using the original time unit. 

774 new_time_values = time_unit.date2num(new_time_points) 

775 

776 # Replace the time coordinate with updated values. 

777 time_coord.points = new_time_values 

778 

779 # Recompute forecast_period with corrected values. 

780 if cube.coord("forecast_period"): 780 ↛ exitline 780 didn't return from function '_fix_um_radtime' because the condition on line 780 was always true

781 fcst_prd_points = cube.coord("forecast_period").points 

782 new_fcst_points = ( 

783 time_unit.num2date(fcst_prd_points) 

784 - datetime.timedelta(minutes=n_minute) 

785 - datetime.timedelta(seconds=n_second) 

786 ) 

787 cube.coord("forecast_period").points = time_unit.date2num( 

788 new_fcst_points 

789 ) 

790 except KeyError: 

791 pass 

792 

793 

794def _fix_cell_methods(cube: iris.cube.Cube): 

795 """To fix the assumed cell_methods in accumulation STASH from UM. 

796 

797 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

798 (m01s04i202, m01s05i202) in UM is being output as a time accumulation, 

799 over each hour (TAcc1hr), but input cubes show cell_methods as "mean". 

800 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

801 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

802 enabling the cell_method constraint on reading to select correct input. 

803 """ 

804 # Shift "mean" cell_method to "sum" for selected UM inputs. 

805 if cube.attributes.get("STASH") in [ 

806 "m01s21i104", 

807 "m01s04i201", 

808 "m01s04i202", 

809 "m01s05i201", 

810 "m01s05i202", 

811 ]: 

812 # Check if input cell_method contains "mean" time-processing. 

813 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 813 ↛ exitline 813 didn't return from function '_fix_cell_methods' because the condition on line 813 was always true

814 # Retrieve interval and any comment information. 

815 for cell_method in cube.cell_methods: 

816 interval_str = cell_method.intervals 

817 comment_str = cell_method.comments 

818 

819 # Remove input aggregation method. 

820 cube.cell_methods = () 

821 

822 # Replace "mean" with "sum" cell_method to indicate aggregation. 

823 cube.add_cell_method( 

824 iris.coords.CellMethod( 

825 method="sum", 

826 coords="time", 

827 intervals=interval_str, 

828 comments=comment_str, 

829 ) 

830 ) 

831 

832 

833def _convert_cube_units_callback(cube: iris.cube.Cube): 

834 """Adjust diagnostic units for specific variables. 

835 

836 Some precipitation diagnostics are output with unit kg m-2 s-1 and are 

837 converted here to mm hr-1. 

838 

839 Visibility diagnostics are converted here from m to km to improve output 

840 formatting. 

841 """ 

842 # Convert precipitation diagnostic units if required. 

843 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

844 if any("surface_microphysical" in name for name in varnames): 

845 if cube.units == "kg m-2 s-1": 

846 _log_once( 

847 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1", 

848 level=logging.DEBUG, 

849 ) 

850 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water. 

851 # This is a 1:1 conversion, so we just change the units. 

852 cube.units = "mm s-1" 

853 # Convert the units to per hour. 

854 cube.convert_units("mm hr-1") 

855 elif cube.units == "kg m-2": 855 ↛ 865line 855 didn't jump to line 865 because the condition on line 855 was always true

856 _log_once( 

857 "Converting precipitation amount units from kg m-2 to mm", 

858 level=logging.DEBUG, 

859 ) 

860 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water. 

861 # This is a 1:1 conversion, so we just change the units. 

862 cube.units = "mm" 

863 

864 # Convert visibility diagnostic units if required. 

865 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

866 if any("visibility" in name for name in varnames): 

867 if cube.units == "m": 867 ↛ 872line 867 didn't jump to line 872 because the condition on line 867 was always true

868 _log_once("Converting visibility units m to km.", level=logging.DEBUG) 

869 # Convert the units to km. 

870 cube.convert_units("km") 

871 

872 return cube 

873 

874 

875def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

876 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

877 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

878 if any("cloud_base_altitude" in name for name in varnames): 

879 # Mask cube where set > 144kft to catch default 144.35695538058164 

880 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0) 

881 

882 

883def _fix_um_winds(cubes: iris.cube.CubeList): 

884 """To make winds from the UM consistent with those from LFRic. 

885 

886 Diagnostics of wind are not always consistent between the UM 

887 and LFric. Here, winds from the UM are adjusted to make them i 

888 consistent with LFRic. 

889 """ 

890 # Check whether we have components of the wind identified by STASH, 

891 # (so this will apply only to cubes from the UM), but not the 

892 # wind speed and calculate it if it is missing. Note that 

893 # this will be biased low in general because the components will mostly 

894 # be time averages. For simplicity, we do this only if there is just one 

895 # cube of a component. A more complicated approach would be to consider 

896 # the cell methods, but it may not be warranted. 

897 u_constr = iris.AttributeConstraint(STASH="m01s03i225") 

898 v_constr = iris.AttributeConstraint(STASH="m01s03i226") 

899 speed_constr = iris.AttributeConstraint(STASH="m01s03i227") 

900 try: 

901 if cubes.extract(u_constr) and cubes.extract(v_constr): 901 ↛ 902line 901 didn't jump to line 902 because the condition on line 901 was never true

902 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr): 

903 _add_wind_speed_um(cubes) 

904 # Convert winds in the UM to be relative to true east and true north. 

905 _convert_wind_true_dirn_um(cubes) 

906 except (KeyError, AttributeError): 

907 pass 

908 

909 

910def _add_wind_speed_um(cubes: iris.cube.CubeList): 

911 """Add windspeeds to cubes from the UM.""" 

912 wspd10 = ( 

913 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2 

914 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2 

915 ) ** 0.5 

916 wspd10.attributes["STASH"] = "m01s03i227" 

917 wspd10.standard_name = "wind_speed" 

918 wspd10.long_name = "wind_speed_at_10m" 

919 cubes.append(wspd10) 

920 

921 

922def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList): 

923 """To convert winds to true directions. 

924 

925 Convert from the components relative to the grid to true directions. 

926 This functionality only handles the simplest case. 

927 """ 

928 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225")) 

929 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226")) 

930 for u, v in zip(u_grids, v_grids, strict=True): 

931 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0)) 

932 u.data = true_u.core_data() 

933 v.data = true_v.core_data() 

934 

935 

936def _normalise_var0_varname(cube: iris.cube.Cube): 

937 """Fix varnames for consistency to allow merging. 

938 

939 Some model data netCDF sometimes have a coordinate name end in 

940 "_0" etc, where duplicate coordinates of same name are defined but 

941 with different attributes. This can be inconsistently managed in 

942 different model inputs and can cause cubes to fail to merge. 

943 """ 

944 for coord in cube.coords(): 

945 if coord.var_name and coord.var_name.endswith("_0"): 

946 coord.var_name = coord.var_name.removesuffix("_0") 

947 if coord.var_name and coord.var_name.endswith("_1"): 

948 coord.var_name = coord.var_name.removesuffix("_1") 

949 if coord.var_name and coord.var_name.endswith("_2"): 949 ↛ 950line 949 didn't jump to line 950 because the condition on line 949 was never true

950 coord.var_name = coord.var_name.removesuffix("_2") 

951 if coord.var_name and coord.var_name.endswith("_3"): 951 ↛ 952line 951 didn't jump to line 952 because the condition on line 951 was never true

952 coord.var_name = coord.var_name.removesuffix("_3") 

953 

954 if cube.var_name and cube.var_name.endswith("_0"): 

955 cube.var_name = cube.var_name.removesuffix("_0") 

956 

957 

958def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

959 """Convert the calendars of time units to use a standard calendar.""" 

960 try: 

961 time_coord = cube.coord("time") 

962 if time_coord.units.calendar == "proleptic_gregorian": 

963 logging.debug( 

964 "Changing proleptic Gregorian calendar to standard calendar for %s", 

965 repr(time_coord.units), 

966 ) 

967 time_coord.units = time_coord.units.change_calendar("standard") 

968 except iris.exceptions.CoordinateNotFoundError: 

969 pass 

970 

971 

972def _lfric_time_callback(cube: iris.cube.Cube): 

973 """Fix time coordinate metadata if missing dimensions. 

974 

975 Some model data does not contain forecast_reference_time or forecast_period as 

976 expected coordinates, and so we cannot aggregate over case studies without this 

977 metadata. This callback fixes these issues. 

978 

979 This callback also ensures all time coordinates are referenced as hours since 

980 1970-01-01 00:00:00 for consistency across different model inputs. 

981 

982 Notes 

983 ----- 

984 Some parts of the code have been adapted from Paul Earnshaw's scripts. 

985 """ 

986 # Construct forecast_reference time if it doesn't exist. 

987 try: 

988 tcoord = cube.coord("time") 

989 # Set time coordinate to common basis "hours since 1970" 

990 try: 

991 tcoord.convert_units("hours since 1970-01-01 00:00:00") 

992 except ValueError: 

993 logging.warning("Unrecognised base time unit: %s", tcoord.units) 

994 

995 if not cube.coords("forecast_reference_time"): 

996 try: 

997 init_time = datetime.datetime.fromisoformat( 

998 tcoord.attributes["time_origin"] 

999 ) 

1000 frt_point = tcoord.units.date2num(init_time) 

1001 frt_coord = iris.coords.AuxCoord( 

1002 frt_point, 

1003 units=tcoord.units, 

1004 standard_name="forecast_reference_time", 

1005 long_name="forecast_reference_time", 

1006 ) 

1007 cube.add_aux_coord(frt_coord) 

1008 except KeyError: 

1009 logging.warning( 

1010 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from." 

1011 ) 

1012 

1013 # Remove time_origin to allow multiple case studies to merge. 

1014 tcoord.attributes.pop("time_origin", None) 

1015 

1016 # Construct forecast_period axis (forecast lead time) if it doesn't exist. 

1017 if not cube.coords("forecast_period"): 

1018 try: 

1019 # Create array of forecast lead times. 

1020 init_coord = cube.coord("forecast_reference_time") 

1021 init_time_points_in_tcoord_units = tcoord.units.date2num( 

1022 init_coord.units.num2date(init_coord.points) 

1023 ) 

1024 lead_times = tcoord.points - init_time_points_in_tcoord_units 

1025 

1026 # Get unit for lead time from time coordinate's unit. 

1027 # Convert all lead time to hours for consistency between models. 

1028 if "seconds" in str(tcoord.units): 1028 ↛ 1029line 1028 didn't jump to line 1029 because the condition on line 1028 was never true

1029 lead_times = lead_times / 3600.0 

1030 units = "hours" 

1031 elif "hours" in str(tcoord.units): 1031 ↛ 1034line 1031 didn't jump to line 1034 because the condition on line 1031 was always true

1032 units = "hours" 

1033 else: 

1034 raise ValueError(f"Unrecognised base time unit: {tcoord.units}") 

1035 

1036 # Create lead time coordinate. 

1037 lead_time_coord = iris.coords.AuxCoord( 

1038 lead_times, 

1039 standard_name="forecast_period", 

1040 long_name="forecast_period", 

1041 units=units, 

1042 ) 

1043 

1044 # Associate lead time coordinate with time dimension. 

1045 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time")) 

1046 except iris.exceptions.CoordinateNotFoundError: 

1047 logging.warning( 

1048 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period" 

1049 ) 

1050 except iris.exceptions.CoordinateNotFoundError: 

1051 logging.warning("No time coordinate on cube.") 

1052 

1053 

1054def _lfric_forecast_period_callback(cube: iris.cube.Cube): 

1055 """Check forecast_period name and units.""" 

1056 try: 

1057 coord = cube.coord("forecast_period") 

1058 if coord.units != "hours": 

1059 cube.coord("forecast_period").convert_units("hours") 

1060 if not coord.standard_name: 

1061 coord.standard_name = "forecast_period" 

1062 except iris.exceptions.CoordinateNotFoundError: 

1063 pass 

1064 

1065 

1066def _fix_no_time_coords_callback(cube: iris.cube.Cube): 

1067 """Add dummy time coord to process cubes that don't have sequence coord.""" 

1068 # Only add if time coordinate does not exist. 

1069 if not cube.coords("time"): 

1070 cube.add_aux_coord( 

1071 iris.coords.DimCoord( 

1072 0, standard_name="time", units="hours since 0001-01-01 00:00:00" 

1073 ) 

1074 ) 

1075 

1076 return cube 

1077 

1078 

1079def _normalise_ML_varname(cube: iris.cube.Cube): 

1080 """Fix plev variable names to standard names.""" 

1081 if cube.coords("pressure"): 

1082 if cube.name() == "x_wind": 

1083 cube.long_name = "zonal_wind_at_pressure_levels" 

1084 if cube.name() == "y_wind": 

1085 cube.long_name = "meridional_wind_at_pressure_levels" 

1086 if cube.name() == "air_temperature": 

1087 cube.long_name = "temperature_at_pressure_levels" 

1088 if cube.name() == "specific_humidity": 1088 ↛ 1089line 1088 didn't jump to line 1089 because the condition on line 1088 was never true

1089 cube.long_name = ( 

1090 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging" 

1091 ) 

1092 

1093 

1094def _check_combine_point_observations(cubes: iris.cube.CubeList): 

1095 """Enable cubes containing different point observation sources to be concatenated.""" 

1096 nstation = 0 

1097 cset_comparison = None 

1098 for cube in cubes: 

1099 if "station" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

1100 if "obs_source" in [coord.name() for coord in cube.coords()]: 1100 ↛ 1101line 1100 didn't jump to line 1101 because the condition on line 1100 was never true

1101 cube.remove_coord("obs_source") 

1102 cube.coord("station").points = cube.coord("station").points + nstation 

1103 nstation = nstation + len(cube.coord("station").points) 

1104 if "cset_comparison_base" in cube.attributes: 1104 ↛ 1107line 1104 didn't jump to line 1107 because the condition on line 1104 was always true

1105 cset_comparison = cube.attributes["cset_comparison_base"] 

1106 else: 

1107 cube.attributes["cset_comparison_base"] = cset_comparison 

1108 

1109 return cubes