Coverage for src / CSET / operators / read.py: 90%
391 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:48 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for reading various types of files from disk."""
17import ast
18import datetime
19import functools
20import glob
21import itertools
22import logging
23from pathlib import Path
24from typing import Literal
26import dask
27import iris
28import iris.coord_systems
29import iris.coords
30import iris.cube
31import iris.exceptions
32import iris.util
33import numpy as np
34from iris.analysis.cartography import rotate_pole, rotate_winds
36from CSET._common import iter_maybe
37from CSET.operators._stash_to_lfric import STASH_TO_LFRIC
38from CSET.operators._utils import (
39 get_cube_coordindex,
40 get_cube_yxcoordname,
41 is_spatialdim,
42)
class NoDataError(FileNotFoundError):
    """Error that no data has been loaded.

    Raised by ``read_cubes`` when loading and filtering the input files
    produces an empty CubeList (e.g. the constraint matched nothing).
    Subclasses FileNotFoundError so existing handlers for missing files
    also catch the no-data case.
    """
def read_cube(
    file_paths: list[str] | str,
    constraint: iris.Constraint = None,
    model_names: list[str] | str | None = None,
    subarea_type: str = None,
    subarea_extent: list[float] = None,
    **kwargs,
) -> iris.cube.Cube:
    """Read a single cube from files.

    Thin wrapper around ``read_cubes`` that additionally requires the loaded
    data to collapse to exactly one cube. Paths may contain shell-style glob
    patterns, and directories are expanded to the files they contain.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: "gridcells" | "modelrelative" | "realworld", optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list, optional
        List of coordinates to constrain data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.Cube
        Cube loaded

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    ValueError
        If the constraint doesn't produce a single cube.
    """
    loaded = read_cubes(
        file_paths=file_paths,
        constraint=constraint,
        model_names=model_names,
        subarea_type=subarea_type,
        subarea_extent=subarea_extent,
    )
    # Guard clause: anything other than exactly one cube is an error.
    if len(loaded) != 1:
        raise ValueError(
            f"Constraint doesn't produce single cube: {constraint}\n{loaded}"
        )
    return loaded[0]
def read_cubes(
    file_paths: list[str] | str,
    constraint: iris.Constraint | None = None,
    model_names: str | list[str] | None = None,
    subarea_type: str | None = None,
    subarea_extent: list | None = None,
    **kwargs,
) -> iris.cube.CubeList:
    """Read cubes from files.

    Read operator that takes a path string (can include shell-style glob
    patterns), and loads the cubes matching the constraint. If any paths point
    to directory, all the files contained within are loaded.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Data output by XIOS (such as LFRic) has its per-file metadata removed so
    that the cubes merge across files.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located. Can include globs.
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: str, optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list[float], optional
        List of coordinates to constrain data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.CubeList
        Cubes loaded after being merged and concatenated.

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    """
    # Get iterable of paths. Each path corresponds to 1 model.
    paths = iter_maybe(file_paths)
    model_names = iter_maybe(model_names)

    # Check we have appropriate number of model names.
    # NOTE: comparing against (None,) treats "no model names supplied" as
    # valid; only an explicit-but-mismatched list of names is rejected.
    if model_names != (None,) and len(model_names) != len(paths):
        raise ValueError(
            f"The number of model names ({len(model_names)}) should equal "
            f"the number of paths given ({len(paths)})."
        )

    # Load the data for each model into a CubeList per model. This is a lazy
    # generator: each model is only read from disk when consumed below.
    model_cubes = (
        _load_model(path, name, constraint)
        for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
    )

    # Split out first model's cubes and mark it as the base for comparisons.
    cubes = next(model_cubes)
    for cube in cubes:
        # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
        cube.attributes["cset_comparison_base"] = 1

    # Load the rest of the models.
    cubes.extend(itertools.chain.from_iterable(model_cubes))

    # Unify time units so different case studies can merge.
    iris.util.unify_time_units(cubes)

    # Select sub region.
    cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)

    # Merge and concatenate cubes now metadata has been fixed.
    cubes = cubes.merge()
    cubes = cubes.concatenate()

    # Squeeze single valued coordinates into scalar coordinates.
    cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)

    # Ensure dimension coordinates are bounded.
    for cube in cubes:
        for dim_coord in cube.coords(dim_coords=True):
            # Iris can't guess the bounds of a scalar coordinate.
            if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
                dim_coord.guess_bounds()

    logging.info("Loaded cubes: %s", cubes)
    if len(cubes) == 0:
        raise NoDataError("No cubes loaded, check your constraints!")
    return cubes
def _load_model(
    paths: str | list[str],
    model_name: str | None,
    constraint: iris.Constraint | None,
) -> iris.cube.CubeList:
    """Load one model's files into a CubeList, tagging it with its model name."""
    files = _check_input_files(paths)
    # A constraint of None lets iris load everything.
    logging.debug("Constraint: %s", constraint)
    loaded = iris.load(files, constraint, callback=_loading_callback)

    # Make the UM's winds consistent with LFRic.
    _fix_um_winds(loaded)

    # Record the model name on every cube so later processing steps can use it
    # without it being threaded through as a function parameter.
    if model_name is not None:
        for cube in loaded:
            cube.attributes["model_name"] = model_name
    return loaded
