Coverage for src / CSET / operators / read.py: 90%
401 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 15:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 15:22 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for reading various types of files from disk."""
17import ast
18import datetime
19import functools
20import glob
21import itertools
22import logging
23from pathlib import Path
24from typing import Literal
26import dask
27import iris
28import iris.coord_systems
29import iris.coords
30import iris.cube
31import iris.exceptions
32import iris.util
33import numpy as np
34from iris.analysis.cartography import rotate_pole, rotate_winds
36from CSET._common import iter_maybe
37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC
38from CSET.operators._utils import (
39 get_cube_coordindex,
40 get_cube_yxcoordname,
41 is_spatialdim,
42)
45class NoDataError(FileNotFoundError):
46 """Error that no data has been loaded."""
49def read_cube(
50 file_paths: list[str] | str,
51 constraint: iris.Constraint = None,
52 model_names: list[str] | str | None = None,
53 subarea_type: str = None,
54 subarea_extent: list[float] = None,
55 **kwargs,
56) -> iris.cube.Cube:
57 """Read a single cube from files.
59 Read operator that takes a path string (can include shell-style glob
60 patterns), and loads the cube matching the constraint. If any paths point to
61 directory, all the files contained within are loaded.
63 Ensemble data can also be loaded. If it has a realization coordinate
64 already, it will be directly used. If not, it will have its member number
65 guessed from the filename, based on one of several common patterns. For
66 example the pattern *emXX*, where XX is the realization.
68 Deterministic data will be loaded with a realization of 0, allowing it to be
69 processed in the same way as ensemble data.
71 Arguments
72 ---------
73 file_paths: str | list[str]
74 Path or paths to where .pp/.nc files are located
75 constraint: iris.Constraint | iris.ConstraintCombination, optional
76 Constraints to filter data by. Defaults to unconstrained.
77 model_names: str | list[str], optional
78 Names of the models that correspond to respective paths in file_paths.
79 subarea_type: "gridcells" | "modelrelative" | "realworld", optional
80 Whether to constrain data by model relative coordinates or real world
81 coordinates.
82 subarea_extent: list, optional
83 List of coordinates to constraint data by, in order lower latitude,
84 upper latitude, lower longitude, upper longitude.
86 Returns
87 -------
88 cubes: iris.cube.Cube
89 Cube loaded
91 Raises
92 ------
93 FileNotFoundError
94 If the provided path does not exist
95 ValueError
96 If the constraint doesn't produce a single cube.
97 """
98 cubes = read_cubes(
99 file_paths=file_paths,
100 constraint=constraint,
101 model_names=model_names,
102 subarea_type=subarea_type,
103 subarea_extent=subarea_extent,
104 )
105 # Check filtered cubes is a CubeList containing one cube.
106 if len(cubes) == 1:
107 return cubes[0]
108 else:
109 raise ValueError(
110 f"Constraint doesn't produce single cube: {constraint}\n{cubes}"
111 )
114def read_cubes(
115 file_paths: list[str] | str,
116 constraint: iris.Constraint | None = None,
117 model_names: str | list[str] | None = None,
118 subarea_type: str = None,
119 subarea_extent: list = None,
120 **kwargs,
121) -> iris.cube.CubeList:
122 """Read cubes from files.
124 Read operator that takes a path string (can include shell-style glob
125 patterns), and loads the cubes matching the constraint. If any paths point
126 to directory, all the files contained within are loaded.
128 Ensemble data can also be loaded. If it has a realization coordinate
129 already, it will be directly used. If not, it will have its member number
130 guessed from the filename, based on one of several common patterns. For
131 example the pattern *emXX*, where XX is the realization.
133 Deterministic data will be loaded with a realization of 0, allowing it to be
134 processed in the same way as ensemble data.
136 Data output by XIOS (such as LFRic) has its per-file metadata removed so
137 that the cubes merge across files.
139 Arguments
140 ---------
141 file_paths: str | list[str]
142 Path or paths to where .pp/.nc files are located. Can include globs.
143 constraint: iris.Constraint | iris.ConstraintCombination, optional
144 Constraints to filter data by. Defaults to unconstrained.
145 model_names: str | list[str], optional
146 Names of the models that correspond to respective paths in file_paths.
147 subarea_type: str, optional
148 Whether to constrain data by model relative coordinates or real world
149 coordinates.
150 subarea_extent: list[float], optional
151 List of coordinates to constraint data by, in order lower latitude,
152 upper latitude, lower longitude, upper longitude.
154 Returns
155 -------
156 cubes: iris.cube.CubeList
157 Cubes loaded after being merged and concatenated.
159 Raises
160 ------
161 FileNotFoundError
162 If the provided path does not exist
163 """
164 # Get iterable of paths. Each path corresponds to 1 model.
165 paths = iter_maybe(file_paths)
166 model_names = iter_maybe(model_names)
168 # Check we have appropriate number of model names.
169 if model_names != (None,) and len(model_names) != len(paths):
170 raise ValueError(
171 f"The number of model names ({len(model_names)}) should equal "
172 f"the number of paths given ({len(paths)})."
173 )
175 # Load the data for each model into a CubeList per model.
176 model_cubes = (
177 _load_model(path, name, constraint)
178 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
179 )
181 # Split out first model's cubes and mark it as the base for comparisons.
182 cubes = next(model_cubes)
183 for cube in cubes:
184 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
185 cube.attributes["cset_comparison_base"] = 1
187 # Load the rest of the models.
188 cubes.extend(itertools.chain.from_iterable(model_cubes))
190 # Unify time units so different case studies can merge.
191 iris.util.unify_time_units(cubes)
193 # Select sub region.
194 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)
196 # Merge and concatenate cubes now metadata has been fixed.
197 cubes = _merge_cubes_check_ensemble(cubes)
198 cubes = cubes.concatenate()
200 # Squeeze single valued coordinates into scalar coordinates.
201 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)
203 # Ensure dimension coordinates are bounded.
204 for cube in cubes:
205 for dim_coord in cube.coords(dim_coords=True):
206 # Iris can't guess the bounds of a scalar coordinate.
207 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
208 dim_coord.guess_bounds()
210 logging.info("Loaded cubes: %s", cubes)
211 if len(cubes) == 0:
212 raise NoDataError("No cubes loaded, check your constraints!")
213 return cubes
216def _load_model(
217 paths: str | list[str],
218 model_name: str | None,
219 constraint: iris.Constraint | None,
220) -> iris.cube.CubeList:
221 """Load a single model's data into a CubeList."""
222 input_files = _check_input_files(paths)
223 # If unset, a constraint of None lets everything be loaded.
224 logging.debug("Constraint: %s", constraint)
225 cubes = iris.load(input_files, constraint, callback=_loading_callback)
226 # Make the UM's winds consistent with LFRic.
227 _fix_um_winds(cubes)
229 # Add model_name attribute to each cube to make it available at any further
230 # step without needing to pass it as function parameter.
231 if model_name is not None:
232 for cube in cubes:
233 cube.attributes["model_name"] = model_name
234 return cubes
237def _check_input_files(input_paths: str | list[str]) -> list[Path]:
238 """Get an iterable of files to load, and check that they all exist.
240 Arguments
241 ---------
242 input_paths: list[str]
243 List of paths to input files or directories. The path may itself contain
244 glob patterns, but unlike in shells it will match directly first.
246 Returns
247 -------
248 list[Path]
249 A list of files to load.
251 Raises
252 ------
253 FileNotFoundError:
254 If the provided arguments don't resolve to at least one existing file.
255 """
256 files = []
257 for raw_filename in iter_maybe(input_paths):
258 # Match glob-like files first, if they exist.
259 raw_path = Path(raw_filename)
260 if raw_path.is_file():
261 files.append(raw_path)
262 else:
263 for input_path in glob.glob(raw_filename):
264 # Convert string paths into Path objects.
265 input_path = Path(input_path)
266 # Get the list of files in the directory, or use it directly.
267 if input_path.is_dir():
268 logging.debug("Checking directory '%s' for files", input_path)
269 files.extend(p for p in input_path.iterdir() if p.is_file())
270 else:
271 files.append(input_path)
273 files.sort()
274 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files))
275 if len(files) == 0:
276 raise FileNotFoundError(f"No files found for {input_paths}")
277 return files
280def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList):
281 """Merge CubeList, renumbering realizations of 0 if required.
283 An unsuccessful merge indicates common input cube attributes, most
284 commonly from ensemble members missing an explicit realization
285 coordinate. Therefore the members are renumbered before being merged
286 again.
287 """
288 try:
289 cubes = cubes.merge()
290 except iris.exceptions.MergeError:
291 _log_once(
292 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.",
293 level=logging.WARNING,
294 )
295 for ir, cube in enumerate(cubes):
296 if cube.coord("realization").points == 0: 296 ↛ 295line 296 didn't jump to line 295 because the condition on line 296 was always true
297 cube.coord("realization").points = ir + 1
298 cubes = cubes.merge()
299 return cubes
302def _cutout_cubes(
303 cubes: iris.cube.CubeList,
304 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
305 subarea_extent: list[float],
306):
307 """Cut out a subarea from a CubeList."""
308 if subarea_type is None:
309 logging.debug("Subarea selection is disabled.")
310 return cubes
312 # If selected, cutout according to number of grid cells to trim from each edge.
313 cutout_cubes = iris.cube.CubeList()
314 # Find spatial coordinates
315 for cube in cubes:
316 # Find dimension coordinates.
317 lat_name, lon_name = get_cube_yxcoordname(cube)
319 # Compute cutout based on number of cells to trim from edges.
320 if subarea_type == "gridcells":
321 logging.debug(
322 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
323 subarea_extent[0],
324 subarea_extent[1],
325 subarea_extent[2],
326 subarea_extent[3],
327 )
328 lat_points = np.sort(cube.coord(lat_name).points)
329 lon_points = np.sort(cube.coord(lon_name).points)
330 # Define cutout region using user provided cell points.
331 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
332 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]
334 # Compute cutout based on specified coordinate values.
335 elif subarea_type == "realworld" or subarea_type == "modelrelative":
336 # If not gridcells, cutout by requested geographic area,
337 logging.debug(
338 "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
339 subarea_extent[0],
340 subarea_extent[1],
341 subarea_extent[2],
342 subarea_extent[3],
343 )
344 # Define cutout region using user provided coordinates.
345 lats = np.array(subarea_extent[0:2])
346 lons = np.array(subarea_extent[2:4])
347 # Ensure cutout longitudes are within +/- 180.0 bounds.
348 while lons[0] < -180.0:
349 lons += 360.0
350 while lons[1] > 180.0:
351 lons -= 360.0
352 # If the coordinate system is rotated we convert coordinates into
353 # model-relative coordinates to extract the appropriate cutout.
354 coord_system = cube.coord(lat_name).coord_system
355 if subarea_type == "realworld" and isinstance(
356 coord_system, iris.coord_systems.RotatedGeogCS
357 ):
358 lons, lats = rotate_pole(
359 lons,
360 lats,
361 pole_lon=coord_system.grid_north_pole_longitude,
362 pole_lat=coord_system.grid_north_pole_latitude,
363 )
364 else:
365 raise ValueError("Unknown subarea_type:", subarea_type)
367 # Do cutout and add to cutout_cubes.
368 intersection_args = {lat_name: lats, lon_name: lons}
369 logging.debug("Cutting out coords: %s", intersection_args)
370 try:
371 cutout_cubes.append(cube.intersection(**intersection_args))
372 except IndexError as err:
373 raise ValueError(
374 "Region cutout error. Check and update SUBAREA_EXTENT."
375 "Cutout region requested should be contained within data area. "
376 "Also check if cutout region requested is smaller than input grid spacing."
377 ) from err
379 return cutout_cubes
382def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
383 """Compose together the needed callbacks into a single function."""
384 # Most callbacks operate in-place, but save the cube when returned!
385 _realization_callback(cube)
386 _um_normalise_callback(cube)
387 _lfric_normalise_callback(cube)
388 cube = _lfric_time_coord_fix_callback(cube)
389 _normalise_var0_varname(cube)
390 cube = _fix_no_spatial_coords_callback(cube)
391 _fix_spatial_coords_callback(cube)
392 _fix_pressure_coord_callback(cube)
393 _fix_um_radtime(cube)
394 _fix_cell_methods(cube)
395 cube = _convert_cube_units_callback(cube)
396 cube = _grid_longitude_fix_callback(cube)
397 _fix_lfric_cloud_base_altitude(cube)
398 _proleptic_gregorian_fix(cube)
399 _lfric_time_callback(cube)
400 _lfric_forecast_period_callback(cube)
401 cube = _fix_no_time_coords_callback(cube)
402 _normalise_ML_varname(cube)
403 return cube
406def _realization_callback(cube):
407 """Add a realization coordinate initialised to 0 if missing.
409 This means deterministic and ensemble cubes can assume realization coordinate through the rest
410 of the code.
411 """
412 # Only add if realization coordinate does not exist.
413 if not cube.coords("realization"):
414 cube.add_aux_coord(
415 iris.coords.DimCoord(0, standard_name="realization", units="1")
416 )
419@functools.lru_cache(None)
420def _log_once(msg, level=logging.WARNING):
421 """Print a warning message, skipping recent duplicates."""
422 logging.log(level, msg)
425def _um_normalise_callback(cube: iris.cube.Cube):
426 """Normalise UM STASH variable long names to LFRic variable names.
428 Note standard names will remain associated with cubes where different.
429 Long name will be used consistently in output filename and titles.
430 """
431 # Convert STASH to LFRic variable name
432 if "STASH" in cube.attributes:
433 stash = cube.attributes["STASH"]
434 try:
435 (name, grid) = STASH_TO_LFRIC[str(stash)]
436 cube.long_name = name
437 except KeyError:
438 # Don't change cubes with unknown stash codes.
439 _log_once(
440 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.",
441 level=logging.WARNING,
442 )
445def _lfric_normalise_callback(cube: iris.cube.Cube):
446 """Normalise attributes that prevents LFRic cube from merging.
448 The uuid and timeStamp relate to the output file, as saved by XIOS, and has
449 no relation to the data contained. These attributes are removed.
451 The um_stash_source is a list of STASH codes for when an LFRic field maps to
452 multiple UM fields, however it can be encoded in any order. This attribute
453 is sorted to prevent this. This attribute is only present in LFRic data that
454 has been converted to look like UM data.
455 """
456 # Remove unwanted attributes.
457 cube.attributes.pop("timeStamp", None)
458 cube.attributes.pop("uuid", None)
459 cube.attributes.pop("name", None)
460 cube.attributes.pop("source", None)
461 cube.attributes.pop("analysis_source", None)
462 cube.attributes.pop("history", None)
464 # Sort STASH code list.
465 stash_list = cube.attributes.get("um_stash_source")
466 if stash_list:
467 # Parse the string as a list, sort, then re-encode as a string.
468 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list)))
471def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
472 """Ensure the time coordinate is a DimCoord rather than an AuxCoord.
474 The coordinate is converted and replaced if not. SLAMed LFRic data has this
475 issue, though the coordinate satisfies all the properties for a DimCoord.
476 Scalar time values are left as AuxCoords.
477 """
478 # This issue seems to come from iris's handling of NetCDF files where time
479 # always ends up as an AuxCoord.
480 if cube.coords("time"):
481 time_coord = cube.coord("time")
482 if (
483 not isinstance(time_coord, iris.coords.DimCoord)
484 and len(cube.coord_dims(time_coord)) == 1
485 ):
486 # Fudge the bounds to foil checking for strict monotonicity.
487 if time_coord.has_bounds(): 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true
488 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
489 time_coord.bounds = [
490 [
491 time_coord.bounds[i][0] + 1.0e-8 * float(i),
492 time_coord.bounds[i][1],
493 ]
494 for i in range(len(time_coord.bounds))
495 ]
496 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
497 return cube
500def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
501 """Check grid_longitude coordinates are in the range -180 deg to 180 deg.
503 This is necessary if comparing two models with different conventions --
504 for example, models where the prime meridian is defined as 0 deg or
505 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
506 so that it falls in this range. Checks are for near-180 bounds given
507 model data bounds may not extend exactly to 0. or 360.
508 Input cubes on non-rotated grid coordinates are not impacted.
509 """
510 try:
511 y, x = get_cube_yxcoordname(cube)
512 except ValueError:
513 # Don't modify non-spatial cubes.
514 return cube
516 long_coord = cube.coord(x)
517 # Wrap longitudes if rotated pole coordinates
518 coord_system = long_coord.coord_system
519 if x == "grid_longitude" and isinstance(
520 coord_system, iris.coord_systems.RotatedGeogCS
521 ):
522 long_points = long_coord.points.copy()
523 long_centre = np.median(long_points)
524 while long_centre < -175.0:
525 long_centre += 360.0
526 long_points += 360.0
527 while long_centre >= 175.0:
528 long_centre -= 360.0
529 long_points -= 360.0
530 long_coord.points = long_points
532 # Update coord bounds to be consistent with wrapping.
533 if long_coord.has_bounds():
534 long_coord.bounds = None
535 long_coord.guess_bounds()
537 return cube
540def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):
541 import CSET.operators._utils as utils
543 # Don't modify spatial cubes that already have spatial dimensions
544 if utils.is_spatialdim(cube):
545 return cube
547 else:
548 # attempt to get lat/long from cube attributes
549 try:
550 lat_min = cube.attributes.get("geospatial_lat_min")
551 lat_max = cube.attributes.get("geospatial_lat_max")
552 lon_min = cube.attributes.get("geospatial_lon_min")
553 lon_max = cube.attributes.get("geospatial_lon_max")
555 lon_val = (lon_min + lon_max) / 2.0
556 lat_val = (lat_min + lat_max) / 2.0
558 lat_coord = iris.coords.DimCoord(
559 lat_val,
560 standard_name="latitude",
561 units="degrees_north",
562 var_name="latitude",
563 coord_system=iris.coord_systems.GeogCS(6371229.0),
564 circular=True,
565 )
567 lon_coord = iris.coords.DimCoord(
568 lon_val,
569 standard_name="longitude",
570 units="degrees_east",
571 var_name="longitude",
572 coord_system=iris.coord_systems.GeogCS(6371229.0),
573 circular=True,
574 )
576 cube.add_aux_coord(lat_coord)
577 cube.add_aux_coord(lon_coord)
578 return cube
580 # if lat/long are not in attributes, then return cube unchanged:
581 except TypeError:
582 return cube
585def _fix_spatial_coords_callback(cube: iris.cube.Cube):
586 """Check latitude and longitude coordinates name.
588 This is necessary as some models define their grid as on rotated
589 'grid_latitude' and 'grid_longitude' coordinates while others define
590 the grid on non-rotated 'latitude' and 'longitude'.
591 Cube dimensions need to be made consistent to avoid recipe failures,
592 particularly where comparing multiple input models with differing spatial
593 coordinates.
594 """
595 # Check if cube is spatial.
596 if not is_spatialdim(cube):
597 # Don't modify non-spatial cubes.
598 return
600 # Get spatial coords and dimension index.
601 y_name, x_name = get_cube_yxcoordname(cube)
602 ny = get_cube_coordindex(cube, y_name)
603 nx = get_cube_coordindex(cube, x_name)
605 # Remove spatial coords bounds if erroneous values detected.
606 # Aims to catch some errors in input coord bounds by setting
607 # invalid threshold of 10000.0
608 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
609 bx_max = np.max(np.abs(cube.coord(x_name).bounds))
610 by_max = np.max(np.abs(cube.coord(y_name).bounds))
611 if bx_max > 10000.0 or by_max > 10000.0:
612 cube.coord(x_name).bounds = None
613 cube.coord(y_name).bounds = None
615 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
616 # [latitude, longitude] for instances where rotated_pole=90.0
617 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
618 coord_system = cube.coord("grid_latitude").coord_system
619 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
620 if pole_lat == 90.0: 620 ↛ 621line 620 didn't jump to line 621 because the condition on line 620 was never true
621 lats = cube.coord("grid_latitude").points
622 lons = cube.coord("grid_longitude").points
624 cube.remove_coord("grid_latitude")
625 cube.add_dim_coord(
626 iris.coords.DimCoord(
627 lats,
628 standard_name="latitude",
629 var_name="latitude",
630 units="degrees",
631 coord_system=iris.coord_systems.GeogCS(6371229.0),
632 circular=True,
633 ),
634 ny,
635 )
636 y_name = "latitude"
637 cube.remove_coord("grid_longitude")
638 cube.add_dim_coord(
639 iris.coords.DimCoord(
640 lons,
641 standard_name="longitude",
642 var_name="longitude",
643 units="degrees",
644 coord_system=iris.coord_systems.GeogCS(6371229.0),
645 circular=True,
646 ),
647 nx,
648 )
649 x_name = "longitude"
651 # Create additional AuxCoord [grid_latitude, grid_longitude] with
652 # rotated pole attributes for cases with [lat, lon] inputs
653 if y_name in ["latitude"] and cube.coord(y_name).units in [
654 "degrees",
655 "degrees_north",
656 "degrees_south",
657 ]:
658 # Add grid_latitude AuxCoord
659 if "grid_latitude" not in [ 659 ↛ 672line 659 didn't jump to line 672 because the condition on line 659 was always true
660 coord.name() for coord in cube.coords(dim_coords=False)
661 ]:
662 cube.add_aux_coord(
663 iris.coords.AuxCoord(
664 cube.coord(y_name).points,
665 var_name="grid_latitude",
666 units="degrees",
667 ),
668 ny,
669 )
670 # Ensure input latitude DimCoord has CoordSystem
671 # This attribute is sometimes lost on iris.save
672 if not cube.coord(y_name).coord_system:
673 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
675 if x_name in ["longitude"] and cube.coord(x_name).units in [
676 "degrees",
677 "degrees_west",
678 "degrees_east",
679 ]:
680 # Add grid_longitude AuxCoord
681 if "grid_longitude" not in [ 681 ↛ 695line 681 didn't jump to line 695 because the condition on line 681 was always true
682 coord.name() for coord in cube.coords(dim_coords=False)
683 ]:
684 cube.add_aux_coord(
685 iris.coords.AuxCoord(
686 cube.coord(x_name).points,
687 var_name="grid_longitude",
688 units="degrees",
689 ),
690 nx,
691 )
693 # Ensure input longitude DimCoord has CoordSystem
694 # This attribute is sometimes lost on iris.save
695 if not cube.coord(x_name).coord_system:
696 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
699def _fix_pressure_coord_callback(cube: iris.cube.Cube):
700 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units.
702 This problem was raised because the AIFS model data from ECMWF
703 defines the pressure coordinate with the name "pressure_level" rather
704 than compliant CF coordinate names.
706 Additionally, set the units of pressure to be hPa to be consistent with the UM,
707 and approach the coordinates in a unified way.
708 """
709 for coord in cube.dim_coords:
710 if coord.name() in ["pressure_level", "pressure_levels"]:
711 coord.rename("pressure")
713 if coord.name() == "pressure":
714 if str(cube.coord("pressure").units) != "hPa":
715 cube.coord("pressure").convert_units("hPa")
718def _fix_um_radtime(cube: iris.cube.Cube):
719 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.
721 This callback does not have any effect for output diagnostics with
722 timestamps exactly 00 or 30 minutes past the hour. Only radiation
723 diagnostics are checked.
724 Note this callback does not interpolate the data in time, only adjust
725 timestamps to sit on the hour to enable time-to-time difference plotting
726 with models which may output radiation data on the hour.
727 """
728 try:
729 if cube.attributes["STASH"] in [
730 "m01s01i207",
731 "m01s01i208",
732 "m01s02i205",
733 "m01s02i201",
734 "m01s01i207",
735 "m01s02i207",
736 "m01s01i235",
737 ]:
738 time_coord = cube.coord("time")
740 # Convert time points to datetime objects
741 time_unit = time_coord.units
742 time_points = time_unit.num2date(time_coord.points)
743 # Skip if times don't need fixing.
744 if time_points[0].minute == 0 and time_points[0].second == 0:
745 return
746 if time_points[0].minute == 30 and time_points[0].second == 0: 746 ↛ 747line 746 didn't jump to line 747 because the condition on line 746 was never true
747 return
749 # Subtract time difference from the hour from each time point
750 n_minute = time_points[0].minute
751 n_second = time_points[0].second
752 # If times closer to next hour, compute difference to add on to following hour
753 if n_minute > 30:
754 n_minute = n_minute - 60
755 # Compute new diagnostic time stamp
756 new_time_points = (
757 time_points
758 - datetime.timedelta(minutes=n_minute)
759 - datetime.timedelta(seconds=n_second)
760 )
762 # Convert back to numeric values using the original time unit.
763 new_time_values = time_unit.date2num(new_time_points)
765 # Replace the time coordinate with updated values.
766 time_coord.points = new_time_values
768 # Recompute forecast_period with corrected values.
769 if cube.coord("forecast_period"): 769 ↛ exitline 769 didn't return from function '_fix_um_radtime' because the condition on line 769 was always true
770 fcst_prd_points = cube.coord("forecast_period").points
771 new_fcst_points = (
772 time_unit.num2date(fcst_prd_points)
773 - datetime.timedelta(minutes=n_minute)
774 - datetime.timedelta(seconds=n_second)
775 )
776 cube.coord("forecast_period").points = time_unit.date2num(
777 new_fcst_points
778 )
779 except KeyError:
780 pass
783def _fix_cell_methods(cube: iris.cube.Cube):
784 """To fix the assumed cell_methods in accumulation STASH from UM.
786 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
787 (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
788 over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
789 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
790 "sum". This callback changes "mean" cube attribute cell_method to "sum",
791 enabling the cell_method constraint on reading to select correct input.
792 """
793 # Shift "mean" cell_method to "sum" for selected UM inputs.
794 if cube.attributes.get("STASH") in [
795 "m01s21i104",
796 "m01s04i201",
797 "m01s04i202",
798 "m01s05i201",
799 "m01s05i202",
800 ]:
801 # Check if input cell_method contains "mean" time-processing.
802 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 802 ↛ exitline 802 didn't return from function '_fix_cell_methods' because the condition on line 802 was always true
803 # Retrieve interval and any comment information.
804 for cell_method in cube.cell_methods:
805 interval_str = cell_method.intervals
806 comment_str = cell_method.comments
808 # Remove input aggregation method.
809 cube.cell_methods = ()
811 # Replace "mean" with "sum" cell_method to indicate aggregation.
812 cube.add_cell_method(
813 iris.coords.CellMethod(
814 method="sum",
815 coords="time",
816 intervals=interval_str,
817 comments=comment_str,
818 )
819 )
822def _convert_cube_units_callback(cube: iris.cube.Cube):
823 """Adjust diagnostic units for specific variables.
825 Some precipitation diagnostics are output with unit kg m-2 s-1 and are
826 converted here to mm hr-1.
828 Visibility diagnostics are converted here from m to km to improve output
829 formatting.
830 """
831 # Convert precipitation diagnostic units if required.
832 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
833 if any("surface_microphysical" in name for name in varnames):
834 if cube.units == "kg m-2 s-1":
835 _log_once(
836 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1",
837 level=logging.DEBUG,
838 )
839 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water.
840 # This is a 1:1 conversion, so we just change the units.
841 cube.units = "mm s-1"
842 # Convert the units to per hour.
843 cube.convert_units("mm hr-1")
844 elif cube.units == "kg m-2": 844 ↛ 854line 844 didn't jump to line 854 because the condition on line 844 was always true
845 _log_once(
846 "Converting precipitation amount units from kg m-2 to mm",
847 level=logging.DEBUG,
848 )
849 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water.
850 # This is a 1:1 conversion, so we just change the units.
851 cube.units = "mm"
853 # Convert visibility diagnostic units if required.
854 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
855 if any("visibility" in name for name in varnames):
856 if cube.units == "m": 856 ↛ 861line 856 didn't jump to line 861 because the condition on line 856 was always true
857 _log_once("Converting visibility units m to km.", level=logging.DEBUG)
858 # Convert the units to km.
859 cube.convert_units("km")
861 return cube
864def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
865 """Mask cloud_base_altitude diagnostic in regions with no cloud."""
866 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
867 if any("cloud_base_altitude" in name for name in varnames):
868 # Mask cube where set > 144kft to catch default 144.35695538058164
869 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0)
872def _fix_um_winds(cubes: iris.cube.CubeList):
873 """To make winds from the UM consistent with those from LFRic.
875 Diagnostics of wind are not always consistent between the UM
876 and LFric. Here, winds from the UM are adjusted to make them i
877 consistent with LFRic.
878 """
879 # Check whether we have components of the wind identified by STASH,
880 # (so this will apply only to cubes from the UM), but not the
881 # wind speed and calculate it if it is missing. Note that
882 # this will be biased low in general because the components will mostly
883 # be time averages. For simplicity, we do this only if there is just one
884 # cube of a component. A more complicated approach would be to consider
885 # the cell methods, but it may not be warranted.
886 u_constr = iris.AttributeConstraint(STASH="m01s03i225")
887 v_constr = iris.AttributeConstraint(STASH="m01s03i226")
888 speed_constr = iris.AttributeConstraint(STASH="m01s03i227")
889 try:
890 if cubes.extract(u_constr) and cubes.extract(v_constr): 890 ↛ 891line 890 didn't jump to line 891 because the condition on line 890 was never true
891 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr):
892 _add_wind_speed_um(cubes)
893 # Convert winds in the UM to be relative to true east and true north.
894 _convert_wind_true_dirn_um(cubes)
895 except (KeyError, AttributeError):
896 pass
899def _add_wind_speed_um(cubes: iris.cube.CubeList):
900 """Add windspeeds to cubes from the UM."""
901 wspd10 = (
902 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2
903 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2
904 ) ** 0.5
905 wspd10.attributes["STASH"] = "m01s03i227"
906 wspd10.standard_name = "wind_speed"
907 wspd10.long_name = "wind_speed_at_10m"
908 cubes.append(wspd10)
911def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
912 """To convert winds to true directions.
914 Convert from the components relative to the grid to true directions.
915 This functionality only handles the simplest case.
916 """
917 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
918 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
919 for u, v in zip(u_grids, v_grids, strict=True):
920 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0))
921 u.data = true_u.core_data()
922 v.data = true_v.core_data()
925def _normalise_var0_varname(cube: iris.cube.Cube):
926 """Fix varnames for consistency to allow merging.
928 Some model data netCDF sometimes have a coordinate name end in
929 "_0" etc, where duplicate coordinates of same name are defined but
930 with different attributes. This can be inconsistently managed in
931 different model inputs and can cause cubes to fail to merge.
932 """
933 for coord in cube.coords():
934 if coord.var_name and coord.var_name.endswith("_0"):
935 coord.var_name = coord.var_name.removesuffix("_0")
936 if coord.var_name and coord.var_name.endswith("_1"):
937 coord.var_name = coord.var_name.removesuffix("_1")
938 if coord.var_name and coord.var_name.endswith("_2"): 938 ↛ 939line 938 didn't jump to line 939 because the condition on line 938 was never true
939 coord.var_name = coord.var_name.removesuffix("_2")
940 if coord.var_name and coord.var_name.endswith("_3"): 940 ↛ 941line 940 didn't jump to line 941 because the condition on line 940 was never true
941 coord.var_name = coord.var_name.removesuffix("_3")
943 if cube.var_name and cube.var_name.endswith("_0"):
944 cube.var_name = cube.var_name.removesuffix("_0")
947def _proleptic_gregorian_fix(cube: iris.cube.Cube):
948 """Convert the calendars of time units to use a standard calendar."""
949 try:
950 time_coord = cube.coord("time")
951 if time_coord.units.calendar == "proleptic_gregorian":
952 logging.debug(
953 "Changing proleptic Gregorian calendar to standard calendar for %s",
954 repr(time_coord.units),
955 )
956 time_coord.units = time_coord.units.change_calendar("standard")
957 except iris.exceptions.CoordinateNotFoundError:
958 pass
961def _lfric_time_callback(cube: iris.cube.Cube):
962 """Fix time coordinate metadata if missing dimensions.
964 Some model data does not contain forecast_reference_time or forecast_period as
965 expected coordinates, and so we cannot aggregate over case studies without this
966 metadata. This callback fixes these issues.
968 This callback also ensures all time coordinates are referenced as hours since
969 1970-01-01 00:00:00 for consistency across different model inputs.
971 Notes
972 -----
973 Some parts of the code have been adapted from Paul Earnshaw's scripts.
974 """
975 # Construct forecast_reference time if it doesn't exist.
976 try:
977 tcoord = cube.coord("time")
978 # Set time coordinate to common basis "hours since 1970"
979 try:
980 tcoord.convert_units("hours since 1970-01-01 00:00:00")
981 except ValueError:
982 logging.warning("Unrecognised base time unit: %s", tcoord.units)
984 if not cube.coords("forecast_reference_time"):
985 try:
986 init_time = datetime.datetime.fromisoformat(
987 tcoord.attributes["time_origin"]
988 )
989 frt_point = tcoord.units.date2num(init_time)
990 frt_coord = iris.coords.AuxCoord(
991 frt_point,
992 units=tcoord.units,
993 standard_name="forecast_reference_time",
994 long_name="forecast_reference_time",
995 )
996 cube.add_aux_coord(frt_coord)
997 except KeyError:
998 logging.warning(
999 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
1000 )
1002 # Remove time_origin to allow multiple case studies to merge.
1003 tcoord.attributes.pop("time_origin", None)
1005 # Construct forecast_period axis (forecast lead time) if it doesn't exist.
1006 if not cube.coords("forecast_period"):
1007 try:
1008 # Create array of forecast lead times.
1009 init_coord = cube.coord("forecast_reference_time")
1010 init_time_points_in_tcoord_units = tcoord.units.date2num(
1011 init_coord.units.num2date(init_coord.points)
1012 )
1013 lead_times = tcoord.points - init_time_points_in_tcoord_units
1015 # Get unit for lead time from time coordinate's unit.
1016 # Convert all lead time to hours for consistency between models.
1017 if "seconds" in str(tcoord.units): 1017 ↛ 1018line 1017 didn't jump to line 1018 because the condition on line 1017 was never true
1018 lead_times = lead_times / 3600.0
1019 units = "hours"
1020 elif "hours" in str(tcoord.units): 1020 ↛ 1023line 1020 didn't jump to line 1023 because the condition on line 1020 was always true
1021 units = "hours"
1022 else:
1023 raise ValueError(f"Unrecognised base time unit: {tcoord.units}")
1025 # Create lead time coordinate.
1026 lead_time_coord = iris.coords.AuxCoord(
1027 lead_times,
1028 standard_name="forecast_period",
1029 long_name="forecast_period",
1030 units=units,
1031 )
1033 # Associate lead time coordinate with time dimension.
1034 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
1035 except iris.exceptions.CoordinateNotFoundError:
1036 logging.warning(
1037 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
1038 )
1039 except iris.exceptions.CoordinateNotFoundError:
1040 logging.warning("No time coordinate on cube.")
1043def _lfric_forecast_period_callback(cube: iris.cube.Cube):
1044 """Check forecast_period name and units."""
1045 try:
1046 coord = cube.coord("forecast_period")
1047 if coord.units != "hours":
1048 cube.coord("forecast_period").convert_units("hours")
1049 if not coord.standard_name:
1050 coord.standard_name = "forecast_period"
1051 except iris.exceptions.CoordinateNotFoundError:
1052 pass
1055def _fix_no_time_coords_callback(cube: iris.cube.Cube):
1056 """Add dummy time coord to process cubes that don't have sequence coord."""
1057 # Only add if time coordinate does not exist.
1058 if not cube.coords("time"):
1059 cube.add_aux_coord(
1060 iris.coords.DimCoord(
1061 0, standard_name="time", units="hours since 0001-01-01 00:00:00"
1062 )
1063 )
1065 return cube
1068def _normalise_ML_varname(cube: iris.cube.Cube):
1069 """Fix plev variable names to standard names."""
1070 if cube.coords("pressure"):
1071 if cube.name() == "x_wind":
1072 cube.long_name = "zonal_wind_at_pressure_levels"
1073 if cube.name() == "y_wind":
1074 cube.long_name = "meridional_wind_at_pressure_levels"
1075 if cube.name() == "air_temperature":
1076 cube.long_name = "temperature_at_pressure_levels"
1077 if cube.name() == "specific_humidity": 1077 ↛ 1078line 1077 didn't jump to line 1078 because the condition on line 1077 was never true
1078 cube.long_name = (
1079 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging"
1080 )