Coverage for src / CSET / operators / read.py: 91%

366 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-27 11:58 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for reading various types of files from disk.""" 

16 

17import ast 

18import datetime 

19import functools 

20import glob 

21import itertools 

22import logging 

23from pathlib import Path 

24from typing import Literal 

25 

26import iris 

27import iris.coord_systems 

28import iris.coords 

29import iris.cube 

30import iris.exceptions 

31import iris.util 

32import numpy as np 

33from iris.analysis.cartography import rotate_pole, rotate_winds 

34 

35from CSET._common import iter_maybe 

36from CSET.operators._stash_to_lfric import STASH_TO_LFRIC 

37from CSET.operators._utils import get_cube_yxcoordname 

38 

39 

40class NoDataError(FileNotFoundError): 

41 """Error that no data has been loaded.""" 

42 

43 

44def read_cube( 

45 file_paths: list[str] | str, 

46 constraint: iris.Constraint = None, 

47 model_names: list[str] | str | None = None, 

48 subarea_type: str = None, 

49 subarea_extent: list[float] = None, 

50 **kwargs, 

51) -> iris.cube.Cube: 

52 """Read a single cube from files. 

53 

54 Read operator that takes a path string (can include shell-style glob 

55 patterns), and loads the cube matching the constraint. If any paths point to 

56 directory, all the files contained within are loaded. 

57 

58 Ensemble data can also be loaded. If it has a realization coordinate 

59 already, it will be directly used. If not, it will have its member number 

60 guessed from the filename, based on one of several common patterns. For 

61 example the pattern *emXX*, where XX is the realization. 

62 

63 Deterministic data will be loaded with a realization of 0, allowing it to be 

64 processed in the same way as ensemble data. 

65 

66 Arguments 

67 --------- 

68 file_paths: str | list[str] 

69 Path or paths to where .pp/.nc files are located 

70 constraint: iris.Constraint | iris.ConstraintCombination, optional 

71 Constraints to filter data by. Defaults to unconstrained. 

72 model_names: str | list[str], optional 

73 Names of the models that correspond to respective paths in file_paths. 

74 subarea_type: "gridcells" | "modelrelative" | "realworld", optional 

75 Whether to constrain data by model relative coordinates or real world 

76 coordinates. 

77 subarea_extent: list, optional 

78 List of coordinates to constraint data by, in order lower latitude, 

79 upper latitude, lower longitude, upper longitude. 

80 

81 Returns 

82 ------- 

83 cubes: iris.cube.Cube 

84 Cube loaded 

85 

86 Raises 

87 ------ 

88 FileNotFoundError 

89 If the provided path does not exist 

90 ValueError 

91 If the constraint doesn't produce a single cube. 

92 """ 

93 cubes = read_cubes( 

94 file_paths=file_paths, 

95 constraint=constraint, 

96 model_names=model_names, 

97 subarea_type=subarea_type, 

98 subarea_extent=subarea_extent, 

99 ) 

100 # Check filtered cubes is a CubeList containing one cube. 

101 if len(cubes) == 1: 

102 return cubes[0] 

103 else: 

104 raise ValueError( 

105 f"Constraint doesn't produce single cube: {constraint}\n{cubes}" 

106 ) 

107 

108 

109def read_cubes( 

110 file_paths: list[str] | str, 

111 constraint: iris.Constraint | None = None, 

112 model_names: str | list[str] | None = None, 

113 subarea_type: str = None, 

114 subarea_extent: list = None, 

115 **kwargs, 

116) -> iris.cube.CubeList: 

117 """Read cubes from files. 

118 

119 Read operator that takes a path string (can include shell-style glob 

120 patterns), and loads the cubes matching the constraint. If any paths point 

121 to directory, all the files contained within are loaded. 

122 

123 Ensemble data can also be loaded. If it has a realization coordinate 

124 already, it will be directly used. If not, it will have its member number 

125 guessed from the filename, based on one of several common patterns. For 

126 example the pattern *emXX*, where XX is the realization. 

127 

128 Deterministic data will be loaded with a realization of 0, allowing it to be 

129 processed in the same way as ensemble data. 

130 

131 Data output by XIOS (such as LFRic) has its per-file metadata removed so 

132 that the cubes merge across files. 

133 

134 Arguments 

135 --------- 

136 file_paths: str | list[str] 

137 Path or paths to where .pp/.nc files are located. Can include globs. 

138 constraint: iris.Constraint | iris.ConstraintCombination, optional 

139 Constraints to filter data by. Defaults to unconstrained. 

140 model_names: str | list[str], optional 

141 Names of the models that correspond to respective paths in file_paths. 

142 subarea_type: str, optional 

143 Whether to constrain data by model relative coordinates or real world 

144 coordinates. 

145 subarea_extent: list[float], optional 

146 List of coordinates to constraint data by, in order lower latitude, 

147 upper latitude, lower longitude, upper longitude. 

148 

149 Returns 

