Coverage for src/CSET/operators/read.py: 91%
409 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-22 10:06 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-22 10:06 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for reading various types of files from disk."""
17import ast
18import datetime
19import functools
20import glob
21import itertools
22import logging
23from pathlib import Path
24from typing import Literal
26import dask
27import iris
28import iris.coord_systems
29import iris.coords
30import iris.cube
31import iris.exceptions
32import iris.util
33import numpy as np
34from iris.analysis.cartography import rotate_pole, rotate_winds
36from CSET._common import iter_maybe
37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC
38from CSET.operators._utils import (
39 get_cube_coordindex,
40 get_cube_yxcoordname,
41 is_spatialdim,
42)
43from CSET.operators.regrid import restructure_wrapper
46class NoDataError(FileNotFoundError):
47 """Error that no data has been loaded."""
50def read_cube(
51 file_paths: list[str] | str,
52 constraint: iris.Constraint = None,
53 model_names: list[str] | str | None = None,
54 subarea_type: str = None,
55 subarea_extent: list[float] = None,
56 **kwargs,
57) -> iris.cube.Cube:
58 """Read a single cube from files.
60 Read operator that takes a path string (can include shell-style glob
61 patterns), and loads the cube matching the constraint. If any paths point to
62 directory, all the files contained within are loaded.
64 Ensemble data can also be loaded. If it has a realization coordinate
65 already, it will be directly used. If not, it will have its member number
66 guessed from the filename, based on one of several common patterns. For
67 example the pattern *emXX*, where XX is the realization.
69 Deterministic data will be loaded with a realization of 0, allowing it to be
70 processed in the same way as ensemble data.
72 Arguments
73 ---------
74 file_paths: str | list[str]
75 Path or paths to where .pp/.nc files are located
76 constraint: iris.Constraint | iris.ConstraintCombination, optional
77 Constraints to filter data by. Defaults to unconstrained.
78 model_names: str | list[str], optional
79 Names of the models that correspond to respective paths in file_paths.
80 subarea_type: "gridcells" | "modelrelative" | "realworld", optional
81 Whether to constrain data by model relative coordinates or real world
82 coordinates.
83 subarea_extent: list, optional
84 List of coordinates to constraint data by, in order lower latitude,
85 upper latitude, lower longitude, upper longitude.
87 Returns
88 -------
89 cubes: iris.cube.Cube
90 Cube loaded
92 Raises
93 ------
94 FileNotFoundError
95 If the provided path does not exist
96 ValueError
97 If the constraint doesn't produce a single cube.
98 """
99 cubes = read_cubes(
100 file_paths=file_paths,
101 constraint=constraint,
102 model_names=model_names,
103 subarea_type=subarea_type,
104 subarea_extent=subarea_extent,
105 )
106 # Check filtered cubes is a CubeList containing one cube.
107 if len(cubes) == 1:
108 return cubes[0]
109 else:
110 raise ValueError(
111 f"Constraint doesn't produce single cube: {constraint}\n{cubes}"
112 )
115def read_cubes(
116 file_paths: list[str] | str,
117 constraint: iris.Constraint | None = None,
118 model_names: str | list[str] | None = None,
119 subarea_type: str = None,
120 subarea_extent: list = None,
121 **kwargs,
122) -> iris.cube.CubeList:
123 """Read cubes from files.
125 Read operator that takes a path string (can include shell-style glob
126 patterns), and loads the cubes matching the constraint. If any paths point
127 to directory, all the files contained within are loaded.
129 Ensemble data can also be loaded. If it has a realization coordinate
130 already, it will be directly used. If not, it will have its member number
131 guessed from the filename, based on one of several common patterns. For
132 example the pattern *emXX*, where XX is the realization.
134 Deterministic data will be loaded with a realization of 0, allowing it to be
135 processed in the same way as ensemble data.
137 Data output by XIOS (such as LFRic) has its per-file metadata removed so
138 that the cubes merge across files.
140 Arguments
141 ---------
142 file_paths: str | list[str]
143 Path or paths to where .pp/.nc files are located. Can include globs.
144 constraint: iris.Constraint | iris.ConstraintCombination, optional
145 Constraints to filter data by. Defaults to unconstrained.
146 model_names: str | list[str], optional
147 Names of the models that correspond to respective paths in file_paths.
148 subarea_type: str, optional
149 Whether to constrain data by model relative coordinates or real world
150 coordinates.
151 subarea_extent: list[float], optional
152 List of coordinates to constraint data by, in order lower latitude,
153 upper latitude, lower longitude, upper longitude.
155 Returns
156 -------
157 cubes: iris.cube.CubeList
158 Cubes loaded after being merged and concatenated.
160 Raises
161 ------
162 FileNotFoundError
163 If the provided path does not exist
164 """
165 # Get iterable of paths. Each path corresponds to 1 model.
166 paths = iter_maybe(file_paths)
167 model_names = iter_maybe(model_names)
169 # Check we have appropriate number of model names.
170 if model_names != (None,) and len(model_names) != len(paths):
171 raise ValueError(
172 f"The number of model names ({len(model_names)}) should equal "
173 f"the number of paths given ({len(paths)})."
174 )
176 # Load the data for each model into a CubeList per model.
177 model_cubes = (
178 _load_model(path, name, constraint)
179 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
180 )
182 # Split out first model's cubes and mark it as the base for comparisons.
183 cubes = next(model_cubes)
184 for cube in cubes:
185 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
186 cube.attributes["cset_comparison_base"] = 1
188 # Load the rest of the models.
189 cubes.extend(itertools.chain.from_iterable(model_cubes))
191 # Unify time units so different case studies can merge.
192 iris.util.unify_time_units(cubes)
194 # Select sub region.
195 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)
197 # Merge and concatenate cubes now metadata has been fixed.
198 cubes = _merge_cubes_check_ensemble(cubes)
199 cubes = cubes.concatenate()
201 # Squeeze single valued coordinates into scalar coordinates.
202 cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)
204 # Ensure dimension coordinates are bounded.
205 for cube in cubes:
206 for dim_coord in cube.coords(dim_coords=True):
207 if (dim_coord.standard_name == "time") and (
208 dim_coord.name()
209 not in itertools.chain.from_iterable(
210 m.coord_names for m in cube.cell_methods if m.method != "point"
211 )
212 ):
213 # Instantaneous time coordinate
214 continue
215 # Iris can't guess the bounds of a scalar coordinate.
216 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
217 dim_coord.guess_bounds()
219 logging.info("Loaded cubes: %s", cubes)
220 if len(cubes) == 0:
221 raise NoDataError("No cubes loaded, check your constraints!")
222 return cubes
225def _load_model(
226 paths: str | list[str],
227 model_name: str | None,
228 constraint: iris.Constraint | None,
229) -> iris.cube.CubeList:
230 """Load a single model's data into a CubeList."""
231 input_files = _check_input_files(paths)
232 # If unset, a constraint of None lets everything be loaded.
233 logging.debug("Constraint: %s", constraint)
235 cubes = iris.load(input_files)
237 # If a cube called latitude exists, chances are its unstructured.
238 # Using extract, not extract_cubes in a try as there might be more
239 # than one cube called latitude if we are aggregating.
240 if len(cubes.extract("latitude")) > 0: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 cubes = restructure_wrapper(cubes)
243 for cube in cubes:
244 _loading_callback(cube, None, None)
246 # Extract required cubes based on constraint
247 cubes = cubes.extract(constraint)
249 # cubes = iris.load(input_files, constraint, callback=_loading_callback)
250 # Make the UM's winds consistent with LFRic.
251 _fix_um_winds(cubes)
253 # Add model_name attribute to each cube to make it available at any further
254 # step without needing to pass it as function parameter.
255 if model_name is not None:
256 for cube in cubes:
257 cube.attributes["model_name"] = model_name
258 return cubes
261def _check_input_files(input_paths: str | list[str]) -> list[Path]:
262 """Get an iterable of files to load, and check that they all exist.
264 Arguments
265 ---------
266 input_paths: list[str]
267 List of paths to input files or directories. The path may itself contain
268 glob patterns, but unlike in shells it will match directly first.
270 Returns
271 -------
272 list[Path]
273 A list of files to load.
275 Raises
276 ------
277 FileNotFoundError:
278 If the provided arguments don't resolve to at least one existing file.
279 """
280 files = []
281 for raw_filename in iter_maybe(input_paths):
282 # Match glob-like files first, if they exist.
283 raw_path = Path(raw_filename)
284 if raw_path.is_file():
285 files.append(raw_path)
286 else:
287 for input_path in glob.glob(raw_filename):
288 # Convert string paths into Path objects.
289 input_path = Path(input_path)
290 # Get the list of files in the directory, or use it directly.
291 if input_path.is_dir():
292 logging.debug("Checking directory '%s' for files", input_path)
293 files.extend(p for p in input_path.iterdir() if p.is_file())
294 else:
295 files.append(input_path)
297 files.sort()
298 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files))
299 if len(files) == 0:
300 raise FileNotFoundError(f"No files found for {input_paths}")
301 return files
304def _merge_cubes_check_ensemble(cubes: iris.cube.CubeList):
305 """Merge CubeList, renumbering realizations of 0 if required.
307 An unsuccessful merge indicates common input cube attributes, most
308 commonly from ensemble members missing an explicit realization
309 coordinate. Therefore the members are renumbered before being merged
310 again.
311 """
312 try:
313 cubes = cubes.merge()
314 except iris.exceptions.MergeError:
315 _log_once(
316 "Attempt to merge input CubeList failed. Attempting to iterate realization coords to enable merge.",
317 level=logging.WARNING,
318 )
319 for ir, cube in enumerate(cubes):
320 if cube.coord("realization").points == 0: 320 ↛ 319line 320 didn't jump to line 319 because the condition on line 320 was always true
321 cube.coord("realization").points = ir + 1
322 cubes = cubes.merge()
323 return cubes
326def _cutout_cubes(
327 cubes: iris.cube.CubeList,
328 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
329 subarea_extent: list[float],
330):
331 """Cut out a subarea from a CubeList."""
332 if subarea_type is None:
333 logging.debug("Subarea selection is disabled.")
334 return cubes
336 # If selected, cutout according to number of grid cells to trim from each edge.
337 cutout_cubes = iris.cube.CubeList()
338 # Find spatial coordinates
339 for cube in cubes:
340 # Find dimension coordinates.
341 lat_name, lon_name = get_cube_yxcoordname(cube)
343 # Compute cutout based on number of cells to trim from edges.
344 if subarea_type == "gridcells":
345 logging.debug(
346 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
347 subarea_extent[0],
348 subarea_extent[1],
349 subarea_extent[2],
350 subarea_extent[3],
351 )
352 lat_points = np.sort(cube.coord(lat_name).points)
353 lon_points = np.sort(cube.coord(lon_name).points)
354 # Define cutout region using user provided cell points.
355 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
356 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]
358 # Compute cutout based on specified coordinate values.
359 elif subarea_type == "realworld" or subarea_type == "modelrelative":
360 # If not gridcells, cutout by requested geographic area,
361 logging.debug(
362 "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
363 subarea_extent[0],
364 subarea_extent[1],
365 subarea_extent[2],
366 subarea_extent[3],
367 )
368 # Define cutout region using user provided coordinates.
369 lats = np.array(subarea_extent[0:2])
370 lons = np.array(subarea_extent[2:4])
371 # Ensure cutout longitudes are within +/- 180.0 bounds.
372 while lons[0] < -180.0:
373 lons += 360.0
374 while lons[1] > 180.0:
375 lons -= 360.0
376 # If the coordinate system is rotated we convert coordinates into
377 # model-relative coordinates to extract the appropriate cutout.
378 coord_system = cube.coord(lat_name).coord_system
379 if subarea_type == "realworld" and isinstance(
380 coord_system, iris.coord_systems.RotatedGeogCS
381 ):
382 lons, lats = rotate_pole(
383 lons,
384 lats,
385 pole_lon=coord_system.grid_north_pole_longitude,
386 pole_lat=coord_system.grid_north_pole_latitude,
387 )
388 else:
389 raise ValueError("Unknown subarea_type:", subarea_type)
391 # Do cutout and add to cutout_cubes.
392 intersection_args = {lat_name: lats, lon_name: lons}
393 logging.debug("Cutting out coords: %s", intersection_args)
394 try:
395 cutout_cubes.append(cube.intersection(**intersection_args))
396 except IndexError as err:
397 raise ValueError(
398 "Region cutout error. Check and update SUBAREA_EXTENT."
399 "Cutout region requested should be contained within data area. "
400 "Also check if cutout region requested is smaller than input grid spacing."
401 ) from err
403 return cutout_cubes
406def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
407 """Compose together the needed callbacks into a single function."""
408 # Most callbacks operate in-place, but save the cube when returned!
409 _realization_callback(cube)
410 _um_normalise_callback(cube)
411 _lfric_normalise_callback(cube)
412 cube = _lfric_time_coord_fix_callback(cube)
413 _normalise_var0_varname(cube)
414 cube = _fix_no_spatial_coords_callback(cube)
415 _fix_spatial_coords_callback(cube)
416 _fix_pressure_coord_callback(cube)
417 _fix_um_radtime(cube)
418 _fix_cell_methods(cube)
419 cube = _convert_cube_units_callback(cube)
420 cube = _grid_longitude_fix_callback(cube)
421 _fix_lfric_cloud_base_altitude(cube)
422 _proleptic_gregorian_fix(cube)
423 _lfric_time_callback(cube)
424 _lfric_forecast_period_callback(cube)
425 cube = _fix_no_time_coords_callback(cube)
426 _normalise_ML_varname(cube)
427 return cube
430def _realization_callback(cube):
431 """Add a realization coordinate initialised to 0 if missing.
433 This means deterministic and ensemble cubes can assume realization coordinate through the rest
434 of the code.
435 """
436 # Only add if realization coordinate does not exist.
437 if not cube.coords("realization"):
438 cube.add_aux_coord(
439 iris.coords.DimCoord(0, standard_name="realization", units="1")
440 )
443@functools.lru_cache(None)
444def _log_once(msg, level=logging.WARNING):
445 """Print a warning message, skipping recent duplicates."""
446 logging.log(level, msg)
449def _um_normalise_callback(cube: iris.cube.Cube):
450 """Normalise UM STASH variable long names to LFRic variable names.
452 Note standard names will remain associated with cubes where different.
453 Long name will be used consistently in output filename and titles.
454 """
455 # Convert STASH to LFRic variable name
456 if "STASH" in cube.attributes:
457 stash = cube.attributes["STASH"]
458 try:
459 (name, grid) = STASH_TO_LFRIC[str(stash)]
460 cube.long_name = name
461 except KeyError:
462 # Don't change cubes with unknown stash codes.
463 _log_once(
464 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.",
465 level=logging.WARNING,
466 )
469def _lfric_normalise_callback(cube: iris.cube.Cube):
470 """Normalise attributes that prevents LFRic cube from merging.
472 The uuid and timeStamp relate to the output file, as saved by XIOS, and has
473 no relation to the data contained. These attributes are removed.
475 The um_stash_source is a list of STASH codes for when an LFRic field maps to
476 multiple UM fields, however it can be encoded in any order. This attribute
477 is sorted to prevent this. This attribute is only present in LFRic data that
478 has been converted to look like UM data.
479 """
480 # Remove unwanted attributes.
481 cube.attributes.pop("timeStamp", None)
482 cube.attributes.pop("uuid", None)
483 cube.attributes.pop("name", None)
484 cube.attributes.pop("source", None)
485 cube.attributes.pop("analysis_source", None)
486 cube.attributes.pop("history", None)
488 # Sort STASH code list.
489 stash_list = cube.attributes.get("um_stash_source")
490 if stash_list:
491 # Parse the string as a list, sort, then re-encode as a string.
492 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list)))
495def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
496 """Ensure the time coordinate is a DimCoord rather than an AuxCoord.
498 The coordinate is converted and replaced if not. SLAMed LFRic data has this
499 issue, though the coordinate satisfies all the properties for a DimCoord.
500 Scalar time values are left as AuxCoords.
501 """
502 # This issue seems to come from iris's handling of NetCDF files where time
503 # always ends up as an AuxCoord.
504 if cube.coords("time"):
505 time_coord = cube.coord("time")
506 if (
507 not isinstance(time_coord, iris.coords.DimCoord)
508 and len(cube.coord_dims(time_coord)) == 1
509 ):
510 # Fudge the bounds to foil checking for strict monotonicity.
511 if time_coord.has_bounds(): 511 ↛ 512line 511 didn't jump to line 512 because the condition on line 511 was never true
512 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
513 time_coord.bounds = [
514 [
515 time_coord.bounds[i][0] + 1.0e-8 * float(i),
516 time_coord.bounds[i][1],
517 ]
518 for i in range(len(time_coord.bounds))
519 ]
520 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
521 return cube
524def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
525 """Check grid_longitude coordinates are in the range -180 deg to 180 deg.
527 This is necessary if comparing two models with different conventions --
528 for example, models where the prime meridian is defined as 0 deg or
529 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
530 so that it falls in this range. Checks are for near-180 bounds given
531 model data bounds may not extend exactly to 0. or 360.
532 Input cubes on non-rotated grid coordinates are not impacted.
533 """
534 try:
535 y, x = get_cube_yxcoordname(cube)
536 except ValueError:
537 # Don't modify non-spatial cubes.
538 return cube
540 long_coord = cube.coord(x)
541 # Wrap longitudes if rotated pole coordinates
542 coord_system = long_coord.coord_system
543 if x == "grid_longitude" and isinstance(
544 coord_system, iris.coord_systems.RotatedGeogCS
545 ):
546 long_points = long_coord.points.copy()
547 long_centre = np.median(long_points)
548 while long_centre < -175.0:
549 long_centre += 360.0
550 long_points += 360.0
551 while long_centre >= 175.0:
552 long_centre -= 360.0
553 long_points -= 360.0
554 long_coord.points = long_points
556 # Update coord bounds to be consistent with wrapping.
557 if long_coord.has_bounds():
558 long_coord.bounds = None
559 long_coord.guess_bounds()
561 return cube
564def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):
565 import CSET.operators._utils as utils
567 # Don't modify spatial cubes that already have spatial dimensions
568 if utils.is_spatialdim(cube):
569 return cube
571 else:
572 # attempt to get lat/long from cube attributes
573 try:
574 lat_min = cube.attributes.get("geospatial_lat_min")
575 lat_max = cube.attributes.get("geospatial_lat_max")
576 lon_min = cube.attributes.get("geospatial_lon_min")
577 lon_max = cube.attributes.get("geospatial_lon_max")
579 lon_val = (lon_min + lon_max) / 2.0
580 lat_val = (lat_min + lat_max) / 2.0
582 lat_coord = iris.coords.DimCoord(
583 lat_val,
584 standard_name="latitude",
585 units="degrees_north",
586 var_name="latitude",
587 coord_system=iris.coord_systems.GeogCS(6371229.0),
588 circular=True,
589 )
591 lon_coord = iris.coords.DimCoord(
592 lon_val,
593 standard_name="longitude",
594 units="degrees_east",
595 var_name="longitude",
596 coord_system=iris.coord_systems.GeogCS(6371229.0),
597 circular=True,
598 )
600 cube.add_aux_coord(lat_coord)
601 cube.add_aux_coord(lon_coord)
602 return cube
604 # if lat/long are not in attributes, then return cube unchanged:
605 except TypeError:
606 return cube
609def _fix_spatial_coords_callback(cube: iris.cube.Cube):
610 """Check latitude and longitude coordinates name.
612 This is necessary as some models define their grid as on rotated
613 'grid_latitude' and 'grid_longitude' coordinates while others define
614 the grid on non-rotated 'latitude' and 'longitude'.
615 Cube dimensions need to be made consistent to avoid recipe failures,
616 particularly where comparing multiple input models with differing spatial
617 coordinates.
618 """
619 # Check if cube is spatial.
620 if not is_spatialdim(cube):
621 # Don't modify non-spatial cubes.
622 return
624 # Get spatial coords and dimension index.
625 y_name, x_name = get_cube_yxcoordname(cube)
626 ny = get_cube_coordindex(cube, y_name)
627 nx = get_cube_coordindex(cube, x_name)
629 # Remove spatial coords bounds if erroneous values detected.
630 # Aims to catch some errors in input coord bounds by setting
631 # invalid threshold of 10000.0
632 if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
633 bx_max = np.max(np.abs(cube.coord(x_name).bounds))
634 by_max = np.max(np.abs(cube.coord(y_name).bounds))
635 if bx_max > 10000.0 or by_max > 10000.0:
636 cube.coord(x_name).bounds = None
637 cube.coord(y_name).bounds = None
639 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
640 # [latitude, longitude] for instances where rotated_pole=90.0
641 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
642 coord_system = cube.coord("grid_latitude").coord_system
643 pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
644 if pole_lat == 90.0: 644 ↛ 645line 644 didn't jump to line 645 because the condition on line 644 was never true
645 lats = cube.coord("grid_latitude").points
646 lons = cube.coord("grid_longitude").points
648 cube.remove_coord("grid_latitude")
649 cube.add_dim_coord(
650 iris.coords.DimCoord(
651 lats,
652 standard_name="latitude",
653 var_name="latitude",
654 units="degrees",
655 coord_system=iris.coord_systems.GeogCS(6371229.0),
656 circular=True,
657 ),
658 ny,
659 )
660 y_name = "latitude"
661 cube.remove_coord("grid_longitude")
662 cube.add_dim_coord(
663 iris.coords.DimCoord(
664 lons,
665 standard_name="longitude",
666 var_name="longitude",
667 units="degrees",
668 coord_system=iris.coord_systems.GeogCS(6371229.0),
669 circular=True,
670 ),
671 nx,
672 )
673 x_name = "longitude"
675 # Create additional AuxCoord [grid_latitude, grid_longitude] with
676 # rotated pole attributes for cases with [lat, lon] inputs
677 if y_name in ["latitude"] and cube.coord(y_name).units in [
678 "degrees",
679 "degrees_north",
680 "degrees_south",
681 ]:
682 # Add grid_latitude AuxCoord
683 if "grid_latitude" not in [
684 coord.name() for coord in cube.coords(dim_coords=False)
685 ]:
686 cube.add_aux_coord(
687 iris.coords.AuxCoord(
688 cube.coord(y_name).points,
689 var_name="grid_latitude",
690 units="degrees",
691 ),
692 ny,
693 )
694 # Ensure input latitude DimCoord has CoordSystem
695 # This attribute is sometimes lost on iris.save
696 if not cube.coord(y_name).coord_system:
697 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
699 if x_name in ["longitude"] and cube.coord(x_name).units in [
700 "degrees",
701 "degrees_west",
702 "degrees_east",
703 ]:
704 # Add grid_longitude AuxCoord
705 if "grid_longitude" not in [
706 coord.name() for coord in cube.coords(dim_coords=False)
707 ]:
708 cube.add_aux_coord(
709 iris.coords.AuxCoord(
710 cube.coord(x_name).points,
711 var_name="grid_longitude",
712 units="degrees",
713 ),
714 nx,
715 )
717 # Ensure input longitude DimCoord has CoordSystem
718 # This attribute is sometimes lost on iris.save
719 if not cube.coord(x_name).coord_system:
720 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
723def _fix_pressure_coord_callback(cube: iris.cube.Cube):
724 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units.
726 This problem was raised because the AIFS model data from ECMWF
727 defines the pressure coordinate with the name "pressure_level" rather
728 than compliant CF coordinate names.
730 Additionally, set the units of pressure to be hPa to be consistent with the UM,
731 and approach the coordinates in a unified way.
732 """
733 for coord in cube.dim_coords:
734 if coord.name() in ["pressure_level", "pressure_levels"]:
735 coord.rename("pressure")
737 if coord.name() == "pressure":
738 if str(cube.coord("pressure").units) != "hPa":
739 cube.coord("pressure").convert_units("hPa")
742def _fix_um_radtime(cube: iris.cube.Cube):
743 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.
745 This callback does not have any effect for output diagnostics with
746 timestamps exactly 00 or 30 minutes past the hour. Only radiation
747 diagnostics are checked.
748 Note this callback does not interpolate the data in time, only adjust
749 timestamps to sit on the hour to enable time-to-time difference plotting
750 with models which may output radiation data on the hour.
751 """
752 try:
753 if cube.attributes["STASH"] in [
754 "m01s01i207",
755 "m01s01i208",
756 "m01s02i205",
757 "m01s02i201",
758 "m01s01i207",
759 "m01s02i207",
760 "m01s01i235",
761 ]:
762 time_coord = cube.coord("time")
764 # Convert time points to datetime objects
765 time_unit = time_coord.units
766 time_points = time_unit.num2date(time_coord.points)
767 # Skip if times don't need fixing.
768 if time_points[0].minute == 0 and time_points[0].second == 0:
769 return
770 if time_points[0].minute == 30 and time_points[0].second == 0: 770 ↛ 771line 770 didn't jump to line 771 because the condition on line 770 was never true
771 return
773 # Subtract time difference from the hour from each time point
774 n_minute = time_points[0].minute
775 n_second = time_points[0].second
776 # If times closer to next hour, compute difference to add on to following hour
777 if n_minute > 30:
778 n_minute = n_minute - 60
779 # Compute new diagnostic time stamp
780 new_time_points = (
781 time_points
782 - datetime.timedelta(minutes=n_minute)
783 - datetime.timedelta(seconds=n_second)
784 )
786 # Convert back to numeric values using the original time unit.
787 new_time_values = time_unit.date2num(new_time_points)
789 # Replace the time coordinate with updated values.
790 time_coord.points = new_time_values
792 # Recompute forecast_period with corrected values.
793 if cube.coord("forecast_period"): 793 ↛ exitline 793 didn't return from function '_fix_um_radtime' because the condition on line 793 was always true
794 fcst_prd_points = cube.coord("forecast_period").points
795 new_fcst_points = (
796 time_unit.num2date(fcst_prd_points)
797 - datetime.timedelta(minutes=n_minute)
798 - datetime.timedelta(seconds=n_second)
799 )
800 cube.coord("forecast_period").points = time_unit.date2num(
801 new_fcst_points
802 )
803 except KeyError:
804 pass
807def _fix_cell_methods(cube: iris.cube.Cube):
808 """To fix the assumed cell_methods in accumulation STASH from UM.
810 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
811 (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
812 over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
813 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
814 "sum". This callback changes "mean" cube attribute cell_method to "sum",
815 enabling the cell_method constraint on reading to select correct input.
816 """
817 # Shift "mean" cell_method to "sum" for selected UM inputs.
818 if cube.attributes.get("STASH") in [
819 "m01s21i104",
820 "m01s04i201",
821 "m01s04i202",
822 "m01s05i201",
823 "m01s05i202",
824 ]:
825 # Check if input cell_method contains "mean" time-processing.
826 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 826 ↛ exitline 826 didn't return from function '_fix_cell_methods' because the condition on line 826 was always true
827 # Retrieve interval and any comment information.
828 for cell_method in cube.cell_methods:
829 interval_str = cell_method.intervals
830 comment_str = cell_method.comments
832 # Remove input aggregation method.
833 cube.cell_methods = ()
835 # Replace "mean" with "sum" cell_method to indicate aggregation.
836 cube.add_cell_method(
837 iris.coords.CellMethod(
838 method="sum",
839 coords="time",
840 intervals=interval_str,
841 comments=comment_str,
842 )
843 )
846def _convert_cube_units_callback(cube: iris.cube.Cube):
847 """Adjust diagnostic units for specific variables.
849 Some precipitation diagnostics are output with unit kg m-2 s-1 and are
850 converted here to mm hr-1.
852 Visibility diagnostics are converted here from m to km to improve output
853 formatting.
854 """
855 # Convert precipitation diagnostic units if required.
856 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
857 if any("surface_microphysical" in name for name in varnames):
858 if cube.units == "kg m-2 s-1":
859 _log_once(
860 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1",
861 level=logging.DEBUG,
862 )
863 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water.
864 # This is a 1:1 conversion, so we just change the units.
865 cube.units = "mm s-1"
866 # Convert the units to per hour.
867 cube.convert_units("mm hr-1")
868 elif cube.units == "kg m-2": 868 ↛ 878line 868 didn't jump to line 878 because the condition on line 868 was always true
869 _log_once(
870 "Converting precipitation amount units from kg m-2 to mm",
871 level=logging.DEBUG,
872 )
873 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water.
874 # This is a 1:1 conversion, so we just change the units.
875 cube.units = "mm"
877 # Convert visibility diagnostic units if required.
878 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
879 if any("visibility" in name for name in varnames):
880 if cube.units == "m": 880 ↛ 885line 880 didn't jump to line 885 because the condition on line 880 was always true
881 _log_once("Converting visibility units m to km.", level=logging.DEBUG)
882 # Convert the units to km.
883 cube.convert_units("km")
885 return cube
888def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
889 """Mask cloud_base_altitude diagnostic in regions with no cloud."""
890 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
891 if any("cloud_base_altitude" in name for name in varnames):
892 # Mask cube where set > 144kft to catch default 144.35695538058164
893 cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0)
896def _fix_um_winds(cubes: iris.cube.CubeList):
897 """To make winds from the UM consistent with those from LFRic.
899 Diagnostics of wind are not always consistent between the UM
900 and LFric. Here, winds from the UM are adjusted to make them i
901 consistent with LFRic.
902 """
903 # Check whether we have components of the wind identified by STASH,
904 # (so this will apply only to cubes from the UM), but not the
905 # wind speed and calculate it if it is missing. Note that
906 # this will be biased low in general because the components will mostly
907 # be time averages. For simplicity, we do this only if there is just one
908 # cube of a component. A more complicated approach would be to consider
909 # the cell methods, but it may not be warranted.
910 u_constr = iris.AttributeConstraint(STASH="m01s03i225")
911 v_constr = iris.AttributeConstraint(STASH="m01s03i226")
912 speed_constr = iris.AttributeConstraint(STASH="m01s03i227")
913 try:
914 if cubes.extract(u_constr) and cubes.extract(v_constr): 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true
915 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr):
916 _add_wind_speed_um(cubes)
917 # Convert winds in the UM to be relative to true east and true north.
918 _convert_wind_true_dirn_um(cubes)
919 except (KeyError, AttributeError):
920 pass
923def _add_wind_speed_um(cubes: iris.cube.CubeList):
924 """Add windspeeds to cubes from the UM."""
925 wspd10 = (
926 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2
927 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2
928 ) ** 0.5
929 wspd10.attributes["STASH"] = "m01s03i227"
930 wspd10.standard_name = "wind_speed"
931 wspd10.long_name = "wind_speed_at_10m"
932 cubes.append(wspd10)
935def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
936 """To convert winds to true directions.
938 Convert from the components relative to the grid to true directions.
939 This functionality only handles the simplest case.
940 """
941 u_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
942 v_grids = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
943 for u, v in zip(u_grids, v_grids, strict=True):
944 true_u, true_v = rotate_winds(u, v, iris.coord_systems.GeogCS(6371229.0))
945 u.data = true_u.core_data()
946 v.data = true_v.core_data()
949def _normalise_var0_varname(cube: iris.cube.Cube):
950 """Fix varnames for consistency to allow merging.
952 Some model data netCDF sometimes have a coordinate name end in
953 "_0" etc, where duplicate coordinates of same name are defined but
954 with different attributes. This can be inconsistently managed in
955 different model inputs and can cause cubes to fail to merge.
956 """
957 for coord in cube.coords():
958 if coord.var_name and coord.var_name.endswith("_0"):
959 coord.var_name = coord.var_name.removesuffix("_0")
960 if coord.var_name and coord.var_name.endswith("_1"):
961 coord.var_name = coord.var_name.removesuffix("_1")
962 if coord.var_name and coord.var_name.endswith("_2"): 962 ↛ 963line 962 didn't jump to line 963 because the condition on line 962 was never true
963 coord.var_name = coord.var_name.removesuffix("_2")
964 if coord.var_name and coord.var_name.endswith("_3"): 964 ↛ 965line 964 didn't jump to line 965 because the condition on line 964 was never true
965 coord.var_name = coord.var_name.removesuffix("_3")
967 if cube.var_name and cube.var_name.endswith("_0"):
968 cube.var_name = cube.var_name.removesuffix("_0")
971def _proleptic_gregorian_fix(cube: iris.cube.Cube):
972 """Convert the calendars of time units to use a standard calendar."""
973 try:
974 time_coord = cube.coord("time")
975 if time_coord.units.calendar == "proleptic_gregorian":
976 logging.debug(
977 "Changing proleptic Gregorian calendar to standard calendar for %s",
978 repr(time_coord.units),
979 )
980 time_coord.units = time_coord.units.change_calendar("standard")
981 except iris.exceptions.CoordinateNotFoundError:
982 pass
985def _lfric_time_callback(cube: iris.cube.Cube):
986 """Fix time coordinate metadata if missing dimensions.
988 Some model data does not contain forecast_reference_time or forecast_period as
989 expected coordinates, and so we cannot aggregate over case studies without this
990 metadata. This callback fixes these issues.
992 This callback also ensures all time coordinates are referenced as hours since
993 1970-01-01 00:00:00 for consistency across different model inputs.
995 Notes
996 -----
997 Some parts of the code have been adapted from Paul Earnshaw's scripts.
998 """
999 # Construct forecast_reference time if it doesn't exist.
1000 try:
1001 tcoord = cube.coord("time")
1002 # Set time coordinate to common basis "hours since 1970"
1003 try:
1004 tcoord.convert_units("hours since 1970-01-01 00:00:00")
1005 except ValueError:
1006 logging.warning("Unrecognised base time unit: %s", tcoord.units)
1008 if not cube.coords("forecast_reference_time"):
1009 try:
1010 init_time = datetime.datetime.fromisoformat(
1011 tcoord.attributes["time_origin"]
1012 )
1013 frt_point = tcoord.units.date2num(init_time)
1014 frt_coord = iris.coords.AuxCoord(
1015 frt_point,
1016 units=tcoord.units,
1017 standard_name="forecast_reference_time",
1018 long_name="forecast_reference_time",
1019 )
1020 cube.add_aux_coord(frt_coord)
1021 except KeyError:
1022 logging.warning(
1023 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
1024 )
1026 # Remove time_origin to allow multiple case studies to merge.
1027 tcoord.attributes.pop("time_origin", None)
1029 # Construct forecast_period axis (forecast lead time) if it doesn't exist.
1030 if not cube.coords("forecast_period"):
1031 try:
1032 # Create array of forecast lead times.
1033 init_coord = cube.coord("forecast_reference_time")
1034 init_time_points_in_tcoord_units = tcoord.units.date2num(
1035 init_coord.units.num2date(init_coord.points)
1036 )
1037 lead_times = tcoord.points - init_time_points_in_tcoord_units
1039 # Get unit for lead time from time coordinate's unit.
1040 # Convert all lead time to hours for consistency between models.
1041 if "seconds" in str(tcoord.units): 1041 ↛ 1042line 1041 didn't jump to line 1042 because the condition on line 1041 was never true
1042 lead_times = lead_times / 3600.0
1043 units = "hours"
1044 elif "hours" in str(tcoord.units): 1044 ↛ 1047line 1044 didn't jump to line 1047 because the condition on line 1044 was always true
1045 units = "hours"
1046 else:
1047 raise ValueError(f"Unrecognised base time unit: {tcoord.units}")
1049 # Create lead time coordinate.
1050 lead_time_coord = iris.coords.AuxCoord(
1051 lead_times,
1052 standard_name="forecast_period",
1053 long_name="forecast_period",
1054 units=units,
1055 )
1057 # Associate lead time coordinate with time dimension.
1058 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
1059 except iris.exceptions.CoordinateNotFoundError:
1060 logging.warning(
1061 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
1062 )
1063 except iris.exceptions.CoordinateNotFoundError:
1064 logging.warning("No time coordinate on cube.")
1067def _lfric_forecast_period_callback(cube: iris.cube.Cube):
1068 """Check forecast_period name and units."""
1069 try:
1070 coord = cube.coord("forecast_period")
1071 if coord.units != "hours":
1072 cube.coord("forecast_period").convert_units("hours")
1073 if not coord.standard_name:
1074 coord.standard_name = "forecast_period"
1075 except iris.exceptions.CoordinateNotFoundError:
1076 pass
1079def _fix_no_time_coords_callback(cube: iris.cube.Cube):
1080 """Add dummy time coord to process cubes that don't have sequence coord."""
1081 # Only add if time coordinate does not exist.
1082 if not cube.coords("time"):
1083 cube.add_aux_coord(
1084 iris.coords.DimCoord(
1085 0, standard_name="time", units="hours since 0001-01-01 00:00:00"
1086 )
1087 )
1089 return cube
1092def _normalise_ML_varname(cube: iris.cube.Cube):
1093 """Fix plev variable names to standard names."""
1094 if cube.coords("pressure"):
1095 if cube.name() == "x_wind":
1096 cube.long_name = "zonal_wind_at_pressure_levels"
1097 if cube.name() == "y_wind":
1098 cube.long_name = "meridional_wind_at_pressure_levels"
1099 if cube.name() == "air_temperature":
1100 cube.long_name = "temperature_at_pressure_levels"
1101 if cube.name() == "specific_humidity": 1101 ↛ 1102line 1101 didn't jump to line 1102 because the condition on line 1101 was never true
1102 cube.long_name = (
1103 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging"
1104 )