def _check_input_files(input_paths: str | list[str]) -> list[Path]:
    """Resolve the input paths to a sorted list of existing files.

    Arguments
    ---------
    input_paths: list[str]
        List of paths to input files or directories. The path may itself contain
        glob patterns, but unlike in shells it will match directly first.

    Returns
    -------
    list[Path]
        A list of files to load.

    Raises
    ------
    FileNotFoundError:
        If the provided arguments don't resolve to at least one existing file.
    """
    files: list[Path] = []
    for pattern in iter_maybe(input_paths):
        direct = Path(pattern)
        # A literal file path takes priority over glob expansion.
        if direct.is_file():
            files.append(direct)
            continue
        for match in map(Path, glob.glob(pattern)):
            if match.is_dir():
                # Directories contribute every file directly inside them.
                logging.debug("Checking directory '%s' for files", match)
                files.extend(entry for entry in match.iterdir() if entry.is_file())
            else:
                files.append(match)

    files.sort()
    logging.info("Loading files:\n%s", "\n".join(str(path) for path in files))
    if not files:
        raise FileNotFoundError(f"No files found for {input_paths}")
    return files
def _cutout_cubes(
    cubes: iris.cube.CubeList,
    subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
    subarea_extent: list[float],
) -> iris.cube.CubeList:
    """Cut out a subarea from a CubeList.

    For "gridcells" the extent is interpreted as the number of cells to trim
    from each edge, in order [lower, left, upper, right]. For "realworld" and
    "modelrelative" it is interpreted as coordinate values, in order
    [lower lat, upper lat, lower lon, upper lon]. A subarea_type of None
    disables cutout entirely and returns the input unchanged.
    """
    if subarea_type is None:
        logging.debug("Subarea selection is disabled.")
        return cubes

    # If selected, cutout according to number of grid cells to trim from each edge.
    cutout_cubes = iris.cube.CubeList()
    # Find spatial coordinates
    for cube in cubes:
        # Find dimension coordinates.
        lat_name, lon_name = get_cube_yxcoordname(cube)

        # Compute cutout based on number of cells to trim from edges.
        if subarea_type == "gridcells":
            logging.debug(
                "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            # Sort so positional indexing from either end is well defined.
            lat_points = np.sort(cube.coord(lat_name).points)
            lon_points = np.sort(cube.coord(lon_name).points)
            # Define cutout region using user provided cell points.
            # Index from the end for the upper/right trims (-trim - 1).
            lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
            lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]

        # Compute cutout based on specified coordinate values.
        elif subarea_type == "realworld" or subarea_type == "modelrelative":
            # If not gridcells, cutout by requested geographic area,
            logging.debug(
                "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            # Define cutout region using user provided coordinates.
            lats = np.array(subarea_extent[0:2])
            lons = np.array(subarea_extent[2:4])
            # Ensure cutout longitudes are within +/- 180.0 bounds.
            while lons[0] < -180.0:
                lons += 360.0
            while lons[1] > 180.0:
                lons -= 360.0
            # If the coordinate system is rotated we convert coordinates into
            # model-relative coordinates to extract the appropriate cutout.
            coord_system = cube.coord(lat_name).coord_system
            if subarea_type == "realworld" and isinstance(
                coord_system, iris.coord_systems.RotatedGeogCS
            ):
                lons, lats = rotate_pole(
                    lons,
                    lats,
                    pole_lon=coord_system.grid_north_pole_longitude,
                    pole_lat=coord_system.grid_north_pole_latitude,
                )
        else:
            raise ValueError("Unknown subarea_type:", subarea_type)

        # Do cutout and add to cutout_cubes.
        intersection_args = {lat_name: lats, lon_name: lons}
        logging.debug("Cutting out coords: %s", intersection_args)
        try:
            cutout_cubes.append(cube.intersection(**intersection_args))
        except IndexError as err:
            raise ValueError(
                "Region cutout error. Check and update SUBAREA_EXTENT."
                "Cutout region requested should be contained within data area. "
                "Also check if cutout region requested is smaller than input grid spacing."
            ) from err

    return cutout_cubes
def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
    """Compose together the needed callbacks into a single function.

    The (cube, field, filename) signature is required by the iris load
    callback API; field and filename are unused here. The call order below
    is deliberate: normalisation happens before the time/coordinate fix-ups
    that rely on it.
    """
    # Most callbacks operate in-place, but save the cube when returned!
    _realization_callback(cube)
    _um_normalise_callback(cube)
    _lfric_normalise_callback(cube)
    cube = _lfric_time_coord_fix_callback(cube)
    _normalise_var0_varname(cube)
    cube = _fix_no_spatial_coords_callback(cube)
    _fix_spatial_coords_callback(cube)
    _fix_pressure_coord_callback(cube)
    _fix_um_radtime(cube)
    _fix_cell_methods(cube)
    cube = _convert_cube_units_callback(cube)
    cube = _grid_longitude_fix_callback(cube)
    _fix_lfric_cloud_base_altitude(cube)
    _proleptic_gregorian_fix(cube)
    _lfric_time_callback(cube)
    _lfric_forecast_period_callback(cube)
    cube = _fix_no_time_coords_callback(cube)
    _normalise_ML_varname(cube)
    return cube
def _realization_callback(cube):
    """Give deterministic cubes a realization of 0.

    This means they can be handled in the same way as ensembles through the rest
    of the code.
    """
    if cube.coords("realization"):
        # Already ensemble data; leave the existing coordinate alone.
        return
    realization = iris.coords.DimCoord(0, standard_name="realization", units="1")
    cube.add_aux_coord(realization)
397@functools.lru_cache(None)
398def _log_once(msg, level=logging.WARNING):
399 """Print a warning message, skipping recent duplicates."""
400 logging.log(level, msg)
def _um_normalise_callback(cube: iris.cube.Cube):
    """Normalise UM STASH variable long names to LFRic variable names.

    Note standard names will remain associated with cubes where different.
    Long name will be used consistently in output filename and titles.
    """
    if "STASH" not in cube.attributes:
        # Not UM data; nothing to normalise.
        return
    stash = cube.attributes["STASH"]
    entry = STASH_TO_LFRIC.get(str(stash))
    if entry is not None:
        lfric_name, _grid = entry
        cube.long_name = lfric_name
    else:
        # Don't change cubes with unknown stash codes.
        _log_once(
            f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update.",
            level=logging.WARNING,
        )
def _lfric_normalise_callback(cube: iris.cube.Cube):
    """Normalise attributes that prevents LFRic cube from merging.

    The uuid and timeStamp relate to the output file, as saved by XIOS, and has
    no relation to the data contained. These attributes are removed.

    The um_stash_source is a list of STASH codes for when an LFRic field maps to
    multiple UM fields, however it can be encoded in any order. This attribute
    is sorted to prevent this. This attribute is only present in LFRic data that
    has been converted to look like UM data.
    """
    # Strip per-file metadata that would block merging across files.
    for attribute in (
        "timeStamp",
        "uuid",
        "name",
        "source",
        "analysis_source",
        "history",
    ):
        cube.attributes.pop(attribute, None)

    # Canonicalise the STASH code list ordering.
    stash_list = cube.attributes.get("um_stash_source")
    if stash_list:
        # Parse the string as a list, sort, then re-encode as a string.
        cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list)))