150 ------- 

151 cubes: iris.cube.CubeList 

152 Cubes loaded after being merged and concatenated. 

153 

154 Raises 

155 ------ 

156 FileNotFoundError 

157 If the provided path does not exist 

158 """ 

159 # Get iterable of paths. Each path corresponds to 1 model. 

160 paths = iter_maybe(file_paths) 

161 model_names = iter_maybe(model_names) 

162 

163 # Check we have appropriate number of model names. 

164 if model_names != (None,) and len(model_names) != len(paths): 

165 raise ValueError( 

166 f"The number of model names ({len(model_names)}) should equal " 

167 f"the number of paths given ({len(paths)})." 

168 ) 

169 

170 # Load the data for each model into a CubeList per model. 

171 model_cubes = ( 

172 _load_model(path, name, constraint) 

173 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None) 

174 ) 

175 

176 # Split out first model's cubes and mark it as the base for comparisons. 

177 cubes = next(model_cubes) 

178 for cube in cubes: 

179 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes. 

180 cube.attributes["cset_comparison_base"] = 1 

181 

182 # Load the rest of the models. 

183 cubes.extend(itertools.chain.from_iterable(model_cubes)) 

184 

185 # Unify time units so different case studies can merge. 

186 iris.util.unify_time_units(cubes) 

187 

188 # Select sub region. 

189 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent) 

190 # Merge and concatenate cubes now metadata has been fixed. 

191 cubes = cubes.merge() 

192 cubes = cubes.concatenate() 

193 

194 # Ensure dimension coordinates are bounded. 

195 for cube in cubes: 

196 for dim_coord in cube.coords(dim_coords=True): 

197 # Iris can't guess the bounds of a scalar coordinate. 

198 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1: 

199 dim_coord.guess_bounds() 

200 

201 logging.info("Loaded cubes: %s", cubes) 

202 if len(cubes) == 0: 

203 raise NoDataError("No cubes loaded, check your constraints!") 

204 return cubes 

205 

206 

207def _load_model( 

208 paths: str | list[str], 

209 model_name: str | None, 

210 constraint: iris.Constraint | None, 

211) -> iris.cube.CubeList: 

212 """Load a single model's data into a CubeList.""" 

213 input_files = _check_input_files(paths) 

214 # If unset, a constraint of None lets everything be loaded. 

215 logging.debug("Constraint: %s", constraint) 

216 cubes = iris.load( 

217 input_files, constraint, callback=_create_callback(is_ensemble=False) 

218 ) 

219 # Make the UM's winds consistent with LFRic. 

220 _fix_um_winds(cubes) 

221 

222 # Reload with ensemble handling if needed. 

223 if _is_ensemble(cubes): 

224 cubes = iris.load( 

225 input_files, constraint, callback=_create_callback(is_ensemble=True) 

226 ) 

227 

228 # Add model_name attribute to each cube to make it available at any further 

229 # step without needing to pass it as function parameter. 

230 if model_name is not None: 

231 for cube in cubes: 

232 cube.attributes["model_name"] = model_name 

233 return cubes 

234 

235 

236def _check_input_files(input_paths: str | list[str]) -> list[Path]: 

237 """Get an iterable of files to load, and check that they all exist. 

238 

239 Arguments 

240 --------- 

241 input_paths: list[str] 

242 List of paths to input files or directories. The path may itself contain 

243 glob patterns, but unlike in shells it will match directly first. 

244 

245 Returns 

246 ------- 

247 list[Path] 

248 A list of files to load. 

249 

250 Raises 

251 ------ 

252 FileNotFoundError: 

253 If the provided arguments don't resolve to at least one existing file. 

254 """ 

255 files = [] 

256 for raw_filename in iter_maybe(input_paths): 

257 # Match glob-like files first, if they exist. 

258 raw_path = Path(raw_filename) 

259 if raw_path.is_file(): 

260 files.append(raw_path) 

261 else: 

262 for input_path in glob.glob(raw_filename): 

263 # Convert string paths into Path objects. 

264 input_path = Path(input_path) 

265 # Get the list of files in the directory, or use it directly. 

266 if input_path.is_dir(): 

267 logging.debug("Checking directory '%s' for files", input_path) 

268 files.extend(p for p in input_path.iterdir() if p.is_file()) 

269 else: 

270 files.append(input_path) 

271 

272 files.sort() 

273 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files)) 

274 if len(files) == 0: 

275 raise FileNotFoundError(f"No files found for {input_paths}") 

276 return files 

277 

278 

279def _cutout_cubes( 

280 cubes: iris.cube.CubeList, 

281 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None, 

282 subarea_extent: list[float, float, float, float], 

283): 

284 """Cut out a subarea from a CubeList.""" 

285 if subarea_type is None: 

286 logging.debug("Subarea selection is disabled.") 

287 return cubes 

288 

289 # If selected, cutout according to number of grid cells to trim from each edge. 

290 cutout_cubes = iris.cube.CubeList() 

