Coverage for src / CSET / operators / read.py: 90%
396 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:46 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for reading various types of files from disk."""
17import ast
18import datetime
19import functools
20import glob
21import itertools
22import logging
23from pathlib import Path
24from typing import Literal
26import dask
27import iris
28import iris.coord_systems
29import iris.coords
30import iris.cube
31import iris.exceptions
32import iris.util
33import numpy as np
34from iris.analysis.cartography import rotate_pole, rotate_winds
36from CSET._common import iter_maybe
37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC
38from CSET.operators._utils import (
39 get_cube_coordindex,
40 get_cube_yxcoordname,
41 is_spatialdim,
42)
class NoDataError(FileNotFoundError):
    """Raised when loading produced no cubes at all."""
def read_cube(
    file_paths: list[str] | str,
    constraint: iris.Constraint = None,
    model_names: list[str] | str | None = None,
    subarea_type: str = None,
    subarea_extent: list[float] = None,
    **kwargs,
) -> iris.cube.Cube:
    """Read a single cube from files.

    Thin wrapper around read_cubes that additionally requires exactly one
    cube to match the given constraint. All loading behaviour (globbing,
    directory expansion, ensemble realization handling, deterministic data
    getting realization 0) is inherited from read_cubes.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: "gridcells" | "modelrelative" | "realworld", optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list, optional
        List of coordinates to constrain data by, in order: lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.Cube
        The single loaded cube.

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    ValueError
        If the constraint doesn't produce a single cube.
    """
    loaded = read_cubes(
        file_paths=file_paths,
        constraint=constraint,
        model_names=model_names,
        subarea_type=subarea_type,
        subarea_extent=subarea_extent,
    )
    # Exactly one cube must survive the constraint filtering.
    if len(loaded) != 1:
        raise ValueError(
            f"Constraint doesn't produce single cube: {constraint}\n{loaded}"
        )
    return loaded[0]
def read_cubes(
    file_paths: list[str] | str,
    constraint: iris.Constraint | None = None,
    model_names: str | list[str] | None = None,
    subarea_type: str = None,
    subarea_extent: list = None,
    **kwargs,
) -> iris.cube.CubeList:
    """Read cubes from files.

    Read operator that takes a path string (can include shell-style glob
    patterns), and loads the cubes matching the constraint. If any paths point
    to directory, all the files contained within are loaded.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Data output by XIOS (such as LFRic) has its per-file metadata removed so
    that the cubes merge across files.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located. Can include globs.
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: str, optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list[float], optional
        List of coordinates to constraint data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.CubeList
        Cubes loaded after being merged and concatenated.

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    """
    # Get iterable of paths. Each path corresponds to 1 model.
    paths = iter_maybe(file_paths)
    model_names = iter_maybe(model_names)

    # Check we have appropriate number of model names.
    if model_names != (None,) and len(model_names) != len(paths):
        raise ValueError(
            f"The number of model names ({len(model_names)}) should equal "
            f"the number of paths given ({len(paths)})."
        )

    # Load the data for each model into a CubeList per model. This is a lazy
    # generator, so each model is only loaded as it is consumed below.
    model_cubes = (
        _load_model(path, name, constraint)
        for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
    )

    # Split out first model's cubes and mark it as the base for comparisons.
    cubes = next(model_cubes)
    for cube in cubes:
        # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
        cube.attributes["cset_comparison_base"] = 1

    # Load the rest of the models.
    cubes.extend(itertools.chain.from_iterable(model_cubes))

    # Unify time units so different case studies can merge.
    iris.util.unify_time_units(cubes)

    # Select sub region.
    cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)

    # Merge and concatenate cubes now metadata has been fixed.
    cubes = _merge_cubes_check_ensemble(cubes)
    cubes = cubes.concatenate()

    # Squeeze single valued coordinates into scalar coordinates.
    cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)

    # Ensure dimension coordinates are bounded.
    for cube in cubes:
        for dim_coord in cube.coords(dim_coords=True):
            # Iris can't guess the bounds of a scalar coordinate.
            if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
                dim_coord.guess_bounds()

    logging.info("Loaded cubes: %s", cubes)
    if len(cubes) == 0:
        raise NoDataError("No cubes loaded, check your constraints!")
    return cubes
def _load_model(
    paths: str | list[str],
    model_name: str | None,
    constraint: iris.Constraint | None,
) -> iris.cube.CubeList:
    """Load a single model's data into a CubeList.

    Arguments
    ---------
    paths: str | list[str]
        Paths (possibly including globs) to this model's files.
    model_name: str | None
        If given, recorded on each cube as the "model_name" attribute so it is
        available to any later step without extra parameter passing.
    constraint: iris.Constraint | None
        Constraint to filter loading by; None loads everything.

    Returns
    -------
    iris.cube.CubeList
        The loaded (per-cube fixed-up) cubes for this model.
    """
    input_files = _check_input_files(paths)
    # If unset, a constraint of None lets everything be loaded.
    logging.debug("Constraint: %s", constraint)
    cubes = iris.load(input_files, constraint, callback=_loading_callback)
    # Make the UM's winds consistent with LFRic.
    _fix_um_winds(cubes)
    # Add model_name attribute to each cube to make it available at any further
    # step without needing to pass it as function parameter.
    if model_name is not None:
        for cube in cubes:
            cube.attributes["model_name"] = model_name
    return cubes
def _check_input_files(input_paths: str | list[str]) -> list[Path]:
    """Resolve the given paths into a sorted list of existing files.

    Arguments
    ---------
    input_paths: list[str]
        List of paths to input files or directories. The path may itself contain
        glob patterns, but unlike in shells it will match directly first.

    Returns
    -------
    list[Path]
        A list of files to load.

    Raises
    ------
    FileNotFoundError:
        If the provided arguments don't resolve to at least one existing file.
    """
    found: list[Path] = []
    for pattern in iter_maybe(input_paths):
        direct = Path(pattern)
        # A literal path to an existing file wins over glob expansion.
        if direct.is_file():
            found.append(direct)
            continue
        for match in map(Path, glob.glob(pattern)):
            if match.is_dir():
                # Directories contribute every file directly inside them.
                logging.debug("Checking directory '%s' for files", match)
                found.extend(entry for entry in match.iterdir() if entry.is_file())
            else:
                found.append(match)
    found.sort()
    logging.info("Loading files:\n%s", "\n".join(str(path) for path in found))
    if not found:
        raise FileNotFoundError(f"No files found for {input_paths}")
    return found
def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList):
    """Attempt to merge CubeList. If unsuccessful indicates common input cube attributes, so update realization to support ensemble inputs."""
    try:
        cubes = cubes.merge()
    except iris.exceptions.MergeError:
        _log_once(
            "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.",
            level=logging.WARNING,
        )
        # Re-number members that all carry the default realization 0 to
        # 1..N so the cubes become distinguishable and can merge.
        for ir, cube in enumerate(cubes):
            if cube.coord("realization").points == 0:
                cube.coord("realization").points = ir + 1
        cubes = cubes.merge()
    return cubes
def _cutout_cubes(
    cubes: iris.cube.CubeList,
    subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
    subarea_extent: list[float],
):
    """Cut out a subarea from a CubeList.

    Arguments
    ---------
    cubes: iris.cube.CubeList
        Cubes to cut out from; each must have identifiable y/x coordinates.
    subarea_type: "gridcells" | "realworld" | "modelrelative" | None
        How to interpret subarea_extent. None disables the cutout entirely.
    subarea_extent: list[float]
        For "gridcells": number of cells to trim from the lower, left, upper
        and right edges. Otherwise: lower latitude, upper latitude, lower
        longitude, upper longitude.

    Returns
    -------
    iris.cube.CubeList
        The cut out cubes, or the input cubes unchanged when disabled.

    Raises
    ------
    ValueError
        If subarea_type is unknown, or the requested region falls outside
        the data area.
    """
    if subarea_type is None:
        logging.debug("Subarea selection is disabled.")
        return cubes

    # If selected, cutout according to number of grid cells to trim from each edge.
    cutout_cubes = iris.cube.CubeList()
    # Find spatial coordinates
    for cube in cubes:
        # Find dimension coordinates.
        lat_name, lon_name = get_cube_yxcoordname(cube)

        # Compute cutout based on number of cells to trim from edges.
        if subarea_type == "gridcells":
            logging.debug(
                "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            lat_points = np.sort(cube.coord(lat_name).points)
            lon_points = np.sort(cube.coord(lon_name).points)
            # Define cutout region using user provided cell points.
            lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
            lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]

        # Compute cutout based on specified coordinate values.
        elif subarea_type == "realworld" or subarea_type == "modelrelative":
            # If not gridcells, cutout by requested geographic area,
            logging.debug(
                "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            # Define cutout region using user provided coordinates.
            lats = np.array(subarea_extent[0:2])
            lons = np.array(subarea_extent[2:4])
            # Ensure cutout longitudes are within +/- 180.0 bounds.
            # NOTE(review): each loop shifts *both* longitudes while testing
            # only one end — assumes the requested span is under 360 degrees.
            while lons[0] < -180.0:
                lons += 360.0
            while lons[1] > 180.0:
                lons -= 360.0
            # If the coordinate system is rotated we convert coordinates into
            # model-relative coordinates to extract the appropriate cutout.
            coord_system = cube.coord(lat_name).coord_system
            if subarea_type == "realworld" and isinstance(
                coord_system, iris.coord_systems.RotatedGeogCS
            ):
                lons, lats = rotate_pole(
                    lons,
                    lats,
                    pole_lon=coord_system.grid_north_pole_longitude,
                    pole_lat=coord_system.grid_north_pole_latitude,
                )
        else:
            raise ValueError("Unknown subarea_type:", subarea_type)

        # Do cutout and add to cutout_cubes.
        intersection_args = {lat_name: lats, lon_name: lons}
        logging.debug("Cutting out coords: %s", intersection_args)
        try:
            cutout_cubes.append(cube.intersection(**intersection_args))
        except IndexError as err:
            raise ValueError(
                "Region cutout error. Check and update SUBAREA_EXTENT."
                "Cutout region requested should be contained within data area. "
                "Also check if cutout region requested is smaller than input grid spacing."
            ) from err

    return cutout_cubes
def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
    """Compose together the needed callbacks into a single function.

    Applied by iris.load to every cube as it is read. The call order below
    is significant: later fixes may rely on names, coordinates or attributes
    normalised by earlier ones, so do not reorder casually.
    """
    # Most callbacks operate in-place, but save the cube when returned!
    _realization_callback(cube)
    _um_normalise_callback(cube)
    _lfric_normalise_callback(cube)
    cube = _lfric_time_coord_fix_callback(cube)
    _normalise_var0_varname(cube)
    cube = _fix_no_spatial_coords_callback(cube)
    _fix_spatial_coords_callback(cube)
    _fix_pressure_coord_callback(cube)
    _fix_um_radtime(cube)
    _fix_cell_methods(cube)
    cube = _convert_cube_units_callback(cube)
    cube = _grid_longitude_fix_callback(cube)
    _fix_lfric_cloud_base_altitude(cube)
    _proleptic_gregorian_fix(cube)
    _lfric_time_callback(cube)
    _lfric_forecast_period_callback(cube)
    _normalise_ML_varname(cube)
    return cube
def _realization_callback(cube):
    """Ensure every cube carries a realization coordinate, defaulting to 0.

    Deterministic and ensemble data can then be handled uniformly by the
    rest of the code, which assumes realization is always present.
    """
    # Leave cubes that already have a realization coordinate untouched.
    if cube.coords("realization"):
        return
    realization = iris.coords.DimCoord(0, standard_name="realization", units="1")
    cube.add_aux_coord(realization)
412@functools.lru_cache(None)
413def _log_once(msg, level=logging.WARNING):
414 """Print a warning message, skipping recent duplicates."""
415 logging.log(level, msg)
def _um_normalise_callback(cube: iris.cube.Cube):
    """Normalise UM STASH variable long names to LFRic variable names.

    Note standard names will remain associated with cubes where different.
    Long name will be used consistently in output filename and titles.
    """
    # Only UM data carries a STASH attribute; leave anything else alone.
    if "STASH" not in cube.attributes:
        return
    stash = cube.attributes["STASH"]
    try:
        name, _grid = STASH_TO_LFRIC[str(stash)]
    except KeyError:
        # Don't change cubes with unknown stash codes.
        _log_once(
            f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.",
            level=logging.WARNING,
        )
        return
    cube.long_name = name
def _lfric_normalise_callback(cube: iris.cube.Cube):
    """Normalise attributes that prevents LFRic cube from merging.

    The uuid and timeStamp relate to the output file, as saved by XIOS, and has
    no relation to the data contained. These attributes are removed.

    The um_stash_source is a list of STASH codes for when an LFRic field maps to
    multiple UM fields, however it can be encoded in any order. This attribute
    is sorted to prevent this. This attribute is only present in LFRic data that
    has been converted to look like UM data.
    """
    # Per-file provenance metadata differs between files and blocks merging.
    for attribute in (
        "timeStamp",
        "uuid",
        "name",
        "source",
        "analysis_source",
        "history",
    ):
        cube.attributes.pop(attribute, None)

    # Canonicalise the STASH code list: parse, sort, re-encode as a string.
    stash_codes = cube.attributes.get("um_stash_source")
    if stash_codes:
        cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_codes)))
def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Ensure the time coordinate is a DimCoord rather than an AuxCoord.

    The coordinate is converted and replaced if not. SLAMed LFRic data has this
    issue, though the coordinate satisfies all the properties for a DimCoord.
    Scalar time values are left as AuxCoords.
    """
    # This issue seems to come from iris's handling of NetCDF files where time
    # always ends up as an AuxCoord.
    if cube.coords("time"):
        time_coord = cube.coord("time")
        # Only promote 1-dimensional time coords; scalar times stay auxiliary.
        if (
            not isinstance(time_coord, iris.coords.DimCoord)
            and len(cube.coord_dims(time_coord)) == 1
        ):
            # Fudge the bounds to foil checking for strict monotonicity.
            if time_coord.has_bounds():
                # If the lower bounds are all (near-)identical, nudge each by
                # a tiny i-dependent increment so promotion's strict
                # monotonicity check passes without visibly changing values.
                if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
                    time_coord.bounds = [
                        [
                            time_coord.bounds[i][0] + 1.0e-8 * float(i),
                            time_coord.bounds[i][1],
                        ]
                        for i in range(len(time_coord.bounds))
                    ]
            iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
    return cube