def _lfric_time_coord_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Ensure the time coordinate is a DimCoord rather than an AuxCoord.

    The coordinate is converted and replaced if not. SLAMed LFRic data has this
    issue, though the coordinate satisfies all the properties for a DimCoord.
    Scalar time values are left as AuxCoords.
    """
    # This issue seems to come from iris's handling of NetCDF files where time
    # always ends up as an AuxCoord.
    if cube.coords("time"):
        time_coord = cube.coord("time")
        # Only promote 1-dimensional AuxCoords; scalar time stays as-is.
        if (
            not isinstance(time_coord, iris.coords.DimCoord)
            and len(cube.coord_dims(time_coord)) == 1
        ):
            # Fudge the bounds to foil checking for strict monotonicity.
            if time_coord.has_bounds():
                # If all lower bounds are (near) identical, nudge each lower
                # bound by a tiny i-dependent epsilon so promotion succeeds.
                if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
                    time_coord.bounds = [
                        [
                            time_coord.bounds[i][0] + 1.0e-8 * float(i),
                            time_coord.bounds[i][1],
                        ]
                        for i in range(len(time_coord.bounds))
                    ]
            iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
    return cube
def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Check grid_longitude coordinates are in the range -180 deg to 180 deg.

    This is necessary if comparing two models with different conventions --
    for example, models where the prime meridian is defined as 0 deg or
    360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
    so that it falls in this range. Checks are for near-180 bounds given
    model data bounds may not extend exactly to 0. or 360.
    Input cubes on non-rotated grid coordinates are not impacted.
    """
    try:
        _, x_name = get_cube_yxcoordname(cube)
    except ValueError:
        # Non-spatial cubes are left untouched.
        return cube

    x_coord = cube.coord(x_name)
    # Only wrap rotated-pole grid longitudes.
    if x_name == "grid_longitude" and isinstance(
        x_coord.coord_system, iris.coord_systems.RotatedGeogCS
    ):
        points = x_coord.points.copy()
        # Shift by whole rotations until the median longitude is in range;
        # points move in lock-step with the running centre value.
        centre = np.median(points)
        while centre < -175.0:
            centre += 360.0
            points += 360.0
        while centre >= 175.0:
            centre -= 360.0
            points -= 360.0
        x_coord.points = points

        # Regenerate bounds so they are consistent with wrapped points.
        if x_coord.has_bounds():
            x_coord.bounds = None
            x_coord.guess_bounds()

    return cube