291 # Find spatial coordinates 

292 for cube in cubes: 

293 # Find dimension coordinates. 

294 lat_name, lon_name = get_cube_yxcoordname(cube) 

295 

296 # Compute cutout based on number of cells to trim from edges. 

297 if subarea_type == "gridcells": 

298 logging.debug( 

299 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s", 

300 subarea_extent[0], 

301 subarea_extent[1], 

302 subarea_extent[2], 

303 subarea_extent[3], 

304 ) 

305 lat_points = np.sort(cube.coord(lat_name).points) 

306 lon_points = np.sort(cube.coord(lon_name).points) 

307 # Define cutout region using user provided cell points. 

308 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]] 

309 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]] 

310 

311 # Compute cutout based on specified coordinate values. 

312 elif subarea_type == "realworld" or subarea_type == "modelrelative": 

313 # If not gridcells, cutout by requested geographic area, 

314 logging.debug( 

315 "User requested LLat: %s ULat: %s LLon: %s ULon: %s", 

316 subarea_extent[0], 

317 subarea_extent[1], 

318 subarea_extent[2], 

319 subarea_extent[3], 

320 ) 

321 # Define cutout region using user provided coordinates. 

322 lats = np.array(subarea_extent[0:2]) 

323 lons = np.array(subarea_extent[2:4]) 

324 # Ensure cutout longitudes are within +/- 180.0 bounds. 

325 while lons[0] < -180.0: 

326 lons += 360.0 

327 while lons[1] > 180.0: 

328 lons -= 360.0 

329 # If the coordinate system is rotated we convert coordinates into 

330 # model-relative coordinates to extract the appropriate cutout. 

331 coord_system = cube.coord(lat_name).coord_system 

332 if subarea_type == "realworld" and isinstance( 

333 coord_system, iris.coord_systems.RotatedGeogCS 

334 ): 

335 lons, lats = rotate_pole( 

336 lons, 

337 lats, 

338 pole_lon=coord_system.grid_north_pole_longitude, 

339 pole_lat=coord_system.grid_north_pole_latitude, 

340 ) 

341 else: 

342 raise ValueError("Unknown subarea_type:", subarea_type) 

343 

344 # Do cutout and add to cutout_cubes. 

345 intersection_args = {lat_name: lats, lon_name: lons} 

346 logging.debug("Cutting out coords: %s", intersection_args) 

347 try: 

348 cutout_cubes.append(cube.intersection(**intersection_args)) 

349 except IndexError as err: 

350 raise ValueError( 

351 "Region cutout error. Check and update SUBAREA_EXTENT." 

352 "Cutout region requested should be contained within data area. " 

353 "Also check if cutout region requested is smaller than input grid spacing." 

354 ) from err 

355 

356 return cutout_cubes 

357 

358 

359def _is_ensemble(cubelist: iris.cube.CubeList) -> bool: 

360 """Test if a CubeList is likely to be ensemble data. 

361 

362 If cubes either have a realization dimension, or there are multiple files 

363 for the same time-step, we can assume it is ensemble data. 

364 """ 

365 unique_cubes = set() 

366 for cube in cubelist: 

367 # Ignore realization of 0, as that is given to deterministic data. 

368 if cube.coords("realization") and any(cube.coord("realization").points != 0): 

369 return True 

370 # Compare XML representation of cube structure check for duplicates. 

371 cube_content = cube.xml() 

372 if cube_content in unique_cubes: 

373 logging.info("Ensemble data loaded.") 

374 return True 

375 else: 

376 unique_cubes.add(cube_content) 

377 logging.info("Deterministic data loaded.") 

378 return False 

379 

380 

381def _create_callback(is_ensemble: bool) -> callable: 

382 """Compose together the needed callbacks into a single function.""" 

383 

384 def callback(cube: iris.cube.Cube, field, filename: str): 

385 if is_ensemble: 

386 _ensemble_callback(cube, field, filename) 

387 else: 

388 _deterministic_callback(cube, field, filename) 

389 

390 _um_normalise_callback(cube, field, filename) 

391 _lfric_normalise_callback(cube, field, filename) 

392 _lfric_time_coord_fix_callback(cube, field, filename) 

393 _normalise_var0_varname(cube) 

394 _fix_spatial_coords_callback(cube) 

395 _fix_pressure_coord_callback(cube) 

396 _fix_um_radtime(cube) 

397 _fix_cell_methods(cube) 

398 _convert_cube_units_callback(cube) 

399 _grid_longitude_fix_callback(cube) 

400 _fix_lfric_cloud_base_altitude(cube) 

401 _proleptic_gregorian_fix(cube) 

402 _lfric_time_callback(cube) 

403 _lfric_forecast_period_standard_name_callback(cube) 

404 

405 return callback 

406 

407 

408def _ensemble_callback(cube, field, filename): 