def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Check grid_longitude coordinates are in the range -180 deg to 180 deg.

    This is necessary if comparing two models with different conventions --
    for example, models where the prime meridian is defined as 0 deg or
    360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
    so that it falls in this range. Checks are for near-180 bounds given
    model data bounds may not extend exactly to 0. or 360.
    Input cubes on non-rotated grid coordinates are not impacted.
    """
    try:
        y, x = get_cube_yxcoordname(cube)
    except ValueError:
        # Don't modify non-spatial cubes.
        return cube

    long_coord = cube.coord(x)
    # Wrap longitudes if rotated pole coordinates
    coord_system = long_coord.coord_system
    if x == "grid_longitude" and isinstance(
        coord_system, iris.coord_systems.RotatedGeogCS
    ):
        # Shift all points by whole turns until the median sits in
        # [-175, 175), keeping the grid contiguous while recentring it.
        long_points = long_coord.points.copy()
        long_centre = np.median(long_points)
        while long_centre < -175.0:
            long_centre += 360.0
            long_points += 360.0
        while long_centre >= 175.0:
            long_centre -= 360.0
            long_points -= 360.0
        long_coord.points = long_points

        # Update coord bounds to be consistent with wrapping.
        if long_coord.has_bounds():
            long_coord.bounds = None
            long_coord.guess_bounds()

    return cube
def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):
    """Add scalar lat/lon coordinates to cubes with no spatial dimensions.

    Some inputs carry their location only in geospatial_* cube attributes.
    For such cubes a scalar latitude/longitude AuxCoord pair at the centre
    of the advertised bounding box is added, so downstream operators can
    rely on spatial coordinates being present.

    Cubes that already have spatial dimensions, or that lack the geospatial
    attributes, are returned unchanged.
    """
    # Don't modify cubes that already have spatial dimensions. Uses the
    # module-level is_spatialdim import for consistency with
    # _fix_spatial_coords_callback (previously re-imported CSET.operators._utils
    # locally for no benefit).
    if is_spatialdim(cube):
        return cube

    # Attempt to get lat/long from cube attributes.
    try:
        lat_min = cube.attributes.get("geospatial_lat_min")
        lat_max = cube.attributes.get("geospatial_lat_max")
        lon_min = cube.attributes.get("geospatial_lon_min")
        lon_max = cube.attributes.get("geospatial_lon_max")

        # Centre of the advertised bounding box. Raises TypeError when any
        # attribute is missing (None), which we treat as "leave unchanged".
        lon_val = (lon_min + lon_max) / 2.0
        lat_val = (lat_min + lat_max) / 2.0

        lat_coord = iris.coords.DimCoord(
            lat_val,
            standard_name="latitude",
            units="degrees_north",
            var_name="latitude",
            coord_system=iris.coord_systems.GeogCS(6371229.0),
            circular=True,
        )

        lon_coord = iris.coords.DimCoord(
            lon_val,
            standard_name="longitude",
            units="degrees_east",
            var_name="longitude",
            coord_system=iris.coord_systems.GeogCS(6371229.0),
            circular=True,
        )

        cube.add_aux_coord(lat_coord)
        cube.add_aux_coord(lon_coord)
        return cube

    # If lat/long are not in attributes, then return cube unchanged.
    except TypeError:
        return cube
def _fix_spatial_coords_callback(cube: iris.cube.Cube):
    """Check latitude and longitude coordinates name.

    This is necessary as some models define their grid as on rotated
    'grid_latitude' and 'grid_longitude' coordinates while others define
    the grid on non-rotated 'latitude' and 'longitude'.
    Cube dimensions need to be made consistent to avoid recipe failures,
    particularly where comparing multiple input models with differing spatial
    coordinates.
    """
    # Check if cube is spatial.
    if not is_spatialdim(cube):
        # Don't modify non-spatial cubes.
        return

    # Get spatial coords and dimension index.
    y_name, x_name = get_cube_yxcoordname(cube)
    ny = get_cube_coordindex(cube, y_name)
    nx = get_cube_coordindex(cube, x_name)

    # Remove spatial coords bounds if erroneous values detected.
    # Aims to catch some errors in input coord bounds by setting
    # invalid threshold of 10000.0
    if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
        bx_max = np.max(np.abs(cube.coord(x_name).bounds))
        by_max = np.max(np.abs(cube.coord(y_name).bounds))
        if bx_max > 10000.0 or by_max > 10000.0:
            cube.coord(x_name).bounds = None
            cube.coord(y_name).bounds = None

    # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
    # [latitude, longitude] for instances where rotated_pole=90.0
    if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
        coord_system = cube.coord("grid_latitude").coord_system
        pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
        # A pole latitude of 90.0 means the rotation is effectively a no-op,
        # so the coordinates can be relabelled as plain latitude/longitude.
        if pole_lat == 90.0:
            lats = cube.coord("grid_latitude").points
            lons = cube.coord("grid_longitude").points

            cube.remove_coord("grid_latitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lats,
                    standard_name="latitude",
                    var_name="latitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                ny,
            )
            y_name = "latitude"
            cube.remove_coord("grid_longitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lons,
                    standard_name="longitude",
                    var_name="longitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                nx,
            )
            x_name = "longitude"

    # Create additional AuxCoord [grid_latitude, grid_longitude] with
    # rotated pole attributes for cases with [lat, lon] inputs
    if y_name in ["latitude"] and cube.coord(y_name).units in [
        "degrees",
        "degrees_north",
        "degrees_south",
    ]:
        # Add grid_latitude AuxCoord
        if "grid_latitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(y_name).points,
                    var_name="grid_latitude",
                    units="degrees",
                ),
                ny,
            )
        # Ensure input latitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(y_name).coord_system:
            cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)

    if x_name in ["longitude"] and cube.coord(x_name).units in [
        "degrees",
        "degrees_west",
        "degrees_east",
    ]:
        # Add grid_longitude AuxCoord
        if "grid_longitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(x_name).points,
                    var_name="grid_longitude",
                    units="degrees",
                ),
                nx,
            )

        # Ensure input longitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(x_name).coord_system:
            cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
def _fix_pressure_coord_callback(cube: iris.cube.Cube):
    """Normalise the pressure coordinate's name and units.

    Some inputs (e.g. ECMWF AIFS data) name the vertical pressure coordinate
    "pressure_level" rather than using a compliant CF coordinate name; such
    coordinates are renamed to "pressure". The units are then converted to
    hPa for consistency with the UM, so downstream code can treat pressure
    coordinates uniformly.
    """
    for coord in cube.dim_coords:
        # Rename the non-CF variants used by some models.
        if coord.name() in ("pressure_level", "pressure_levels"):
            coord.rename("pressure")
        # Standardise on hPa.
        if (
            coord.name() == "pressure"
            and str(cube.coord("pressure").units) != "hPa"
        ):
            cube.coord("pressure").convert_units("hPa")
def _fix_um_radtime(cube: iris.cube.Cube):
    """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.

    This callback does not have any effect for output diagnostics with
    timestamps exactly 00 or 30 minutes past the hour. Only radiation
    diagnostics are checked.
    Note this callback does not interpolate the data in time, only adjust
    timestamps to sit on the hour to enable time-to-time difference plotting
    with models which may output radiation data on the hour.
    """
    try:
        # Radiation diagnostics of interest, identified by STASH code.
        # NOTE(review): "m01s01i207" appears twice in this list.
        if cube.attributes["STASH"] in [
            "m01s01i207",
            "m01s01i208",
            "m01s02i205",
            "m01s02i201",
            "m01s01i207",
            "m01s02i207",
            "m01s01i235",
        ]:
            time_coord = cube.coord("time")

            # Convert time points to datetime objects
            time_unit = time_coord.units
            time_points = time_unit.num2date(time_coord.points)
            # Skip if times don't need fixing.
            # NOTE(review): only the first time point is inspected — assumes
            # every point shares the same offset from the hour.
            if time_points[0].minute == 0 and time_points[0].second == 0:
                return
            if time_points[0].minute == 30 and time_points[0].second == 0:
                return
            # Subtract time difference from the hour from each time point
            n_minute = time_points[0].minute
            n_second = time_points[0].second
            # If times closer to next hour, compute difference to add on to following hour
            if n_minute > 30:
                n_minute = n_minute - 60
            # Compute new diagnostic time stamp
            new_time_points = (
                time_points
                - datetime.timedelta(minutes=n_minute)
                - datetime.timedelta(seconds=n_second)
            )

            # Convert back to numeric values using the original time unit.
            new_time_values = time_unit.date2num(new_time_points)

            # Replace the time coordinate with updated values.
            time_coord.points = new_time_values

            # Recompute forecast_period with corrected values.
            # NOTE(review): forecast_period is a duration, but is shifted via
            # the absolute time unit (num2date/date2num) here — confirm this
            # produces the intended offset for the unit in use.
            if cube.coord("forecast_period"):
                fcst_prd_points = cube.coord("forecast_period").points
                new_fcst_points = (
                    time_unit.num2date(fcst_prd_points)
                    - datetime.timedelta(minutes=n_minute)
                    - datetime.timedelta(seconds=n_second)
                )
                cube.coord("forecast_period").points = time_unit.date2num(
                    new_fcst_points
                )
    except KeyError:
        # Cube has no STASH attribute (not UM data); leave it unchanged.
        pass
def _fix_cell_methods(cube: iris.cube.Cube):
    """To fix the assumed cell_methods in accumulation STASH from UM.

    Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
    (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
    over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
    For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
    "sum". This callback changes "mean" cube attribute cell_method to "sum",
    enabling the cell_method constraint on reading to select correct input.
    """
    # Shift "mean" cell_method to "sum" for selected UM inputs.
    if cube.attributes.get("STASH") in [
        "m01s21i104",
        "m01s04i201",
        "m01s04i202",
        "m01s05i201",
        "m01s05i202",
    ]:
        # Check if input cell_method contains "mean" time-processing.
        if set(cm.method for cm in cube.cell_methods) == {"mean"}:
            # Retrieve interval and any comment information.
            # NOTE(review): if several cell methods are present, the
            # intervals/comments of the *last* one win.
            for cell_method in cube.cell_methods:
                interval_str = cell_method.intervals
                comment_str = cell_method.comments

            # Remove input aggregation method.
            cube.cell_methods = ()

            # Replace "mean" with "sum" cell_method to indicate aggregation.
            cube.add_cell_method(
                iris.coords.CellMethod(
                    method="sum",
                    coords="time",
                    intervals=interval_str,
                    comments=comment_str,
                )
            )
def _convert_cube_units_callback(cube: iris.cube.Cube):
    """Adjust diagnostic units for specific variables.

    Some precipitation diagnostics are output with unit kg m-2 s-1 and are
    converted here to mm hr-1.

    Visibility diagnostics are converted here from m to km to improve output
    formatting.
    """
    # Collect whichever of the cube's names are set.
    names = [
        name
        for name in (cube.long_name, cube.standard_name, cube.var_name)
        if name
    ]

    # Convert precipitation diagnostic units if required.
    if any("surface_microphysical" in name for name in names):
        if cube.units == "kg m-2 s-1":
            _log_once(
                "Converting precipitation rate units from kg m-2 s-1 to mm hr-1",
                level=logging.DEBUG,
            )
            # 1 kg of water over 1 m^2 is 1 mm depth (1 kg = 1 l = 1 dm^3),
            # so kg m-2 s-1 -> mm s-1 is a pure relabelling.
            cube.units = "mm s-1"
            # Then scale to per-hour.
            cube.convert_units("mm hr-1")
        elif cube.units == "kg m-2":
            _log_once(
                "Converting precipitation amount units from kg m-2 to mm",
                level=logging.DEBUG,
            )
            # Same 1:1 relabelling as above, kg m-2 -> mm.
            cube.units = "mm"

    # Convert visibility diagnostic units if required.
    if any("visibility" in name for name in names):
        if cube.units == "m":
            _log_once("Converting visibility units m to km.", level=logging.DEBUG)
            cube.convert_units("km")

    return cube
def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
    """Mask cloud_base_altitude diagnostic in regions with no cloud."""
    names = [
        name
        for name in (cube.long_name, cube.standard_name, cube.var_name)
        if name
    ]
    if any("cloud_base_altitude" in name for name in names):
        # No-cloud points are filled with ~144.357 kft; masking everything
        # above 144 kft catches that fill value.
        cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0)
def _fix_um_winds(cubes: iris.cube.CubeList):
    """To make winds from the UM consistent with those from LFRic.

    Diagnostics of wind are not always consistent between the UM
    and LFric. Here, winds from the UM are adjusted to make them
    consistent with LFRic.
    """
    # Check whether we have components of the wind identified by STASH,
    # (so this will apply only to cubes from the UM), but not the
    # wind speed and calculate it if it is missing. Note that
    # this will be biased low in general because the components will mostly
    # be time averages. For simplicity, we do this only if there is just one
    # cube of a component. A more complicated approach would be to consider
    # the cell methods, but it may not be warranted.
    u_constr = iris.AttributeConstraint(STASH="m01s03i225")
    v_constr = iris.AttributeConstraint(STASH="m01s03i226")
    speed_constr = iris.AttributeConstraint(STASH="m01s03i227")
    try:
        if cubes.extract(u_constr) and cubes.extract(v_constr):
            if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr):
                _add_wind_speed_um(cubes)
            # Convert winds in the UM to be relative to true east and true north.
            _convert_wind_true_dirn_um(cubes)
    except (KeyError, AttributeError):
        # Best-effort: leave winds untouched if derivation/rotation fails.
        pass
def _add_wind_speed_um(cubes: iris.cube.CubeList):
    """Add windspeeds to cubes from the UM."""
    # Take the first field of each STASH-identified 10 m wind component
    # and combine them into a speed via the hypotenuse.
    u10 = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0]
    v10 = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0]
    speed = (u10**2 + v10**2) ** 0.5
    # Label the derived cube as the 10 m wind speed diagnostic.
    speed.attributes["STASH"] = "m01s03i227"
    speed.standard_name = "wind_speed"
    speed.long_name = "wind_speed_at_10m"
    cubes.append(speed)
def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
    """To convert winds to true directions.

    Convert from the components relative to the grid to true directions.
    This functionality only handles the simplest case.
    """
    u_components = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
    v_components = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
    # Spherical Earth of radius 6371229 m, as used by the UM.
    sphere = iris.coord_systems.GeogCS(6371229.0)
    # Components must pair one-to-one; strict zip raises otherwise.
    for u_grid, v_grid in zip(u_components, v_components, strict=True):
        u_true, v_true = rotate_winds(u_grid, v_grid, sphere)
        # Overwrite the grid-relative data with the rotated values in place.
        u_grid.data = u_true.core_data()
        v_grid.data = v_true.core_data()
918def _normalise_var0_varname(cube: iris.cube.Cube):
919 """Fix varnames for consistency to allow merging.
921 Some model data netCDF sometimes have a coordinate name end in
922 "_0" etc, where duplicate coordinates of same name are defined but
923 with different attributes. This can be inconsistently managed in
924 different model inputs and can cause cubes to fail to merge.
925 """
926 for coord in cube.coords():
927 if coord.var_name and coord.var_name.endswith("_0"):
928 coord.var_name = coord.var_name.removesuffix("_0")
929 if coord.var_name and coord.var_name.endswith("_1"):
930 coord.var_name = coord.var_name.removesuffix("_1")
931 if coord.var_name and coord.var_name.endswith("_2"): 931 ↛ 932line 931 didn't jump to line 932 because the condition on line 931 was never true
932 coord.var_name = coord.var_name.removesuffix("_2")
933 if coord.var_name and coord.var_name.endswith("_3"): 933 ↛ 934line 933 didn't jump to line 934 because the condition on line 933 was never true
934 coord.var_name = coord.var_name.removesuffix("_3")
936 if cube.var_name and cube.var_name.endswith("_0"):
937 cube.var_name = cube.var_name.removesuffix("_0")
940def _proleptic_gregorian_fix(cube: iris.cube.Cube):
941 """Convert the calendars of time units to use a standard calendar."""
942 try:
943 time_coord = cube.coord("time")
944 if time_coord.units.calendar == "proleptic_gregorian":
945 logging.debug(
946 "Changing proleptic Gregorian calendar to standard calendar for %s",
947 repr(time_coord.units),
948 )
949 time_coord.units = time_coord.units.change_calendar("standard")
950 except iris.exceptions.CoordinateNotFoundError:
951 pass
def _lfric_time_callback(cube: iris.cube.Cube):
    """Fix time coordinate metadata if missing dimensions.

    Some model data does not contain forecast_reference_time or forecast_period as
    expected coordinates, and so we cannot aggregate over case studies without this
    metadata. This callback fixes these issues.

    This callback also ensures all time coordinates are referenced as hours since
    1970-01-01 00:00:00 for consistency across different model inputs.

    Notes
    -----
    Some parts of the code have been adapted from Paul Earnshaw's scripts.
    """
    # Construct forecast_reference time if it doesn't exist.
    try:
        tcoord = cube.coord("time")
        # Set time coordinate to common basis "hours since 1970"
        try:
            tcoord.convert_units("hours since 1970-01-01 00:00:00")
        except ValueError:
            # Best effort: leave the units as they are and carry on; the
            # hours/seconds checks further down handle the known cases.
            logging.warning("Unrecognised base time unit: %s", tcoord.units)

        if not cube.coords("forecast_reference_time"):
            try:
                # The time coordinate's `time_origin` attribute holds the
                # initialisation time as an ISO 8601 string; raises KeyError
                # if the attribute is absent.
                init_time = datetime.datetime.fromisoformat(
                    tcoord.attributes["time_origin"]
                )
                # Express the init time numerically on the same basis as the
                # time coordinate so both share units.
                frt_point = tcoord.units.date2num(init_time)
                frt_coord = iris.coords.AuxCoord(
                    frt_point,
                    units=tcoord.units,
                    standard_name="forecast_reference_time",
                    long_name="forecast_reference_time",
                )
                # Scalar coordinate, so no dimension mapping is given.
                cube.add_aux_coord(frt_coord)
            except KeyError:
                logging.warning(
                    "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
                )

        # Remove time_origin to allow multiple case studies to merge.
        tcoord.attributes.pop("time_origin", None)

        # Construct forecast_period axis (forecast lead time) if it doesn't exist.
        if not cube.coords("forecast_period"):
            try:
                # Create array of forecast lead times.
                init_coord = cube.coord("forecast_reference_time")
                # Re-express the reference time points in the time
                # coordinate's units before differencing, in case the two
                # coordinates use different bases.
                init_time_points_in_tcoord_units = tcoord.units.date2num(
                    init_coord.units.num2date(init_coord.points)
                )
                lead_times = tcoord.points - init_time_points_in_tcoord_units

                # Get unit for lead time from time coordinate's unit.
                # Convert all lead time to hours for consistency between models.
                if "seconds" in str(tcoord.units):
                    lead_times = lead_times / 3600.0
                    units = "hours"
                elif "hours" in str(tcoord.units):
                    units = "hours"
                else:
                    # NOTE: deliberately not caught by the handlers below, so
                    # this propagates to the caller.
                    raise ValueError(f"Unrecognised base time unit: {tcoord.units}")

                # Create lead time coordinate.
                lead_time_coord = iris.coords.AuxCoord(
                    lead_times,
                    standard_name="forecast_period",
                    long_name="forecast_period",
                    units=units,
                )

                # Associate lead time coordinate with time dimension.
                cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
            except iris.exceptions.CoordinateNotFoundError:
                logging.warning(
                    "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
                )
    except iris.exceptions.CoordinateNotFoundError:
        logging.warning("No time coordinate on cube.")
1036def _lfric_forecast_period_callback(cube: iris.cube.Cube):
1037 """Check forecast_period name and units."""
1038 try:
1039 coord = cube.coord("forecast_period")
1040 if coord.units != "hours":
1041 cube.coord("forecast_period").convert_units("hours")
1042 if not coord.standard_name:
1043 coord.standard_name = "forecast_period"
1044 except iris.exceptions.CoordinateNotFoundError:
1045 pass
def _normalise_ML_varname(cube: iris.cube.Cube):
    """Fix plev variable names to standard names."""
    # Only rename diagnostics that sit on a pressure-level coordinate.
    if cube.coords("pressure"):
        # Each branch sets long_name only; standard_name and var_name are
        # left untouched, so cube.name() still resolves via standard_name
        # where one exists.
        if cube.name() == "x_wind":
            cube.long_name = "zonal_wind_at_pressure_levels"
        if cube.name() == "y_wind":
            cube.long_name = "meridional_wind_at_pressure_levels"
        if cube.name() == "air_temperature":
            cube.long_name = "temperature_at_pressure_levels"
        if cube.name() == "specific_humidity":
            cube.long_name = (
                "vapour_specific_humidity_at_pressure_levels_for_climate_averaging"
            )