def _fix_no_spatial_coords_callback(cube: iris.cube.Cube):
    """Add scalar latitude/longitude coordinates to non-spatial cubes.

    Cubes without spatial dimensions get a single representative lat/lon
    point derived from the geospatial_* bounding-box attributes (the
    mid-point of the box), so downstream code can treat them uniformly.
    Cubes that already have spatial dimensions, or that lack the bounding
    box attributes, are returned unchanged.
    """
    import CSET.operators._utils as utils

    # Don't modify spatial cubes that already have spatial dimensions
    if utils.is_spatialdim(cube):
        return cube

    else:
        # attempt to get lat/long from cube attributes
        try:
            lat_min = cube.attributes.get("geospatial_lat_min")
            lat_max = cube.attributes.get("geospatial_lat_max")
            lon_min = cube.attributes.get("geospatial_lon_min")
            lon_max = cube.attributes.get("geospatial_lon_max")

            # Mid-point of the bounding box. If any attribute is missing,
            # .get returns None and the arithmetic raises TypeError below.
            lon_val = (lon_min + lon_max) / 2.0
            lat_val = (lat_min + lat_max) / 2.0

            lat_coord = iris.coords.DimCoord(
                lat_val,
                standard_name="latitude",
                units="degrees_north",
                var_name="latitude",
                coord_system=iris.coord_systems.GeogCS(6371229.0),
                circular=True,
            )

            lon_coord = iris.coords.DimCoord(
                lon_val,
                standard_name="longitude",
                units="degrees_east",
                var_name="longitude",
                coord_system=iris.coord_systems.GeogCS(6371229.0),
                circular=True,
            )

            cube.add_aux_coord(lat_coord)
            cube.add_aux_coord(lon_coord)
            return cube

        # if lat/long are not in attributes, then return cube unchanged:
        except TypeError:
            return cube