409 """Add a realization coordinate to a cube. 

410 

411 Uses the filename to add an ensemble member ('realization') to each cube. 

412 Assumes data is formatted enuk_um_0XX/enukaa_pd0HH.pp where XX is the 

413 ensemble member. 

414 

415 Arguments 

416 --------- 

417 cube: Cube 

418 ensemble member cube 

419 field 

420 Raw data variable, unused. 

421 filename: str 

422 filename of ensemble member data 

423 """ 

424 if not cube.coords("realization"): 

425 if "em" in filename: 

426 # Assuming format is *emXX* 

427 loc = filename.find("em") + 2 

428 member = np.int32(filename[loc : loc + 2]) 

429 else: 

430 # Assuming raw fields files format is enuk_um_0XX/enukaa_pd0HH 

431 member = np.int32(filename[-15:-13]) 

432 

433 cube.add_aux_coord(iris.coords.AuxCoord(member, standard_name="realization")) 

434 

435 

436def _deterministic_callback(cube, field, filename): 

437 """Give deterministic cubes a realization of 0. 

438 

439 This means they can be handled in the same way as ensembles through the rest 

440 of the code. 

441 """ 

442 # Only add if realization coordinate does not exist. 

443 if not cube.coords("realization"): 

444 cube.add_aux_coord( 

445 iris.coords.AuxCoord(np.int32(0), standard_name="realization", units="1") 

446 ) 

447 

448 

449@functools.lru_cache(None) 

450def _warn_once(msg): 

451 """Print a warning message, skipping recent duplicates.""" 

452 logging.warning(msg) 

453 

454 

455def _um_normalise_callback(cube: iris.cube.Cube, field, filename): 

456 """Normalise UM STASH variable long names to LFRic variable names. 

457 

458 Note standard names will remain associated with cubes where different. 

459 Long name will be used consistently in output filename and titles. 

460 """ 

461 # Convert STASH to LFRic variable name 

462 if "STASH" in cube.attributes: 

463 stash = cube.attributes["STASH"] 

464 try: 

465 (name, grid) = STASH_TO_LFRIC[str(stash)] 

466 cube.long_name = name 

467 except KeyError: 

468 # Don't change cubes with unknown stash codes. 

469 _warn_once( 

470 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update." 

471 ) 

472 

473 

474def _lfric_normalise_callback(cube: iris.cube.Cube, field, filename): 

475 """Normalise attributes that prevents LFRic cube from merging. 

476 

477 The uuid and timeStamp relate to the output file, as saved by XIOS, and has 

478 no relation to the data contained. These attributes are removed. 

479 

480 The um_stash_source is a list of STASH codes for when an LFRic field maps to 

481 multiple UM fields, however it can be encoded in any order. This attribute 

482 is sorted to prevent this. This attribute is only present in LFRic data that 

483 has been converted to look like UM data. 

484 """ 

485 # Remove unwanted attributes. 

486 cube.attributes.pop("timeStamp", None) 

487 cube.attributes.pop("uuid", None) 

488 cube.attributes.pop("name", None) 

489 

490 # Sort STASH code list. 

491 stash_list = cube.attributes.get("um_stash_source") 

492 if stash_list: 

493 # Parse the string as a list, sort, then re-encode as a string. 

494 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list))) 

495 

496 

497def _lfric_time_coord_fix_callback(cube: iris.cube.Cube, field, filename): 

498 """Ensure the time coordinate is a DimCoord rather than an AuxCoord. 

499 

500 The coordinate is converted and replaced if not. SLAMed LFRic data has this 

501 issue, though the coordinate satisfies all the properties for a DimCoord. 

502 Scalar time values are left as AuxCoords. 

503 """ 

504 # This issue seems to come from iris's handling of NetCDF files where time 

505 # always ends up as an AuxCoord. 

506 if cube.coords("time"): 

507 time_coord = cube.coord("time") 

508 if ( 

509 not isinstance(time_coord, iris.coords.DimCoord) 

510 and len(cube.coord_dims(time_coord)) == 1 

511 ): 

512 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord) 

513 

514 # Force single-valued coordinates to be scalar coordinates. 

515 return iris.util.squeeze(cube) 

516 

517 

518def _grid_longitude_fix_callback(cube: iris.cube.Cube): 

519 """Check grid_longitude coordinates are in the range -180 deg to 180 deg. 

520 

521 This is necessary if comparing two models with different conventions -- 

522 for example, models where the prime meridian is defined as 0 deg or 

523 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude 

524 so that it falls in this range. Checks are for near-180 bounds given 

525 model data bounds may not extend exactly to 0. or 360. 

526 Input cubes on non-rotated grid coordinates are not impacted. 

527 """ 

528 import CSET.operators._utils as utils 

529 

530 try: 

531 y, x = utils.get_cube_yxcoordname(cube) 

532 except ValueError: 

533 # Don't modify non-spatial cubes. 

534 return cube 

535 

536 long_coord = cube.coord(x) 

537 # Wrap longitudes if rotated pole coordinates 

