Coverage for src/CSET/operators/read.py: 91%
407 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 12:45 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 12:45 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for reading various types of files from disk."""
17import ast
18import datetime
19import functools
20import glob
21import itertools
22import logging
23from pathlib import Path
24from typing import Literal
26import dask
27import iris
28import iris.coord_systems
29import iris.coords
30import iris.cube
31import iris.exceptions
32import iris.util
33import numpy as np
34from iris.analysis.cartography import rotate_pole, rotate_winds
36from CSET._common import iter_maybe
37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC
38from CSET.operators._utils import (
39 get_cube_coordindex,
40 get_cube_yxcoordname,
41 is_spatialdim,
42)
43from CSET.operators.regrid import restructure_ugrid
46class NoDataError(FileNotFoundError):
47 """Error that no data has been loaded."""
50def read_cube(
51 file_paths: list[str] | str,
52 constraint: iris.Constraint = None,
53 model_names: list[str] | str | None = None,
54 subarea_type: str = None,
55 subarea_extent: list[float] = None,
56 **kwargs,
57) -> iris.cube.Cube:
58 """Read a single cube from files.
60 Read operator that takes a path string (can include shell-style glob
61 patterns), and loads the cube matching the constraint. If any paths point to
62 directory, all the files contained within are loaded.
64 Ensemble data can also be loaded. If it has a realization coordinate
65 already, it will be directly used. If not, it will have its member number
66 guessed from the filename, based on one of several common patterns. For
67 example the pattern *emXX*, where XX is the realization.
69 Deterministic data will be loaded with a realization of 0, allowing it to be
70 processed in the same way as ensemble data.
72 Arguments
73 ---------
74 file_paths: str | list[str]
75 Path or paths to where .pp/.nc files are located
76 constraint: iris.Constraint | iris.ConstraintCombination, optional
77 Constraints to filter data by. Defaults to unconstrained.
78 model_names: str | list[str], optional
79 Names of the models that correspond to respective paths in file_paths.
80 subarea_type: "gridcells" | "modelrelative" | "realworld", optional
81 Whether to constrain data by model relative coordinates or real world
82 coordinates.
83 subarea_extent: list, optional
84 List of coordinates to constraint data by, in order lower latitude,
85 upper latitude, lower longitude, upper longitude.
87 Returns
88 -------
89 cubes: iris.cube.Cube
90 Cube loaded
92 Raises
93 ------
94 FileNotFoundError
95 If the provided path does not exist
96 ValueError
97 If the constraint doesn't produce a single cube.
98 """
99 cubes = read_cubes(
100 file_paths=file_paths,
101 constraint=constraint,
102 model_names=model_names,
103 subarea_type=subarea_type,
104 subarea_extent=subarea_extent,
105 )
106 # Check filtered cubes is a CubeList containing one cube.
107 if len(cubes) == 1:
108 return cubes[0]
109 else:
110 raise ValueError(
111 f"Constraint doesn't produce single cube: {constraint}\n{cubes}"
112 )
115def read_cubes(
116 file_paths: list[str] | str,
117 constraint: iris.Constraint | None = None,
118 model_names: str | list[str] | None = None,
119 subarea_type: str = None,
120 subarea_extent: list = None,
121 **kwargs,
122) -> iris.cube.CubeList:
123 """Read cubes from files.
125 Read operator that takes a path string (can include shell-style glob
126 patterns), and loads the cubes matching the constraint. If any paths point
127 to directory, all the files contained within are loaded.
129 Ensemble data can also be loaded. If it has a realization coordinate
130 already, it will be directly used. If not, it will have its member number
131 guessed from the filename, based on one of several common patterns. For
132 example the pattern *emXX*, where XX is the realization.
134 Deterministic data will be loaded with a realization of 0, allowing it to be
135 processed in the same way as ensemble data.
137 Data output by XIOS (such as LFRic) has its per-file metadata removed so
138 that the cubes merge across files.
140 Arguments
141 ---------
142 file_paths: str | list[str]
143 Path or paths to where .pp/.nc files are located. Can include globs.
144 constraint: iris.Constraint | iris.ConstraintCombination, optional
145 Constraints to filter data by. Defaults to unconstrained.
146 model_names: str | list[str], optional
147 Names of the models that correspond to respective paths in file_paths.
148 subarea_type: str, optional
149 Whether to constrain data by model relative coordinates or real world
150 coordinates.
151 subarea_extent: list[float], optional
152 List of coordinates to constraint data by, in order lower latitude,
153 upper latitude, lower longitude, upper longitude.
155 Returns
156 -------
157 cubes: iris.cube.CubeList
158 Cubes loaded after being merged and concatenated.
160 Raises
161 ------
162 FileNotFoundError
163 If the provided path does not exist
164 """
165 # Get iterable of paths. Each path corresponds to 1 model.
166 paths = iter_maybe(file_paths)
167 model_names = iter_maybe(model_names)
169 # Check we have appropriate number of model names.
170 if model_names != (None,) and len(model_names) != len(paths):
171 raise ValueError(
172 f"The number of model names ({len(model_names)}) should equal "
173 f"the number of paths given ({len(paths)})."
174 )
176 # Load the data for each model into a CubeList per model.
177 model_cubes = (
178 _load_model(path, name, constraint)
179 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
180 )
182 # Split out first model's cubes and mark it as the base for comparisons.
183 cubes = next(model_cubes)
184 for cube in cubes:
185 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
186 cube.attributes["cset_comparison_base"] = 1
188 # Load the rest of the models.
189 cubes.extend(itertools.chain.from_iterable(model_cubes))
191 # Unify time units so different case studies can merge.
192 iris.util.unify_time_units(cubes)
194 # Select sub region.
195 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)
197 # Merge and concatenate cubes now metadata has been fixed.
198 cubes = _merge_cubes_check_ensemble(cubes)
199 cubes = cubes.concatenate()
201 # Squeeze single valued coordinates into scalar coordinates.
202 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)
204 # Ensure dimension coordinates are bounded.
205 for cube in cubes:
206 for dim_coord in cube.coords(dim_coords=True):
207 # Iris can't guess the bounds of a scalar coordinate.
208 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
209 dim_coord.guess_bounds()
211 logging.info("Loaded cubes: %s", cubes)
212 if len(cubes) == 0:
213 raise NoDataError("No cubes loaded, check your constraints!")
214 return cubes
217def _load_model(
218 paths: str | list[str],
219 model_name: str | None,
220 constraint: iris.Constraint | None,
221) -> iris.cube.CubeList:
222 """Load a single model's data into a CubeList."""
223 input_files = _check_input_files(paths)
224 # If unset, a constraint of None lets everything be loaded.
225 logging.debug("Constraint: %s", constraint)
227 cubes = iris.load(input_files)
229 # If a cube called latitude exists, chances are its unstructured.
230 # Using extract, not extract_cubes in a try as there might be more
231 # than one cube called latitude if we are aggregating.
232 if len(cubes.extract("latitude")) > 0: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true
233 cubes = restructure_ugrid(cubes)
235 for cube in cubes:
236 _loading_callback(cube, None, None)
238 # Extract required cubes based on constraints
239 cubes = cubes.extract(constraint)
241 # cubes = iris.load(input_files, constraint, callback=_loading_callback)
242 # Make the UM's winds consistent with LFRic.
243 _fix_um_winds(cubes)
245 # Add model_name attribute to each cube to make it available at any further
246 # step without needing to pass it as function parameter.
247 if model_name is not None:
248 for cube in cubes:
249 cube.attributes["model_name"] = model_name
250 return cubes
253def _check_input_files(input_paths: str | list[str]) -> list[Path]:
254 """Get an iterable of files to load, and check that they all exist.
256 Arguments
257 ---------
258 input_paths: list[str]
259 List of paths to input files or directories. The path may itself contain
260 glob patterns, but unlike in shells it will match directly first.
262 Returns
263 -------
264 list[Path]
265 A list of files to load.
267 Raises
268 ------
269 FileNotFoundError:
270 If the provided arguments don't resolve to at least one existing file.
271 """
272 files = []
273 for raw_filename in iter_maybe(input_paths):
274 # Match glob-like files first, if they exist.
275 raw_path = Path(raw_filename)
276 if raw_path.is_file():
277 files.append(raw_path)
278 else:
279 for input_path in glob.glob(raw_filename):
280 # Convert string paths into Path objects.
281 input_path = Path(input_path)
282 # Get the list of files in the directory, or use it directly.
283 if input_path.is_dir():
284 logging.debug("Checking directory '%s' for files", input_path)
285 files.extend(p for p in input_path.iterdir() if p.is_file())
286 else:
287 files.append(input_path)
289 files.sort()
290 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files))
291 if len(files) == 0:
292 raise FileNotFoundError(f"No files found for {input_paths}")
293 return files
296def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList):
297 """Merge CubeList, renumbering realizations of 0 if required.
299 An unsuccessful merge indicates common input cube attributes, most
300 commonly from ensemble members missing an explicit realization
301 coordinate. Therefore the members are renumbered before being merged
302 again.
303 """
304 try:
305 cubes = cubes.merge()
306 except iris.exceptions.MergeError:
307 _log_once(
308 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.",
309 level=logging.WARNING,
310 )
311 for ir, cube in enumerate(cubes):
312 if cube.coord("realization").points == 0: 312 ↛ 311line 312 didn't jump to line 311 because the condition on line 312 was always true
313 cube.coord("realization").points = ir + 1
314 cubes = cubes.merge()
315 return cubes
318def _cutout_cubes(
319 cubes: iris.cube.CubeList,
320 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
321 subarea_extent: list[float],
322):
323 """Cut out a subarea from a CubeList."""
324 if subarea_type is None:
325 logging.debug("Subarea selection is disabled.")
326 return cubes
328 # If selected, cutout according to number of grid cells to trim from each edge.
329 cutout_cubes = iris.cube.CubeList()
330 # Find spatial coordinates
331 for cube in cubes:
332 # Find dimension coordinates.
333 lat_name, lon_name = get_cube_yxcoordname(cube)
335 # Compute cutout based on number of cells to trim from edges.
336 if subarea_type == "gridcells":
337 logging.debug(
338 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
339 subarea_extent[0],
340 subarea_extent[1],
341 subarea_extent[2],
342 subarea_extent[3],
343 )
344 lat_points = np.sort(cube.coord(lat_name).points)
345 lon_points = np.sort(cube.coord(lon_name).points)
346 # Define cutout region using user provided cell points.
347 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
348 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]
350 # Compute cutout based on specified coordinate values.
351 elif subarea_type == "realworld" or subarea_type == "modelrelative":
352 # If not gridcells, cutout by requested geographic area,
353 logging.debug(
354 "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
355 subarea_extent[0],
356 subarea_extent[1],
357 subarea_extent[2],
358 subarea_extent[3],
359 )
360 # Define cutout region using user provided coordinates.
361 lats = np.array(subarea_extent[0:2])
362 lons = np.array(subarea_extent[2:4])
363 # Ensure cutout longitudes are within +/- 180.0 bounds.
364 while lons[0] < -180.0:
365 lons += 360.0
366 while lons[1] > 180.0:
367 lons -= 360.0
368 # If the coordinate system is rotated we convert coordinates into
369 # model-relative coordinates to extract the appropriate cutout.
370 coord_system = cube.coord(lat_name).coord_system
371 if subarea_type == "realworld" and isinstance(
372 coord_system, iris.coord_systems.RotatedGeogCS
373 ):
374 lons, lats = rotate_pole(
375 lons,
376 lats,
377 pole_lon=coord_system.grid_north_pole_longitude,
378 pole_lat=coord_system.grid_north_pole_latitude,
379 )
380 else:
381 raise ValueError("Unknown subarea_type:", subarea_type)
383 # Do cutout and add to cutout_cubes.
384 intersection_args = {lat_name: lats, lon_name: lons}
385 logging.debug("Cutting out coords: %s", intersection_args)
386 try:
387 cutout_cubes.append(cube.intersection(**intersection_args))
388 except IndexError as err:
389 raise ValueError(
390 "Region cutout error. Check and update SUBAREA_EXTENT."
391 "Cutout region requested should be contained within data area. "
392 "Also check if cutout region requested is smaller than input grid spacing."
393 ) from err
395 return cutout_cubes
398def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
399 """Compose together the needed callbacks into a single function."""
400 # Most callbacks operate in-place, but save the cube when returned!
401 _realization_callback(cube)
402 _um_normalise_callback(cube)
403 _lfric_normalise_callback(cube)
404 cube = _lfric_time_coord_fix_callback(cube)
405 _normalise_var0_varname(cube)
406 cube = _fix_no_spatial_coords_callback(cube)
407 _fix_spatial_coords_callback(cube)
408 _fix_pressure_coord_callback(cube)
409 _fix_um_radtime(cube)
410 _fix_cell_methods(cube)
411 cube = _convert_cube_units_callback(cube)
412 cube = _grid_longitude_fix_callback(cube)
413 _fix_lfric_cloud_base_altitude(cube)
414 _proleptic_gregorian_fix(cube)
415 _lfric_time_callback(cube)
416 _lfric_forecast_period_callback(cube)
417 cube = _fix_no_time_coords_callback(cube)
418 _normalise_ML_varname(cube)
419 return cube
422def _realization_callback(cube):
423 """Add a realization coordinate initialised to 0 if missing.
425 This means deterministic and ensemble cubes can assume realization coordinate through the rest
426 of the code.
427 """
428 # Only add if realization coordinate does not exist.
429 if not cube.coords("realization"):
430 cube.add_aux_coord(
431 iris.coords.DimCoord(0, standard_name="realization", units="1")
432 )
435@functools.lru_cache(None)
436def _log_once(msg, level=logging.WARNING):
437 """Print a warning message, skipping recent duplicates."""
438 logging.log(level, msg)
441def _um_normalise_callback(cube: iris.cube.Cube):
442 """Normalise UM STASH variable long names to LFRic variable names.
444 Note standard names will remain associated with cubes where different.
445 Long name will be used consistently in output filename and titles.
446 """
447 # Convert STASH to LFRic variable name
448 if "STASH" in cube.attributes:
449 stash = cube.attributes["STASH"]
450 try:
451 (name, grid) = STASH_TO_LFRIC[str(stash)]
452 cube.long_name = name
453 except KeyError:
454 # Don't change cubes with unknown stash codes.
455 _log_once(
456 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.",
457 level=logging.WARNING,
458 )
461def _lfric_normalise_callback(cube: iris.cube.Cube):
462 """Normalise attributes that prevents LFRic cube from merging.
464 The uuid and timeStamp relate to the output file, as saved by XIOS, and has
465 no relation to the data contained. These attributes are removed.
467 The um_stash_source is a list of STASH codes for when an LFRic field maps to
468 multiple UM fields, however it can be encoded in any order. This attribute
469 is sorted to prevent this. This attribute is only present in LFRic data that
470 has been converted to look like UM data.
471 """
472 # Remove unwanted attributes.
473 cube.attributes.pop("timeStamp", None)
474 cube.attributes.pop("uuid", None)
475 cube.attributes.pop("name", None)
476 cube.attributes.pop("source", None)
477 cube.attributes.pop("analysis_source", None)
478 cube.attributes.pop("history", None)
480 # Sort STASH code list.
481 stash_list = cube.attributes.get("um_stash_source")
482 if stash_list:
483 # Parse the string as a list, sort, then re-encode as a string.
484 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list)))
487def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
488 """Ensure the time coordinate is a DimCoord rather than an AuxCoord.
490 The coordinate is converted and replaced if not. SLAMed LFRic data has this
491 issue, though the coordinate satisfies all the properties for a DimCoord.
492 Scalar time values are left as AuxCoords.
493 """
494 # This issue seems to come from iris's handling of NetCDF files where time
495 # always ends up as an AuxCoord.
496 if cube.coords("time"):
497 time_coord = cube.coord("time")
498 if (
499 not isinstance(time_coord, iris.coords.DimCoord)
500 and len(cube.coord_dims(time_coord)) == 1
501 ):
502 # Fudge the bounds to foil checking for strict monotonicity.
503 if time_coord.has_bounds(): 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true
504 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
505 time_coord.bounds = [
506 [
507 time_coord.bounds[i][0] + 1.0e-8 * float(i),
508 time_coord.bounds[i][1],
509 ]
510 for i in range(len(time_coord.bounds))
511 ]
512 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
513 return cube
516def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
517 """Check grid_longitude coordinates are in the range -180 deg to 180 deg.
519 This is necessary if comparing two models with different conventions --
520 for example, models where the prime meridian is defined as 0 deg or
521 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
522 so that it falls in this range. Checks are for near-180 bounds given
523 model data bounds may not extend exactly to 0. or 360.
524 Input cubes on non-rotated grid coordinates are not impacted.
525 """
526 try:
527 y, x = get_cube_yxcoordname(cube)
528 except ValueError:
529 # Don't modify non-spatial cubes.
530 return cube
532 long_coord = cube.coord(x)
533 # Wrap longitudes if rotated pole coordinates
534 coord_system = long_coord.coord_system
535 if x == "grid_longitude" and isinstance(
536 coord_system, iris.coord_systems.RotatedGeogCS
537 ):
538 long_points = long_coord.points.copy()
539 long_centre = np.median(long_points)
540 while long_centre < -175.0:
541 long_centre += 360.0
542 long_points += 360.0
543 while long_centre >= 175.0:
544 long_centre -= 360.0
545 long_points -= 360.0
546 long_coord.points = long_points
548 # Update coord bounds to be consistent with wrapping.
549 if long_coord.has_bounds():
550 long_coord.bounds = None
551 long_coord.guess_bounds()
553 return cube
556def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):
557 import CSET.operators._utils as utils
559 # Don't modify spatial cubes that already have spatial dimensions
560 if utils.is_spatialdim(cube):
561 return cube
563 else:
564 # attempt to get lat/long from cube attributes
565 try:
566 lat_min = cube.attributes.get("geospatial_lat_min")
567 lat_max = cube.attributes.get("geospatial_lat_max")
568 lon_min = cube.attributes.get("geospatial_lon_min")
569 lon_max = cube.attributes.get("geospatial_lon_max")
571 lon_val = (lon_min + lon_max) / 2.0
572 lat_val = (lat_min + lat_max) / 2.0
574 lat_coord = iris.coords.DimCoord(
575 lat_val,
576 standard_name="latitude",
577 units="degrees_north",
578 var_name="latitude",
579 coord_system=iris.coord_systems.GeogCS(6371229.0),
580 circular=True,
581 )
583 lon_coord = iris.coords.DimCoord(
584 lon_val,
585 standard_name="longitude",
586 units="degrees_east",
587 var_name="longitude",
588 coord_system=iris.coord_systems.GeogCS(6371229.0),
589 circular=True,
590 )
592 cube.add_aux_coord(lat_coord)
593 cube.add_aux_coord(lon_coord)
594 return cube
596 # if lat/long are not in attributes, then return cube unchanged:
597 except TypeError:
598 return cube
601def _fix_spatial_coords_callback(cube: iris.cube.Cube):
602 """Check latitude and longitude coordinates name.
604 This is necessary as some models define their grid as on rotated
605 'grid_latitude' and 'grid_longitude' coordinates while others define
606 the grid on non-rotated 'latitude' and 'longitude'.
607 Cube dimensions need to be made consistent to avoid recipe failures,
608 particularly where comparing multiple input models with differing spatial
609 coordinates.
610 """
611 # Check if cube is spatial.
612 if not is_spatialdim(cube):
613 # Don't modify non-spatial cubes.
614 return
616 # Get spatial coords and dimension index.
617 y_name, x_name = get_cube_yxcoordname(cube)
618 ny = get_cube_coordindex(cube, y_name)
619 nx = get_cube_coordindex(cube, x_name)
621 # Remove spatial coords bounds if erroneous values detected.
622 # Aims to catch some errors in input coord bounds by setting
623 # invalid threshold of 10000.0
624 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
625 bx_max = np.max(np.abs(cube.coord(x_name).bounds))
626 by_max = np.max(np.abs(cube.coord(y_name).bounds))
627 if bx_max > 10000.0 or by_max > 10000.0:
628 cube.coord(x_name).bounds = None
629 cube.coord(y_name).bounds = None
631 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
632 # [latitude, longitude] for instances where rotated_pole=90.0
633 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
634 coord_system = cube.coord("grid_latitude").coord_system
635 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
636 if pole_lat == 90.0: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true
637 lats = cube.coord("grid_latitude").points
638 lons = cube.coord("grid_longitude").points
640 cube.remove_coord("grid_latitude")
641 cube.add_dim_coord(
642 iris.coords.DimCoord(
643 lats,
644 standard_name="latitude",
645 var_name="latitude",
646 units="degrees",
647 coord_system=iris.coord_systems.GeogCS(6371229.0),
648 circular=True,
649 ),
650 ny,
651 )
652 y_name = "latitude"
653 cube.remove_coord("grid_longitude")
654 cube.add_dim_coord(
655 iris.coords.DimCoord(
656 lons,
657 standard_name="longitude",
658 var_name="longitude",
659 units="degrees",
660 coord_system=iris.coord_systems.GeogCS(6371229.0),
661 circular=True,
662 ),
663 nx,
664 )
665 x_name = "longitude"
667 # Create additional AuxCoord [grid_latitude, grid_longitude] with
668 # rotated pole attributes for cases with [lat, lon] inputs
669 if y_name in ["latitude"] and cube.coord(y_name).units in [
670 "degrees",
671 "degrees_north",
672 "degrees_south",
673 ]:
674 # Add grid_latitude AuxCoord
675 if "grid_latitude" not in [
676 coord.name() for coord in cube.coords(dim_coords=False)
677 ]:
678 cube.add_aux_coord(
679 iris.coords.AuxCoord(
680 cube.coord(y_name).points,
681 var_name="grid_latitude",
682 units="degrees",
683 ),
684 ny,
685 )
686 # Ensure input latitude DimCoord has CoordSystem
687 # This attribute is sometimes lost on iris.save
688 if not cube.coord(y_name).coord_system:
689 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
691 if x_name in ["longitude"] and cube.coord(x_name).units in [
692 "degrees",
693 "degrees_west",
694 "degrees_east",
695 ]:
696 # Add grid_longitude AuxCoord
697 if "grid_longitude" not in [
698 coord.name() for coord in cube.coords(dim_coords=False)
699 ]:
700 cube.add_aux_coord(
701 iris.coords.AuxCoord(
702 cube.coord(x_name).points,
703 var_name="grid_longitude",
704 units="degrees",
705 ),
706 nx,
707 )
709 # Ensure input longitude DimCoord has CoordSystem
710 # This attribute is sometimes lost on iris.save
711 if not cube.coord(x_name).coord_system:
712 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
715def _fix_pressure_coord_callback(cube: iris.cube.Cube):
716 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units.
718 This problem was raised because the AIFS model data from ECMWF
719 defines the pressure coordinate with the name "pressure_level" rather
720 than compliant CF coordinate names.
722 Additionally, set the units of pressure to be hPa to be consistent with the UM,
723 and approach the coordinates in a unified way.
724 """
725 for coord in cube.dim_coords:
726 if coord.name() in ["pressure_level", "pressure_levels"]:
727 coord.rename("pressure")
729 if coord.name() == "pressure":
730 if str(cube.coord("pressure").units) != "hPa":
731 cube.coord("pressure").convert_units("hPa")
734def _fix_um_radtime(cube: iris.cube.Cube):
735 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.
737 This callback does not have any effect for output diagnostics with
738 timestamps exactly 00 or 30 minutes past the hour. Only radiation
739 diagnostics are checked.
740 Note this callback does not interpolate the data in time, only adjust
741 timestamps to sit on the hour to enable time-to-time difference plotting
742 with models which may output radiation data on the hour.
743 """
744 try:
745 if cube.attributes["STASH"] in [
746 "m01s01i207",
747 "m01s01i208",
748 "m01s02i205",
749 "m01s02i201",
750 "m01s01i207",
751 "m01s02i207",
752 "m01s01i235",
753 ]:
754 time_coord = cube.coord("time")
756 # Convert time points to datetime objects
757 time_unit = time_coord.units
758 time_points = time_unit.num2date(time_coord.points)
759 # Skip if times don't need fixing.
760 if time_points[0].minute == 0 and time_points[0].second == 0:
761 return
762 if time_points[0].minute == 30 and time_points[0].second == 0: 762 ↛ 763line 762 didn't jump to line 763 because the condition on line 762 was never true
763 return
765 # Subtract time difference from the hour from each time point
766 n_minute = time_points[0].minute
767 n_second = time_points[0].second
768 # If times closer to next hour, compute difference to add on to following hour
769 if n_minute > 30:
770 n_minute = n_minute - 60
771 # Compute new diagnostic time stamp
772 new_time_points = (
773 time_points
774 - datetime.timedelta(minutes=n_minute)
775 - datetime.timedelta(seconds=n_second)
776 )
778 # Convert back to numeric values using the original time unit.
779 new_time_values = time_unit.date2num(new_time_points)
781 # Replace the time coordinate with updated values.
782 time_coord.points = new_time_values
784 # Recompute forecast_period with corrected values.
785 if cube.coord("forecast_period"): 785 ↛ exitline 785 didn't return from function '_fix_um_radtime' because the condition on line 785 was always true
786 fcst_prd_points = cube.coord("forecast_period").points
787 new_fcst_points = (
788 time_unit.num2date(fcst_prd_points)
789 - datetime.timedelta(minutes=n_minute)
790 - datetime.timedelta(seconds=n_second)
791 )
792 cube.coord("forecast_period").points = time_unit.date2num(
793 new_fcst_points
794 )
795 except KeyError:
796 pass
799def _fix_cell_methods(cube: iris.cube.Cube):
800 """To fix the assumed cell_methods in accumulation STASH from UM.
802 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
803 (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
804 over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
805 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
806 "sum". This callback changes "mean" cube attribute cell_method to "sum",
807 enabling the cell_method constraint on reading to select correct input.
808 """
809 # Shift "mean" cell_method to "sum" for selected UM inputs.
810 if cube.attributes.get("STASH") in [
811 "m01s21i104",
812 "m01s04i201",
813 "m01s04i202",
814 "m01s05i201",
815 "m01s05i202",
816 ]:
817 # Check if input cell_method contains "mean" time-processing.
818 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 818 ↛ exitline 818 didn't return from function '_fix_cell_methods' because the condition on line 818 was always true
819 # Retrieve interval and any comment information.
820 for cell_method in cube.cell_methods:
821 interval_str = cell_method.intervals
822 comment_str = cell_method.comments
824 # Remove input aggregation method.
825 cube.cell_methods = ()
827 # Replace "mean" with "sum" cell_method to indicate aggregation.
828 cube.add_cell_method(
829 iris.coords.CellMethod(
830 method="sum",
831 coords="time",
832 intervals=interval_str,
833 comments=comment_str,
834 )
835 )
838def _convert_cube_units_callback(cube: iris.cube.Cube):
839 """Adjust diagnostic units for specific variables.
841 Some precipitation diagnostics are output with unit kg m-2 s-1 and are
842 converted here to mm hr-1.
844 Visibility diagnostics are converted here from m to km to improve output
845 formatting.
846 """
847 # Convert precipitation diagnostic units if required.
848 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
849 if any("surface_microphysical" in name for name in varnames):
850 if cube.units == "kg m-2 s-1":
851 _log_once(
852 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1",
853 level=logging.DEBUG,
854 )
855 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water.
856 # This is a 1:1 conversion, so we just change the units.
857 cube.units = "mm s-1"
858 # Convert the units to per hour.
859 cube.convert_units("mm hr-1")
860 elif cube.units == "kg m-2": 860 ↛ 870line 860 didn't jump to line 870 because the condition on line 860 was always true
861 _log_once(
862 "Converting precipitation amount units from kg m-2 to mm",
863 level=logging.DEBUG,
864 )
865 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water.
866 # This is a 1:1 conversion, so we just change the units.
867 cube.units = "mm"
869 # Convert visibility diagnostic units if required.
870 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
871 if any("visibility" in name for name in varnames):
872 if cube.units == "m": 872 ↛ 877line 872 didn't jump to line 877 because the condition on line 872 was always true
873 _log_once("Converting visibility units m to km.", level=logging.DEBUG)
874 # Convert the units to km.
875 cube.convert_units("km")
877 return cube
880def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
881 """Mask cloud_base_altitude diagnostic in regions with no cloud."""
882 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
883 if any("cloud_base_altitude" in name for name in varnames):
884 # Mask cube where set > 144kft to catch default 144.35695538058164
885 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0)
888def _fix_um_winds(cubes: iris.cube.CubeList):
889 """To make winds from the UM consistent with those from LFRic.
891 Diagnostics of wind are not always consistent between the UM
892 and LFric. Here, winds from the UM are adjusted to make them i
893 consistent with LFRic.
894 """
895 # Check whether we have components of the wind identified by STASH,
896 # (so this will apply only to cubes from the UM), but not the
897 # wind speed and calculate it if it is missing. Note that
898 # this will be biased low in general because the components will mostly
899 # be time averages. For simplicity, we do this only if there is just one
900 # cube of a component. A more complicated approach would be to consider
901 # the cell methods, but it may not be warranted.
902 u_constr = iris.AttributeConstraint(STASH="m01s03i225")
903 v_constr = iris.AttributeConstraint(STASH="m01s03i226")
904 speed_constr = iris.AttributeConstraint(STASH="m01s03i227")
905 try:
906 if cubes.extract(u_constr) and cubes.extract(v_constr): 906 ↛ 907line 906 didn't jump to line 907 because the condition on line 906 was never true
907 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr):
908 _add_wind_speed_um(cubes)
909 # Convert winds in the UM to be relative to true east and true north.
910 _convert_wind_true_dirn_um(cubes)
911 except (KeyError, AttributeError):
912 pass
915def _add_wind_speed_um(cubes: iris.cube.CubeList):
916 """Add windspeeds to cubes from the UM."""
917 wspd10 = (
918 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2
919 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2
920 ) ** 0.5
921 wspd10.attributes["STASH"] = "m01s03i227"
922 wspd10.standard_name = "wind_speed"
923 wspd10.long_name = "wind_speed_at_10m"
924 cubes.append(wspd10)
927def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
928 """To convert winds to true directions.
930 Convert from the components relative to the grid to true directions.
931 This functionality only handles the simplest case.
932 """
933 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
934 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
935 for u, v in zip(u_grids, v_grids, strict=True):
936 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0))
937 u.data = true_u.core_data()
938 v.data = true_v.core_data()
941def _normalise_var0_varname(cube: iris.cube.Cube):
942 """Fix varnames for consistency to allow merging.
944 Some model data netCDF sometimes have a coordinate name end in
945 "_0" etc, where duplicate coordinates of same name are defined but
946 with different attributes. This can be inconsistently managed in
947 different model inputs and can cause cubes to fail to merge.
948 """
949 for coord in cube.coords():
950 if coord.var_name and coord.var_name.endswith("_0"):
951 coord.var_name = coord.var_name.removesuffix("_0")
952 if coord.var_name and coord.var_name.endswith("_1"):
953 coord.var_name = coord.var_name.removesuffix("_1")
954 if coord.var_name and coord.var_name.endswith("_2"): 954 ↛ 955line 954 didn't jump to line 955 because the condition on line 954 was never true
955 coord.var_name = coord.var_name.removesuffix("_2")
956 if coord.var_name and coord.var_name.endswith("_3"): 956 ↛ 957line 956 didn't jump to line 957 because the condition on line 956 was never true
957 coord.var_name = coord.var_name.removesuffix("_3")
959 if cube.var_name and cube.var_name.endswith("_0"):
960 cube.var_name = cube.var_name.removesuffix("_0")
963def _proleptic_gregorian_fix(cube: iris.cube.Cube):
964 """Convert the calendars of time units to use a standard calendar."""
965 try:
966 time_coord = cube.coord("time")
967 if time_coord.units.calendar == "proleptic_gregorian":
968 logging.debug(
969 "Changing proleptic Gregorian calendar to standard calendar for %s",
970 repr(time_coord.units),
971 )
972 time_coord.units = time_coord.units.change_calendar("standard")
973 except iris.exceptions.CoordinateNotFoundError:
974 pass
977def _lfric_time_callback(cube: iris.cube.Cube):
978 """Fix time coordinate metadata if missing dimensions.
980 Some model data does not contain forecast_reference_time or forecast_period as
981 expected coordinates, and so we cannot aggregate over case studies without this
982 metadata. This callback fixes these issues.
984 This callback also ensures all time coordinates are referenced as hours since
985 1970-01-01 00:00:00 for consistency across different model inputs.
987 Notes
988 -----
989 Some parts of the code have been adapted from Paul Earnshaw's scripts.
990 """
991 # Construct forecast_reference time if it doesn't exist.
992 try:
993 tcoord = cube.coord("time")
994 # Set time coordinate to common basis "hours since 1970"
995 try:
996 tcoord.convert_units("hours since 1970-01-01 00:00:00")
997 except ValueError:
998 logging.warning("Unrecognised base time unit: %s", tcoord.units)
1000 if not cube.coords("forecast_reference_time"):
1001 try:
1002 init_time = datetime.datetime.fromisoformat(
1003 tcoord.attributes["time_origin"]
1004 )
1005 frt_point = tcoord.units.date2num(init_time)
1006 frt_coord = iris.coords.AuxCoord(
1007 frt_point,
1008 units=tcoord.units,
1009 standard_name="forecast_reference_time",
1010 long_name="forecast_reference_time",
1011 )
1012 cube.add_aux_coord(frt_coord)
1013 except KeyError:
1014 logging.warning(
1015 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
1016 )
1018 # Remove time_origin to allow multiple case studies to merge.
1019 tcoord.attributes.pop("time_origin", None)
1021 # Construct forecast_period axis (forecast lead time) if it doesn't exist.
1022 if not cube.coords("forecast_period"):
1023 try:
1024 # Create array of forecast lead times.
1025 init_coord = cube.coord("forecast_reference_time")
1026 init_time_points_in_tcoord_units = tcoord.units.date2num(
1027 init_coord.units.num2date(init_coord.points)
1028 )
1029 lead_times = tcoord.points - init_time_points_in_tcoord_units
1031 # Get unit for lead time from time coordinate's unit.
1032 # Convert all lead time to hours for consistency between models.
1033 if "seconds" in str(tcoord.units): 1033 ↛ 1034line 1033 didn't jump to line 1034 because the condition on line 1033 was never true
1034 lead_times = lead_times / 3600.0
1035 units = "hours"
1036 elif "hours" in str(tcoord.units): 1036 ↛ 1039line 1036 didn't jump to line 1039 because the condition on line 1036 was always true
1037 units = "hours"
1038 else:
1039 raise ValueError(f"Unrecognised base time unit: {tcoord.units}")
1041 # Create lead time coordinate.
1042 lead_time_coord = iris.coords.AuxCoord(
1043 lead_times,
1044 standard_name="forecast_period",
1045 long_name="forecast_period",
1046 units=units,
1047 )
1049 # Associate lead time coordinate with time dimension.
1050 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
1051 except iris.exceptions.CoordinateNotFoundError:
1052 logging.warning(
1053 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
1054 )
1055 except iris.exceptions.CoordinateNotFoundError:
1056 logging.warning("No time coordinate on cube.")
1059def _lfric_forecast_period_callback(cube: iris.cube.Cube):
1060 """Check forecast_period name and units."""
1061 try:
1062 coord = cube.coord("forecast_period")
1063 if coord.units != "hours":
1064 cube.coord("forecast_period").convert_units("hours")
1065 if not coord.standard_name:
1066 coord.standard_name = "forecast_period"
1067 except iris.exceptions.CoordinateNotFoundError:
1068 pass
1071def _fix_no_time_coords_callback(cube: iris.cube.Cube):
1072 """Add dummy time coord to process cubes that don't have sequence coord."""
1073 # Only add if time coordinate does not exist.
1074 if not cube.coords("time"):
1075 cube.add_aux_coord(
1076 iris.coords.DimCoord(
1077 0, standard_name="time", units="hours since 0001-01-01 00:00:00"
1078 )
1079 )
1081 return cube
1084def _normalise_ML_varname(cube: iris.cube.Cube):
1085 """Fix plev variable names to standard names."""
1086 if cube.coords("pressure"):
1087 if cube.name() == "x_wind":
1088 cube.long_name = "zonal_wind_at_pressure_levels"
1089 if cube.name() == "y_wind":
1090 cube.long_name = "meridional_wind_at_pressure_levels"
1091 if cube.name() == "air_temperature":
1092 cube.long_name = "temperature_at_pressure_levels"
1093 if cube.name() == "specific_humidity": 1093 ↛ 1094line 1093 didn't jump to line 1094 because the condition on line 1093 was never true
1094 cube.long_name = (
1095 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging"
1096 )