def _fix_spatial_coords_callback(cube: iris.cube.Cube):
    """Check latitude and longitude coordinates name.

    This is necessary as some models define their grid as on rotated
    'grid_latitude' and 'grid_longitude' coordinates while others define
    the grid on non-rotated 'latitude' and 'longitude'.
    Cube dimensions need to be made consistent to avoid recipe failures,
    particularly where comparing multiple input models with differing spatial
    coordinates.
    """
    # Check if cube is spatial.
    if not is_spatialdim(cube):
        # Don't modify non-spatial cubes.
        return

    # Get spatial coords and dimension index.
    y_name, x_name = get_cube_yxcoordname(cube)
    ny = get_cube_coordindex(cube, y_name)
    nx = get_cube_coordindex(cube, x_name)

    # Remove spatial coords bounds if erroneous values detected.
    # Aims to catch some errors in input coord bounds by setting
    # invalid threshold of 10000.0
    if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
        bx_max = np.max(np.abs(cube.coord(x_name).bounds))
        by_max = np.max(np.abs(cube.coord(y_name).bounds))
        if bx_max > 10000.0 or by_max > 10000.0:
            cube.coord(x_name).bounds = None
            cube.coord(y_name).bounds = None

    # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
    # [latitude, longitude] for instances where rotated_pole=90.0
    if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
        coord_system = cube.coord("grid_latitude").coord_system
        # A pole latitude of exactly 90.0 means the rotation is a no-op.
        pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
        if pole_lat == 90.0:
            lats = cube.coord("grid_latitude").points
            lons = cube.coord("grid_longitude").points

            # Replace the grid_* DimCoords with unrotated equivalents on the
            # same dimensions, then track the renames in y_name/x_name.
            cube.remove_coord("grid_latitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lats,
                    standard_name="latitude",
                    var_name="latitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                ny,
            )
            y_name = "latitude"
            cube.remove_coord("grid_longitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lons,
                    standard_name="longitude",
                    var_name="longitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                nx,
            )
            x_name = "longitude"

    # Create additional AuxCoord [grid_latitude, grid_longitude] with
    # rotated pole attributes for cases with [lat, lon] inputs
    if y_name in ["latitude"] and cube.coord(y_name).units in [
        "degrees",
        "degrees_north",
        "degrees_south",
    ]:
        # Add grid_latitude AuxCoord
        if "grid_latitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(y_name).points,
                    var_name="grid_latitude",
                    units="degrees",
                ),
                ny,
            )
        # Ensure input latitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(y_name).coord_system:
            cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)

    if x_name in ["longitude"] and cube.coord(x_name).units in [
        "degrees",
        "degrees_west",
        "degrees_east",
    ]:
        # Add grid_longitude AuxCoord
        if "grid_longitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(x_name).points,
                    var_name="grid_longitude",
                    units="degrees",
                ),
                nx,
            )

        # Ensure input longitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(x_name).coord_system:
            cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
def _fix_pressure_coord_callback(cube: iris.cube.Cube):
    """Rename pressure coordinate to "pressure" if it exists and ensure hPa units.

    This problem was raised because the AIFS model data from ECMWF
    defines the pressure coordinate with the name "pressure_level" rather
    than compliant CF coordinate names.

    Additionally, set the units of pressure to be hPa to be consistent with the UM,
    and approach the coordinates in a unified way.
    """
    for coord in cube.dim_coords:
        # Normalise non-CF names first so the units check below applies.
        if coord.name() in ("pressure_level", "pressure_levels"):
            coord.rename("pressure")
        if coord.name() == "pressure" and str(cube.coord("pressure").units) != "hPa":
            cube.coord("pressure").convert_units("hPa")
def _fix_um_radtime(cube: iris.cube.Cube):
    """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.

    This callback does not have any effect for output diagnostics with
    timestamps exactly 00 or 30 minutes past the hour. Only radiation
    diagnostics are checked.
    Note this callback does not interpolate the data in time, only adjust
    timestamps to sit on the hour to enable time-to-time difference plotting
    with models which may output radiation data on the hour.
    """
    try:
        # KeyError from the attribute lookup (no STASH) is caught below.
        if cube.attributes["STASH"] in [
            "m01s01i207",
            "m01s01i208",
            "m01s02i205",
            "m01s02i201",
            "m01s01i207",  # NOTE(review): duplicate of the first entry.
            "m01s02i207",
            "m01s01i235",
        ]:
            time_coord = cube.coord("time")

            # Convert time points to datetime objects
            time_unit = time_coord.units
            time_points = time_unit.num2date(time_coord.points)
            # Skip if times don't need fixing. Only the first time point is
            # inspected; all points are assumed to share the same offset.
            if time_points[0].minute == 0 and time_points[0].second == 0:
                return
            if time_points[0].minute == 30 and time_points[0].second == 0:
                return

            # Subtract time difference from the hour from each time point
            n_minute = time_points[0].minute
            n_second = time_points[0].second
            # If times closer to next hour, compute difference to add on to following hour
            if n_minute > 30:
                n_minute = n_minute - 60
            # Compute new diagnostic time stamp
            new_time_points = (
                time_points
                - datetime.timedelta(minutes=n_minute)
                - datetime.timedelta(seconds=n_second)
            )

            # Convert back to numeric values using the original time unit.
            new_time_values = time_unit.date2num(new_time_points)

            # Replace the time coordinate with updated values.
            time_coord.points = new_time_values

            # Recompute forecast_period with corrected values.
            if cube.coord("forecast_period"):
                fcst_prd_points = cube.coord("forecast_period").points
                new_fcst_points = (
                    time_unit.num2date(fcst_prd_points)
                    - datetime.timedelta(minutes=n_minute)
                    - datetime.timedelta(seconds=n_second)
                )
                cube.coord("forecast_period").points = time_unit.date2num(
                    new_fcst_points
                )
    except KeyError:
        pass