538 coord_system = long_coord.coord_system 

539 if x == "grid_longitude" and isinstance( 

540 coord_system, iris.coord_systems.RotatedGeogCS 

541 ): 

542 long_points = long_coord.points.copy() 

543 long_centre = np.median(long_points) 

544 while long_centre < -175.0: 

545 long_centre += 360.0 

546 long_points += 360.0 

547 while long_centre >= 175.0: 

548 long_centre -= 360.0 

549 long_points -= 360.0 

550 long_coord.points = long_points 

551 

552 # Update coord bounds to be consistent with wrapping. 

553 if long_coord.has_bounds() and np.size(long_coord) > 1: 553 ↛ 554line 553 didn't jump to line 554 because the condition on line 553 was never true

554 long_coord.bounds = None 

555 long_coord.guess_bounds() 

556 

557 return cube 

558 

559 

560def _fix_spatial_coords_callback(cube: iris.cube.Cube): 

561 """Check latitude and longitude coordinates name. 

562 

563 This is necessary as some models define their grid as on rotated 

564 'grid_latitude' and 'grid_longitude' coordinates while others define 

565 the grid on non-rotated 'latitude' and 'longitude'. 

566 Cube dimensions need to be made consistent to avoid recipe failures, 

567 particularly where comparing multiple input models with differing spatial 

568 coordinates. 

569 """ 

570 import CSET.operators._utils as utils 

571 

572 # Check if cube is spatial. 

573 if not utils.is_spatialdim(cube): 

574 # Don't modify non-spatial cubes. 

575 return cube 

576 

577 # Get spatial coords and dimension index. 

578 y_name, x_name = utils.get_cube_yxcoordname(cube) 

579 ny = utils.get_cube_coordindex(cube, y_name) 

580 nx = utils.get_cube_coordindex(cube, x_name) 

581 

582 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord 

583 # [latitude, longitude] for instances where rotated_pole=90.0 

584 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]: 

585 coord_system = cube.coord("grid_latitude").coord_system 

586 pole_lat = coord_system.grid_north_pole_latitude 

587 if pole_lat == 90.0: 587 ↛ 588line 587 didn't jump to line 588 because the condition on line 587 was never true

588 lats = cube.coord("grid_latitude").points 

589 lons = cube.coord("grid_longitude").points 

590 

591 cube.remove_coord("grid_latitude") 

592 cube.add_dim_coord( 

593 iris.coords.DimCoord( 

594 lats, 

595 standard_name="latitude", 

596 var_name="latitude", 

597 units="degrees", 

598 coord_system=iris.coord_systems.GeogCS(6371229.0), 

599 circular=True, 

600 ), 

601 ny, 

602 ) 

603 y_name = "latitude" 

604 cube.remove_coord("grid_longitude") 

605 cube.add_dim_coord( 

606 iris.coords.DimCoord( 

607 lons, 

608 standard_name="longitude", 

609 var_name="longitude", 

610 units="degrees", 

611 coord_system=iris.coord_systems.GeogCS(6371229.0), 

612 circular=True, 

613 ), 

614 nx, 

615 ) 

616 x_name = "longitude" 

617 

618 # Create additional AuxCoord [grid_latitude, grid_longitude] with 

619 # rotated pole attributes for cases with [lat, lon] inputs 

620 if y_name in ["latitude"] and cube.coord(y_name).units in [ 

621 "degrees", 

622 "degrees_north", 

623 "degrees_south", 

624 ]: 

625 # Add grid_latitude AuxCoord 

626 if "grid_latitude" not in [ 626 ↛ 639line 626 didn't jump to line 639 because the condition on line 626 was always true

627 coord.name() for coord in cube.coords(dim_coords=False) 

628 ]: 

629 cube.add_aux_coord( 

630 iris.coords.AuxCoord( 

631 cube.coord(y_name).points, 

632 var_name="grid_latitude", 

633 units="degrees", 

634 ), 

635 ny, 

636 ) 

637 # Ensure input latitude DimCoord has CoordSystem 

638 # This attribute is sometimes lost on iris.save 

639 if not cube.coord(y_name).coord_system: 

640 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

641 

642 if x_name in ["longitude"] and cube.coord(x_name).units in [ 

643 "degrees", 

644 "degrees_west", 

645 "degrees_east", 

646 ]: 

647 # Add grid_longitude AuxCoord 

648 if "grid_longitude" not in [ 648 ↛ 662line 648 didn't jump to line 662 because the condition on line 648 was always true

649 coord.name() for coord in cube.coords(dim_coords=False) 

650 ]: 

651 cube.add_aux_coord( 

652 iris.coords.AuxCoord( 

653 cube.coord(x_name).points, 

654 var_name="grid_longitude", 

655 units="degrees", 

656 ), 

657 nx, 

658 ) 

659 

660 # Ensure input longitude DimCoord has CoordSystem 

661 # This attribute is sometimes lost on iris.save 