def _fix_cell_methods(cube: iris.cube.Cube):
    """To fix the assumed cell_methods in accumulation STASH from UM.

    Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
    (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
    over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
    For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
    "sum". This callback changes "mean" cube attribute cell_method to "sum",
    enabling the cell_method constraint on reading to select correct input.
    """
    # Only the known accumulation diagnostics are touched.
    if cube.attributes.get("STASH") not in (
        "m01s21i104",
        "m01s04i201",
        "m01s04i202",
        "m01s05i201",
        "m01s05i202",
    ):
        return
    # Require every cell method to be a "mean" time-processing.
    if {cm.method for cm in cube.cell_methods} != {"mean"}:
        return

    # Keep the interval and comment metadata from the (last) input method.
    interval_str = comment_str = None
    for cell_method in cube.cell_methods:
        interval_str = cell_method.intervals
        comment_str = cell_method.comments

    # Drop the "mean" aggregation and record "sum" in its place.
    cube.cell_methods = ()
    cube.add_cell_method(
        iris.coords.CellMethod(
            method="sum",
            coords="time",
            intervals=interval_str,
            comments=comment_str,
        )
    )
def _convert_cube_units_callback(cube: iris.cube.Cube):
    """Adjust diagnostic units for specific variables.

    Some precipitation diagnostics are output with unit kg m-2 s-1 and are
    converted here to mm hr-1.

    Visibility diagnostics are converted here from m to km to improve output
    formatting.
    """
    # All the names this cube is known by (None entries dropped).
    names = [n for n in (cube.long_name, cube.standard_name, cube.var_name) if n]

    # Precipitation diagnostics.
    if any("surface_microphysical" in n for n in names):
        if cube.units == "kg m-2 s-1":
            _log_once(
                "Converting precipitation rate units from kg m-2 s-1 to mm hr-1",
                level=logging.DEBUG,
            )
            # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water.
            # This is a 1:1 conversion, so we just change the units.
            cube.units = "mm s-1"
            # Convert the units to per hour.
            cube.convert_units("mm hr-1")
        elif cube.units == "kg m-2":
            _log_once(
                "Converting precipitation amount units from kg m-2 to mm",
                level=logging.DEBUG,
            )
            # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water.
            # This is a 1:1 conversion, so we just change the units.
            cube.units = "mm"

    # Visibility diagnostics.
    if any("visibility" in n for n in names):
        if cube.units == "m":
            _log_once("Converting visibility units m to km.", level=logging.DEBUG)
            # Convert the units to km.
            cube.convert_units("km")

    return cube
def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
    """Mask cloud_base_altitude diagnostic in regions with no cloud."""
    names = [n for n in (cube.long_name, cube.standard_name, cube.var_name) if n]
    if any("cloud_base_altitude" in n for n in names):
        # Mask cube where set > 144kft to catch default 144.35695538058164
        cube.data = dask.array.ma.masked_greater(cube.core_data(), 144.0)