662 if not cube.coord(x_name).coord_system: 

663 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0) 

664 

665 

666def _fix_pressure_coord_callback(cube: iris.cube.Cube): 

667 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units. 

668 

669 This problem was raised because the AIFS model data from ECMWF 

670 defines the pressure coordinate with the name "pressure_level" rather 

671 than compliant CF coordinate names. 

672 

673 Additionally, set the units of pressure to be hPa to be consistent with the UM, 

674 and approach the coordinates in a unified way. 

675 """ 

676 for coord in cube.dim_coords: 

677 if coord.name() in ["pressure_level", "pressure_levels"]: 

678 coord.rename("pressure") 

679 

680 if coord.name() == "pressure": 

681 if str(cube.coord("pressure").units) != "hPa": 

682 cube.coord("pressure").convert_units("hPa") 

683 

684 

685def _fix_um_radtime(cube: iris.cube.Cube): 

686 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour. 

687 

688 This callback does not have any effect for output diagnostics with 

689 timestamps exactly 00 or 30 minutes past the hour. Only radiation 

690 diagnostics are checked. 

691 Note this callback does not interpolate the data in time, only adjust 

692 timestamps to sit on the hour to enable time-to-time difference plotting 

693 with models which may output radiation data on the hour. 

694 """ 

695 try: 

696 if cube.attributes["STASH"] in [ 

697 "m01s01i207", 

698 "m01s01i208", 

699 "m01s02i205", 

700 "m01s02i201", 

701 "m01s01i207", 

702 "m01s02i207", 

703 "m01s01i235", 

704 ]: 

705 time_coord = cube.coord("time") 

706 

707 # Convert time points to datetime objects 

708 time_unit = time_coord.units 

709 time_points = time_unit.num2date(time_coord.points) 

710 # Skip if times don't need fixing. 

711 if time_points[0].minute == 0 and time_points[0].second == 0: 

712 return 

713 if time_points[0].minute == 30 and time_points[0].second == 0: 713 ↛ 714line 713 didn't jump to line 714 because the condition on line 713 was never true

714 return 

715 

716 # Subtract time difference from the hour from each time point 

717 n_minute = time_points[0].minute 

718 n_second = time_points[0].second 

719 # If times closer to next hour, compute difference to add on to following hour 

720 if n_minute > 30: 

721 n_minute = n_minute - 60 

722 # Compute new diagnostic time stamp 

723 new_time_points = ( 

724 time_points 

725 - datetime.timedelta(minutes=n_minute) 

726 - datetime.timedelta(seconds=n_second) 

727 ) 

728 

729 # Convert back to numeric values using the original time unit. 

730 new_time_values = time_unit.date2num(new_time_points) 

731 

732 # Replace the time coordinate with updated values. 

733 time_coord.points = new_time_values 

734 

735 # Recompute forecast_period with corrected values. 

736 if cube.coord("forecast_period"): 736 ↛ exitline 736 didn't return from function '_fix_um_radtime' because the condition on line 736 was always true

737 fcst_prd_points = cube.coord("forecast_period").points 

738 new_fcst_points = ( 

739 time_unit.num2date(fcst_prd_points) 

740 - datetime.timedelta(minutes=n_minute) 

741 - datetime.timedelta(seconds=n_second) 

742 ) 

743 cube.coord("forecast_period").points = time_unit.date2num( 

744 new_fcst_points 

745 ) 

746 except KeyError: 

747 pass 

748 

749 

750def _fix_cell_methods(cube: iris.cube.Cube): 

751 """To fix the assumed cell_methods in accumulation STASH from UM. 

752 

753 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount 

754 (m01s04i202, m01s05i202) in UM is being output as a time accumulation, 

755 over each hour (TAcc1hr), but input cubes show cell_methods as "mean". 

756 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are 

757 "sum". This callback changes "mean" cube attribute cell_method to "sum", 

758 enabling the cell_method constraint on reading to select correct input. 

759 """ 

760 # Shift "mean" cell_method to "sum" for selected UM inputs. 

761 if cube.attributes.get("STASH") in [ 

762 "m01s21i104", 

763 "m01s04i201", 

764 "m01s04i202", 

765 "m01s05i201", 

766 "m01s05i202", 

767 ]: 

768 # Check if input cell_method contains "mean" time-processing. 

769 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 769 ↛ exitline 769 didn't return from function '_fix_cell_methods' because the condition on line 769 was always true

770 # Retrieve interval and any comment information. 

771 for cell_method in cube.cell_methods: 

772 interval_str = cell_method.intervals 

773 comment_str = cell_method.comments 

774 

775 # Remove input aggregation method. 

776 cube.cell_methods = () 

777 

778 # Replace "mean" with "sum" cell_method to indicate aggregation. 

779 cube.add_cell_method( 

780 iris.coords.CellMethod( 

781 method="sum", 

782 coords="time", 

783 intervals=interval_str, 

784 comments=comment_str, 

785 ) 

786 ) 

787 

788 

789def _convert_cube_units_callback(cube: iris.cube.Cube): 

790 """Adjust diagnostic units for specific variables. 

791 

792 Some precipitation diagnostics are output with unit kg m-2 s-1 and are 

793 converted here to mm hr-1. 

794 

795 Visibility diagnostics are converted here from m to km to improve output 

796 formatting. 

797 """ 

798 # Convert precipitation diagnostic units if required. 

799 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

800 if any("surface_microphysical" in name for name in varnames): 

801 if cube.units == "kg m-2 s-1": 

802 logging.debug( 

803 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1" 

804 ) 

805 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water. 

806 # This is a 1:1 conversion, so we just change the units. 

807 cube.units = "mm s-1" 

808 # Convert the units to per hour. 

809 cube.convert_units("mm hr-1") 

810 elif cube.units == "kg m-2": 810 ↛ 817line 810 didn't jump to line 817 because the condition on line 810 was always true

811 logging.debug("Converting precipitation amount units from kg m-2 to mm") 

812 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water. 

813 # This is a 1:1 conversion, so we just change the units. 

814 cube.units = "mm" 

815 

816 # Convert visibility diagnostic units if required. 

817 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

818 if any("visibility" in name for name in varnames): 

819 if cube.units == "m": 819 ↛ 824line 819 didn't jump to line 824 because the condition on line 819 was always true

820 logging.debug("Converting visibility units m to km.") 

821 # Convert the units to km. 

822 cube.convert_units("km") 

823 

824 return cube 

825 

826 

827def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube): 

828 """Mask cloud_base_altitude diagnostic in regions with no cloud.""" 

829 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name]) 

830 if any("cloud_base_altitude" in name for name in varnames): 

831 # Mask cube where set > 144kft to catch default 144.35695538058164 

832 cube.data = np.ma.masked_array(cube.data) 

833 cube.data[cube.data > 144.0] = np.ma.masked 

834 

835 

836def _fix_um_winds(cubes: iris.cube.CubeList): 

837 """To make winds from the UM consistent with those from LFRic. 

838 

839 Diagnostics of wind are not always consistent between the UM 

840 and LFric. Here, winds from the UM are adjusted to make them i 

841 consistent with LFRic. 

842 """ 

843 # Check whether we have components of the wind identified by STASH, 

844 # (so this will apply only to cubes from the UM), but not the 

845 # wind speed and calculate it if it is missing. Note that 

846 # this will be biased low in general because the components will mostly 

847 # be time averages. For simplicity, we do this only if there is just one 

848 # cube of a component. A more complicated approach would be to consider 

849 # the cell methods, but it may not be warranted. 

850 u_constr = iris.AttributeConstraint(STASH="m01s03i225") 

851 v_constr = iris.AttributeConstraint(STASH="m01s03i226") 

852 speed_constr = iris.AttributeConstraint(STASH="m01s03i227") 

853 try: 

854 if cubes.extract(u_constr) and cubes.extract(v_constr): 854 ↛ 855line 854 didn't jump to line 855 because the condition on line 854 was never true

855 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr): 

856 _add_wind_speed_um(cubes) 

857 # Convert winds in the UM to be relative to true east and true north. 

858 _convert_wind_true_dirn_um(cubes) 

859 except (KeyError, AttributeError): 

860 pass 

861 

862 

863def _add_wind_speed_um(cubes: iris.cube.CubeList): 

864 """Add windspeeds to cubes from the UM.""" 

865 wspd10 = ( 

866 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2 

867 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2 

868 ) ** 0.5 

869 wspd10.attributes["STASH"] = "m01s03i227" 

870 wspd10.standard_name = "wind_speed" 

871 wspd10.long_name = "wind_speed_at_10m" 

872 cubes.append(wspd10) 

873 

874 

875def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList): 

876 """To convert winds to true directions. 

877 

878 Convert from the components relative to the grid to true directions. 

879 This functionality only handles the simplest case. 

880 """ 

881 u_grid = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225")) 

882 v_grid = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226")) 

883 true_u, true_v = rotate_winds(u_grid, v_grid, iris.coord_systems.GeogCS(6371229.0)) 

884 u_grid.data = true_u.data 

885 v_grid.data = true_v.data 

886 

887 

888def _normalise_var0_varname(cube: iris.cube.Cube): 

889 """Fix varnames for consistency to allow merging. 

890 

891 Some model data netCDF sometimes have a coordinate name end in 

892 "_0" etc, where duplicate coordinates of same name are defined but 

893 with different attributes. This can be inconsistently managed in 

894 different model inputs and can cause cubes to fail to merge. 

895 """ 

896 for coord in cube.coords(): 

897 if coord.var_name and coord.var_name.endswith("_0"): 

898 coord.var_name = coord.var_name.removesuffix("_0") 

899 if coord.var_name and coord.var_name.endswith("_1"): 

900 coord.var_name = coord.var_name.removesuffix("_1") 