def _fix_um_winds(cubes: iris.cube.CubeList):
    """Make winds from the UM consistent with those from LFRic.

    Diagnostics of wind are not always consistent between the UM and LFRic.
    Here, winds from the UM are adjusted to make them consistent with LFRic.

    Components identified by STASH (so UM data only) have a wind speed
    derived when it is missing. Note the derived speed will be biased low in
    general because the components will mostly be time averages. For
    simplicity, this is done only when there is exactly one cube per
    component; a fuller approach would consider the cell methods, but it may
    not be warranted.
    """
    u_constraint = iris.AttributeConstraint(STASH="m01s03i225")
    v_constraint = iris.AttributeConstraint(STASH="m01s03i226")
    speed_constraint = iris.AttributeConstraint(STASH="m01s03i227")
    try:
        if cubes.extract(u_constraint) and cubes.extract(v_constraint):
            # Derive the missing speed only in the unambiguous single-cube case.
            if len(cubes.extract(u_constraint)) == 1 and not cubes.extract(
                speed_constraint
            ):
                _add_wind_speed_um(cubes)
            # Convert winds in the UM to be relative to true east and true north.
            _convert_wind_true_dirn_um(cubes)
    except (KeyError, AttributeError):
        # Best effort: leave winds untouched if extraction/derivation fails.
        pass
def _add_wind_speed_um(cubes: iris.cube.CubeList):
    """Derive 10 m wind speed from the UM wind components and append it."""
    # NOTE(review): [0] selects the first slice along the leading dimension
    # of each component cube — confirm this matches the intended realization.
    u10 = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0]
    v10 = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0]
    speed = (u10**2 + v10**2) ** 0.5
    speed.attributes["STASH"] = "m01s03i227"
    speed.standard_name = "wind_speed"
    speed.long_name = "wind_speed_at_10m"
    cubes.append(speed)
def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
    """Convert winds to true directions.

    Convert from the components relative to the grid to true directions.
    This functionality only handles the simplest case.
    """
    grid_us = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
    grid_vs = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
    # Target coordinate system: unrotated sphere with UM Earth radius.
    true_cs = iris.coord_systems.GeogCS(6371229.0)
    # Components must pair up exactly; strict zip raises otherwise.
    for grid_u, grid_v in zip(grid_us, grid_vs, strict=True):
        rotated_u, rotated_v = rotate_winds(grid_u, grid_v, true_cs)
        # Overwrite the grid-relative data with the true-direction values.
        grid_u.data = rotated_u.core_data()
        grid_v.data = rotated_v.core_data()