901 if coord.var_name and coord.var_name.endswith("_2"): 901 ↛ 902line 901 didn't jump to line 902 because the condition on line 901 was never true

902 coord.var_name = coord.var_name.removesuffix("_2") 

903 if coord.var_name and coord.var_name.endswith("_3"): 903 ↛ 904line 903 didn't jump to line 904 because the condition on line 903 was never true

904 coord.var_name = coord.var_name.removesuffix("_3") 

905 

906 if cube.var_name and cube.var_name.endswith("_0"): 

907 cube.var_name = cube.var_name.removesuffix("_0") 

908 

909 

910def _proleptic_gregorian_fix(cube: iris.cube.Cube): 

911 """Convert the calendars of time units to use a standard calendar.""" 

912 try: 

913 time_coord = cube.coord("time") 

914 if time_coord.units.calendar == "proleptic_gregorian": 

915 logging.debug( 

916 "Changing proleptic Gregorian calendar to standard calendar for %s", 

917 repr(time_coord.units), 

918 ) 

919 time_coord.units = time_coord.units.change_calendar("standard") 

920 except iris.exceptions.CoordinateNotFoundError: 

921 pass 

922 

923 

924def _lfric_time_callback(cube: iris.cube.Cube): 

925 """Fix time coordinate metadata if missing dimensions. 

926 

927 Some model data does not contain forecast_reference_time or forecast_period as 

928 expected coordinates, and so we cannot aggregate over case studies without this 

929 metadata. This callback fixes these issues. 

930 

931 This callback also ensures all time coordinates are referenced as hours since 

932 1970-01-01 00:00:00 for consistency across different model inputs. 

933 

934 Notes 

935 ----- 

936 Some parts of the code have been adapted from Paul Earnshaw's scripts. 

937 """ 

938 # Construct forecast_reference time if it doesn't exist. 

939 try: 

940 tcoord = cube.coord("time") 

941 # Set time coordinate to common basis "hours since 1970" 

942 try: 

943 tcoord.convert_units("hours since 1970-01-01 00:00:00") 

944 except ValueError: 

945 logging.warning("Unrecognised base time unit: %s", tcoord.units) 

946 

947 if not cube.coords("forecast_reference_time"): 

948 try: 

949 init_time = datetime.datetime.fromisoformat( 

950 tcoord.attributes["time_origin"] 

951 ) 

952 frt_point = tcoord.units.date2num(init_time) 

953 frt_coord = iris.coords.AuxCoord( 

954 frt_point, 

955 units=tcoord.units, 

956 standard_name="forecast_reference_time", 

957 long_name="forecast_reference_time", 

958 ) 

959 cube.add_aux_coord(frt_coord) 

960 except KeyError: 

961 logging.warning( 

962 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from." 

963 ) 

964 

965 # Remove time_origin to allow multiple case studies to merge. 

966 tcoord.attributes.pop("time_origin", None) 

967 

968 # Construct forecast_period axis (forecast lead time) if it doesn't exist. 

969 if not cube.coords("forecast_period"): 

970 try: 

971 # Create array of forecast lead times. 

972 init_coord = cube.coord("forecast_reference_time") 

973 init_time_points_in_tcoord_units = tcoord.units.date2num( 

974 init_coord.units.num2date(init_coord.points) 

975 ) 

976 lead_times = tcoord.points - init_time_points_in_tcoord_units 

977 

978 # Get unit for lead time from time coordinate's unit. 

979 # Convert all lead time to hours for consistency between models. 

980 if "seconds" in str(tcoord.units): 980 ↛ 981line 980 didn't jump to line 981 because the condition on line 980 was never true

981 lead_times = lead_times / 3600.0 

982 units = "hours" 

983 elif "hours" in str(tcoord.units): 983 ↛ 986line 983 didn't jump to line 986 because the condition on line 983 was always true

984 units = "hours" 

985 else: 

986 raise ValueError(f"Unrecognised base time unit: {tcoord.units}") 

987 

988 # Create lead time coordinate. 

989 lead_time_coord = iris.coords.AuxCoord( 

990 lead_times, 

991 standard_name="forecast_period", 

992 long_name="forecast_period", 

993 units=units, 

994 ) 

995 

996 # Associate lead time coordinate with time dimension. 

997 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time")) 

998 except iris.exceptions.CoordinateNotFoundError: 

999 logging.warning( 

1000 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period" 

1001 ) 

1002 except iris.exceptions.CoordinateNotFoundError: 

1003 logging.warning("No time coordinate on cube.") 

1004 

1005 

1006def _lfric_forecast_period_standard_name_callback(cube: iris.cube.Cube): 

1007 """Add forecast_period standard name if missing.""" 

1008 try: 

1009 coord = cube.coord("forecast_period") 

1010 if not coord.standard_name: 

1011 coord.standard_name = "forecast_period" 

1012 except iris.exceptions.CoordinateNotFoundError: 

1013 pass