903def _normalise_var0_varname(cube: iris.cube.Cube):
904 """Fix varnames for consistency to allow merging.
906 Some model data netCDF sometimes have a coordinate name end in
907 "_0" etc, where duplicate coordinates of same name are defined but
908 with different attributes. This can be inconsistently managed in
909 different model inputs and can cause cubes to fail to merge.
910 """
911 for coord in cube.coords():
912 if coord.var_name and coord.var_name.endswith("_0"):
913 coord.var_name = coord.var_name.removesuffix("_0")
914 if coord.var_name and coord.var_name.endswith("_1"):
915 coord.var_name = coord.var_name.removesuffix("_1")
916 if coord.var_name and coord.var_name.endswith("_2"): 916 ↛ 917line 916 didn't jump to line 917 because the condition on line 916 was never true
917 coord.var_name = coord.var_name.removesuffix("_2")
918 if coord.var_name and coord.var_name.endswith("_3"): 918 ↛ 919line 918 didn't jump to line 919 because the condition on line 918 was never true
919 coord.var_name = coord.var_name.removesuffix("_3")
921 if cube.var_name and cube.var_name.endswith("_0"):
922 cube.var_name = cube.var_name.removesuffix("_0")
925def _proleptic_gregorian_fix(cube: iris.cube.Cube):
926 """Convert the calendars of time units to use a standard calendar."""
927 try:
928 time_coord = cube.coord("time")
929 if time_coord.units.calendar == "proleptic_gregorian":
930 logging.debug(
931 "Changing proleptic Gregorian calendar to standard calendar for %s",
932 repr(time_coord.units),
933 )
934 time_coord.units = time_coord.units.change_calendar("standard")
935 except iris.exceptions.CoordinateNotFoundError:
936 pass
def _lfric_time_callback(cube: iris.cube.Cube):
    """Fix time coordinate metadata if missing dimensions.

    Some model data does not contain forecast_reference_time or forecast_period as
    expected coordinates, and so we cannot aggregate over case studies without this
    metadata. This callback fixes these issues.

    This callback also ensures all time coordinates are referenced as hours since
    1970-01-01 00:00:00 for consistency across different model inputs.

    Notes
    -----
    Some parts of the code have been adapted from Paul Earnshaw's scripts.
    """
    # Construct forecast_reference time if it doesn't exist.
    try:
        tcoord = cube.coord("time")
        # Set time coordinate to common basis "hours since 1970"
        try:
            tcoord.convert_units("hours since 1970-01-01 00:00:00")
        except ValueError:
            # Best effort: if the units cannot be converted, keep them and
            # warn; later arithmetic then uses the original basis.
            logging.warning("Unrecognised base time unit: %s", tcoord.units)

        if not cube.coords("forecast_reference_time"):
            try:
                # The "time_origin" coordinate attribute holds the model
                # initialisation time as an ISO 8601 string.
                init_time = datetime.datetime.fromisoformat(
                    tcoord.attributes["time_origin"]
                )
                # Express the initialisation time on the same numeric basis
                # as the time coordinate.
                frt_point = tcoord.units.date2num(init_time)
                frt_coord = iris.coords.AuxCoord(
                    frt_point,
                    units=tcoord.units,
                    standard_name="forecast_reference_time",
                    long_name="forecast_reference_time",
                )
                cube.add_aux_coord(frt_coord)
            except KeyError:
                # No "time_origin" attribute to derive the coordinate from.
                logging.warning(
                    "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
                )

        # Remove time_origin to allow multiple case studies to merge.
        tcoord.attributes.pop("time_origin", None)

        # Construct forecast_period axis (forecast lead time) if it doesn't exist.
        if not cube.coords("forecast_period"):
            try:
                # Create array of forecast lead times.
                # Re-express the reference time points in the time
                # coordinate's units before differencing, in case the two
                # coordinates use different bases.
                init_coord = cube.coord("forecast_reference_time")
                init_time_points_in_tcoord_units = tcoord.units.date2num(
                    init_coord.units.num2date(init_coord.points)
                )
                lead_times = tcoord.points - init_time_points_in_tcoord_units

                # Get unit for lead time from time coordinate's unit.
                # Convert all lead time to hours for consistency between models.
                if "seconds" in str(tcoord.units):
                    lead_times = lead_times / 3600.0
                    units = "hours"
                elif "hours" in str(tcoord.units):
                    units = "hours"
                else:
                    raise ValueError(f"Unrecognised base time unit: {tcoord.units}")

                # Create lead time coordinate.
                lead_time_coord = iris.coords.AuxCoord(
                    lead_times,
                    standard_name="forecast_period",
                    long_name="forecast_period",
                    units=units,
                )

                # Associate lead time coordinate with time dimension.
                cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
            except iris.exceptions.CoordinateNotFoundError:
                # forecast_reference_time was absent and could not be built
                # above, so the lead time cannot be derived.
                logging.warning(
                    "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
                )
    except iris.exceptions.CoordinateNotFoundError:
        logging.warning("No time coordinate on cube.")
1021def _lfric_forecast_period_callback(cube: iris.cube.Cube):
1022 """Check forecast_period name and units."""
1023 try:
1024 coord = cube.coord("forecast_period")
1025 if coord.units != "hours":
1026 cube.coord("forecast_period").convert_units("hours")
1027 if not coord.standard_name:
1028 coord.standard_name = "forecast_period"
1029 except iris.exceptions.CoordinateNotFoundError:
1030 pass
1033def _fix_no_time_coords_callback(cube: iris.cube.Cube):
1034 """Add dummy time coord to process cubes that don't have sequence coord."""
1035 # Only add if time coordinate does not exist.
1036 if not cube.coords("time"):
1037 cube.add_aux_coord(
1038 iris.coords.DimCoord(
1039 0, standard_name="time", units="hours since 1970-01-01 00:00:00"
1040 )
1041 )
1043 return cube
def _normalise_ML_varname(cube: iris.cube.Cube):
    """Fix plev variable names to standard names."""
    # Only rename fields on pressure levels; anything else keeps its name.
    if cube.coords("pressure"):
        # Sets long_name only, leaving standard_name intact; for these CF
        # standard names cube.name() continues to report the standard name,
        # so the later checks are unaffected by earlier assignments.
        if cube.name() == "x_wind":
            cube.long_name = "zonal_wind_at_pressure_levels"
        if cube.name() == "y_wind":
            cube.long_name = "meridional_wind_at_pressure_levels"
        if cube.name() == "air_temperature":
            cube.long_name = "temperature_at_pressure_levels"
        if cube.name() == "specific_humidity":
            cube.long_name = (
                "vapour_specific_humidity_at_pressure_levels_for_climate_averaging"
            )