Coverage for src / CSET / operators / read.py: 90%
369 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 12:47 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 12:47 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for reading various types of files from disk."""
17import ast
18import datetime
19import functools
20import glob
21import itertools
22import logging
23from pathlib import Path
24from typing import Literal
26import iris
27import iris.coord_systems
28import iris.coords
29import iris.cube
30import iris.exceptions
31import iris.util
32import numpy as np
33from iris.analysis.cartography import rotate_pole, rotate_winds
35from CSET._common import iter_maybe
36from CSET.operators._stash_to_lfric import STASH_TO_LFRIC
37from CSET.operators._utils import get_cube_yxcoordname
40class NoDataError(FileNotFoundError):
41 """Error that no data has been loaded."""
44def read_cube(
45 file_paths: list[str] | str,
46 constraint: iris.Constraint = None,
47 model_names: list[str] | str | None = None,
48 subarea_type: str = None,
49 subarea_extent: list[float] = None,
50 **kwargs,
51) -> iris.cube.Cube:
52 """Read a single cube from files.
54 Read operator that takes a path string (can include shell-style glob
55 patterns), and loads the cube matching the constraint. If any paths point to
56 directory, all the files contained within are loaded.
58 Ensemble data can also be loaded. If it has a realization coordinate
59 already, it will be directly used. If not, it will have its member number
60 guessed from the filename, based on one of several common patterns. For
61 example the pattern *emXX*, where XX is the realization.
63 Deterministic data will be loaded with a realization of 0, allowing it to be
64 processed in the same way as ensemble data.
66 Arguments
67 ---------
68 file_paths: str | list[str]
69 Path or paths to where .pp/.nc files are located
70 constraint: iris.Constraint | iris.ConstraintCombination, optional
71 Constraints to filter data by. Defaults to unconstrained.
72 model_names: str | list[str], optional
73 Names of the models that correspond to respective paths in file_paths.
74 subarea_type: "gridcells" | "modelrelative" | "realworld", optional
75 Whether to constrain data by model relative coordinates or real world
76 coordinates.
77 subarea_extent: list, optional
78 List of coordinates to constraint data by, in order lower latitude,
79 upper latitude, lower longitude, upper longitude.
81 Returns
82 -------
83 cubes: iris.cube.Cube
84 Cube loaded
86 Raises
87 ------
88 FileNotFoundError
89 If the provided path does not exist
90 ValueError
91 If the constraint doesn't produce a single cube.
92 """
93 cubes = read_cubes(
94 file_paths=file_paths,
95 constraint=constraint,
96 model_names=model_names,
97 subarea_type=subarea_type,
98 subarea_extent=subarea_extent,
99 )
100 # Check filtered cubes is a CubeList containing one cube.
101 if len(cubes) == 1:
102 return cubes[0]
103 else:
104 raise ValueError(
105 f"Constraint doesn't produce single cube: {constraint}\n{cubes}"
106 )
109def read_cubes(
110 file_paths: list[str] | str,
111 constraint: iris.Constraint | None = None,
112 model_names: str | list[str] | None = None,
113 subarea_type: str = None,
114 subarea_extent: list = None,
115 **kwargs,
116) -> iris.cube.CubeList:
117 """Read cubes from files.
119 Read operator that takes a path string (can include shell-style glob
120 patterns), and loads the cubes matching the constraint. If any paths point
121 to directory, all the files contained within are loaded.
123 Ensemble data can also be loaded. If it has a realization coordinate
124 already, it will be directly used. If not, it will have its member number
125 guessed from the filename, based on one of several common patterns. For
126 example the pattern *emXX*, where XX is the realization.
128 Deterministic data will be loaded with a realization of 0, allowing it to be
129 processed in the same way as ensemble data.
131 Data output by XIOS (such as LFRic) has its per-file metadata removed so
132 that the cubes merge across files.
134 Arguments
135 ---------
136 file_paths: str | list[str]
137 Path or paths to where .pp/.nc files are located. Can include globs.
138 constraint: iris.Constraint | iris.ConstraintCombination, optional
139 Constraints to filter data by. Defaults to unconstrained.
140 model_names: str | list[str], optional
141 Names of the models that correspond to respective paths in file_paths.
142 subarea_type: str, optional
143 Whether to constrain data by model relative coordinates or real world
144 coordinates.
145 subarea_extent: list[float], optional
146 List of coordinates to constraint data by, in order lower latitude,
147 upper latitude, lower longitude, upper longitude.
149 Returns
150 -------
151 cubes: iris.cube.CubeList
152 Cubes loaded after being merged and concatenated.
154 Raises
155 ------
156 FileNotFoundError
157 If the provided path does not exist
158 """
159 # Get iterable of paths. Each path corresponds to 1 model.
160 paths = iter_maybe(file_paths)
161 model_names = iter_maybe(model_names)
163 # Check we have appropriate number of model names.
164 if model_names != (None,) and len(model_names) != len(paths):
165 raise ValueError(
166 f"The number of model names ({len(model_names)}) should equal "
167 f"the number of paths given ({len(paths)})."
168 )
170 # Load the data for each model into a CubeList per model.
171 model_cubes = (
172 _load_model(path, name, constraint)
173 for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
174 )
176 # Split out first model's cubes and mark it as the base for comparisons.
177 cubes = next(model_cubes)
178 for cube in cubes:
179 # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
180 cube.attributes["cset_comparison_base"] = 1
182 # Load the rest of the models.
183 cubes.extend(itertools.chain.from_iterable(model_cubes))
185 # Unify time units so different case studies can merge.
186 iris.util.unify_time_units(cubes)
188 # Select sub region.
189 cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)
190 # Merge and concatenate cubes now metadata has been fixed.
191 cubes = cubes.merge()
192 cubes = cubes.concatenate()
194 # Ensure dimension coordinates are bounded.
195 for cube in cubes:
196 for dim_coord in cube.coords(dim_coords=True):
197 # Iris can't guess the bounds of a scalar coordinate.
198 if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
199 dim_coord.guess_bounds()
201 logging.info("Loaded cubes: %s", cubes)
202 if len(cubes) == 0:
203 raise NoDataError("No cubes loaded, check your constraints!")
204 return cubes
207def _load_model(
208 paths: str | list[str],
209 model_name: str | None,
210 constraint: iris.Constraint | None,
211) -> iris.cube.CubeList:
212 """Load a single model's data into a CubeList."""
213 input_files = _check_input_files(paths)
214 # If unset, a constraint of None lets everything be loaded.
215 logging.debug("Constraint: %s", constraint)
216 cubes = iris.load(
217 input_files, constraint, callback=_create_callback(is_ensemble=False)
218 )
219 # Make the UM's winds consistent with LFRic.
220 _fix_um_winds(cubes)
222 # Reload with ensemble handling if needed.
223 if _is_ensemble(cubes):
224 cubes = iris.load(
225 input_files, constraint, callback=_create_callback(is_ensemble=True)
226 )
228 # Add model_name attribute to each cube to make it available at any further
229 # step without needing to pass it as function parameter.
230 if model_name is not None:
231 for cube in cubes:
232 cube.attributes["model_name"] = model_name
233 return cubes
236def _check_input_files(input_paths: str | list[str]) -> list[Path]:
237 """Get an iterable of files to load, and check that they all exist.
239 Arguments
240 ---------
241 input_paths: list[str]
242 List of paths to input files or directories. The path may itself contain
243 glob patterns, but unlike in shells it will match directly first.
245 Returns
246 -------
247 list[Path]
248 A list of files to load.
250 Raises
251 ------
252 FileNotFoundError:
253 If the provided arguments don't resolve to at least one existing file.
254 """
255 files = []
256 for raw_filename in iter_maybe(input_paths):
257 # Match glob-like files first, if they exist.
258 raw_path = Path(raw_filename)
259 if raw_path.is_file():
260 files.append(raw_path)
261 else:
262 for input_path in glob.glob(raw_filename):
263 # Convert string paths into Path objects.
264 input_path = Path(input_path)
265 # Get the list of files in the directory, or use it directly.
266 if input_path.is_dir():
267 logging.debug("Checking directory '%s' for files", input_path)
268 files.extend(p for p in input_path.iterdir() if p.is_file())
269 else:
270 files.append(input_path)
272 files.sort()
273 logging.info("Loading files:\n%s", "\n".join(str(path) for path in files))
274 if len(files) == 0:
275 raise FileNotFoundError(f"No files found for {input_paths}")
276 return files
279def _cutout_cubes(
280 cubes: iris.cube.CubeList,
281 subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
282 subarea_extent: list[float, float, float, float],
283):
284 """Cut out a subarea from a CubeList."""
285 if subarea_type is None:
286 logging.debug("Subarea selection is disabled.")
287 return cubes
289 # If selected, cutout according to number of grid cells to trim from each edge.
290 cutout_cubes = iris.cube.CubeList()
291 # Find spatial coordinates
292 for cube in cubes:
293 # Find dimension coordinates.
294 lat_name, lon_name = get_cube_yxcoordname(cube)
296 # Compute cutout based on number of cells to trim from edges.
297 if subarea_type == "gridcells":
298 logging.debug(
299 "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
300 subarea_extent[0],
301 subarea_extent[1],
302 subarea_extent[2],
303 subarea_extent[3],
304 )
305 lat_points = np.sort(cube.coord(lat_name).points)
306 lon_points = np.sort(cube.coord(lon_name).points)
307 # Define cutout region using user provided cell points.
308 lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
309 lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]
311 # Compute cutout based on specified coordinate values.
312 elif subarea_type == "realworld" or subarea_type == "modelrelative":
313 # If not gridcells, cutout by requested geographic area,
314 logging.debug(
315 "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
316 subarea_extent[0],
317 subarea_extent[1],
318 subarea_extent[2],
319 subarea_extent[3],
320 )
321 # Define cutout region using user provided coordinates.
322 lats = np.array(subarea_extent[0:2])
323 lons = np.array(subarea_extent[2:4])
324 # Ensure cutout longitudes are within +/- 180.0 bounds.
325 while lons[0] < -180.0:
326 lons += 360.0
327 while lons[1] > 180.0:
328 lons -= 360.0
329 # If the coordinate system is rotated we convert coordinates into
330 # model-relative coordinates to extract the appropriate cutout.
331 coord_system = cube.coord(lat_name).coord_system
332 if subarea_type == "realworld" and isinstance(
333 coord_system, iris.coord_systems.RotatedGeogCS
334 ):
335 lons, lats = rotate_pole(
336 lons,
337 lats,
338 pole_lon=coord_system.grid_north_pole_longitude,
339 pole_lat=coord_system.grid_north_pole_latitude,
340 )
341 else:
342 raise ValueError("Unknown subarea_type:", subarea_type)
344 # Do cutout and add to cutout_cubes.
345 intersection_args = {lat_name: lats, lon_name: lons}
346 logging.debug("Cutting out coords: %s", intersection_args)
347 try:
348 cutout_cubes.append(cube.intersection(**intersection_args))
349 except IndexError as err:
350 raise ValueError(
351 "Region cutout error. Check and update SUBAREA_EXTENT."
352 "Cutout region requested should be contained within data area. "
353 "Also check if cutout region requested is smaller than input grid spacing."
354 ) from err
356 return cutout_cubes
359def _is_ensemble(cubelist: iris.cube.CubeList) -> bool:
360 """Test if a CubeList is likely to be ensemble data.
362 If cubes either have a realization dimension, or there are multiple files
363 for the same time-step, we can assume it is ensemble data.
364 """
365 unique_cubes = set()
366 for cube in cubelist:
367 # Ignore realization of 0, as that is given to deterministic data.
368 if cube.coords("realization") and any(cube.coord("realization").points != 0):
369 return True
370 # Compare XML representation of cube structure check for duplicates.
371 cube_content = cube.xml()
372 if cube_content in unique_cubes:
373 logging.info("Ensemble data loaded.")
374 return True
375 else:
376 unique_cubes.add(cube_content)
377 logging.info("Deterministic data loaded.")
378 return False
381def _create_callback(is_ensemble: bool) -> callable:
382 """Compose together the needed callbacks into a single function."""
384 def callback(cube: iris.cube.Cube, field, filename: str):
385 if is_ensemble:
386 _ensemble_callback(cube, field, filename)
387 else:
388 _deterministic_callback(cube, field, filename)
390 _um_normalise_callback(cube, field, filename)
391 _lfric_normalise_callback(cube, field, filename)
392 _lfric_time_coord_fix_callback(cube, field, filename)
393 _normalise_var0_varname(cube)
394 _fix_spatial_coords_callback(cube)
395 _fix_pressure_coord_callback(cube)
396 _fix_um_radtime(cube)
397 _fix_cell_methods(cube)
398 _convert_cube_units_callback(cube)
399 _grid_longitude_fix_callback(cube)
400 _fix_lfric_cloud_base_altitude(cube)
401 _proleptic_gregorian_fix(cube)
402 _lfric_time_callback(cube)
403 _lfric_forecast_period_standard_name_callback(cube)
405 return callback
408def _ensemble_callback(cube, field, filename):
409 """Add a realization coordinate to a cube.
411 Uses the filename to add an ensemble member ('realization') to each cube.
412 Assumes data is formatted enuk_um_0XX/enukaa_pd0HH.pp where XX is the
413 ensemble member.
415 Arguments
416 ---------
417 cube: Cube
418 ensemble member cube
419 field
420 Raw data variable, unused.
421 filename: str
422 filename of ensemble member data
423 """
424 if not cube.coords("realization"):
425 if "em" in filename:
426 # Assuming format is *emXX*
427 loc = filename.find("em") + 2
428 member = np.int32(filename[loc : loc + 2])
429 else:
430 # Assuming raw fields files format is enuk_um_0XX/enukaa_pd0HH
431 member = np.int32(filename[-15:-13])
433 cube.add_aux_coord(iris.coords.AuxCoord(member, standard_name="realization"))
436def _deterministic_callback(cube, field, filename):
437 """Give deterministic cubes a realization of 0.
439 This means they can be handled in the same way as ensembles through the rest
440 of the code.
441 """
442 # Only add if realization coordinate does not exist.
443 if not cube.coords("realization"):
444 cube.add_aux_coord(
445 iris.coords.AuxCoord(np.int32(0), standard_name="realization", units="1")
446 )
449@functools.lru_cache(None)
450def _warn_once(msg):
451 """Print a warning message, skipping recent duplicates."""
452 logging.warning(msg)
455def _um_normalise_callback(cube: iris.cube.Cube, field, filename):
456 """Normalise UM STASH variable long names to LFRic variable names.
458 Note standard names will remain associated with cubes where different.
459 Long name will be used consistently in output filename and titles.
460 """
461 # Convert STASH to LFRic variable name
462 if "STASH" in cube.attributes:
463 stash = cube.attributes["STASH"]
464 try:
465 (name, grid) = STASH_TO_LFRIC[str(stash)]
466 cube.long_name = name
467 except KeyError:
468 # Don't change cubes with unknown stash codes.
469 _warn_once(
470 f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update."
471 )
474def _lfric_normalise_callback(cube: iris.cube.Cube, field, filename):
475 """Normalise attributes that prevents LFRic cube from merging.
477 The uuid and timeStamp relate to the output file, as saved by XIOS, and has
478 no relation to the data contained. These attributes are removed.
480 The um_stash_source is a list of STASH codes for when an LFRic field maps to
481 multiple UM fields, however it can be encoded in any order. This attribute
482 is sorted to prevent this. This attribute is only present in LFRic data that
483 has been converted to look like UM data.
484 """
485 # Remove unwanted attributes.
486 cube.attributes.pop("timeStamp", None)
487 cube.attributes.pop("uuid", None)
488 cube.attributes.pop("name", None)
490 # Sort STASH code list.
491 stash_list = cube.attributes.get("um_stash_source")
492 if stash_list:
493 # Parse the string as a list, sort, then re-encode as a string.
494 cube.attributes["um_stash_source"] = str(sorted(ast.literal_eval(stash_list)))
497def _lfric_time_coord_fix_callback(cube: iris.cube.Cube, field, filename):
498 """Ensure the time coordinate is a DimCoord rather than an AuxCoord.
500 The coordinate is converted and replaced if not. SLAMed LFRic data has this
501 issue, though the coordinate satisfies all the properties for a DimCoord.
502 Scalar time values are left as AuxCoords.
503 """
504 # This issue seems to come from iris's handling of NetCDF files where time
505 # always ends up as an AuxCoord.
506 if cube.coords("time"):
507 time_coord = cube.coord("time")
508 if (
509 not isinstance(time_coord, iris.coords.DimCoord)
510 and len(cube.coord_dims(time_coord)) == 1
511 ):
512 # Fudge the bounds to foil checking for strict monotonicity.
513 if time_coord.has_bounds(): 513 ↛ 514line 513 didn't jump to line 514 because the condition on line 513 was never true
514 if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
515 time_coord.bounds = [
516 [
517 time_coord.bounds[i][0] + 1.0e-8 * float(i),
518 time_coord.bounds[i][1],
519 ]
520 for i in range(len(time_coord.bounds))
521 ]
522 iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
524 # Force single-valued coordinates to be scalar coordinates.
525 return iris.util.squeeze(cube)
528def _grid_longitude_fix_callback(cube: iris.cube.Cube):
529 """Check grid_longitude coordinates are in the range -180 deg to 180 deg.
531 This is necessary if comparing two models with different conventions --
532 for example, models where the prime meridian is defined as 0 deg or
533 360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
534 so that it falls in this range. Checks are for near-180 bounds given
535 model data bounds may not extend exactly to 0. or 360.
536 Input cubes on non-rotated grid coordinates are not impacted.
537 """
538 import CSET.operators._utils as utils
540 try:
541 y, x = utils.get_cube_yxcoordname(cube)
542 except ValueError:
543 # Don't modify non-spatial cubes.
544 return cube
546 long_coord = cube.coord(x)
547 # Wrap longitudes if rotated pole coordinates
548 coord_system = long_coord.coord_system
549 if x == "grid_longitude" and isinstance(
550 coord_system, iris.coord_systems.RotatedGeogCS
551 ):
552 long_points = long_coord.points.copy()
553 long_centre = np.median(long_points)
554 while long_centre < -175.0:
555 long_centre += 360.0
556 long_points += 360.0
557 while long_centre >= 175.0:
558 long_centre -= 360.0
559 long_points -= 360.0
560 long_coord.points = long_points
562 # Update coord bounds to be consistent with wrapping.
563 if long_coord.has_bounds() and np.size(long_coord) > 1: 563 ↛ 564line 563 didn't jump to line 564 because the condition on line 563 was never true
564 long_coord.bounds = None
565 long_coord.guess_bounds()
567 return cube
570def _fix_spatial_coords_callback(cube: iris.cube.Cube):
571 """Check latitude and longitude coordinates name.
573 This is necessary as some models define their grid as on rotated
574 'grid_latitude' and 'grid_longitude' coordinates while others define
575 the grid on non-rotated 'latitude' and 'longitude'.
576 Cube dimensions need to be made consistent to avoid recipe failures,
577 particularly where comparing multiple input models with differing spatial
578 coordinates.
579 """
580 import CSET.operators._utils as utils
582 # Check if cube is spatial.
583 if not utils.is_spatialdim(cube):
584 # Don't modify non-spatial cubes.
585 return cube
587 # Get spatial coords and dimension index.
588 y_name, x_name = utils.get_cube_yxcoordname(cube)
589 ny = utils.get_cube_coordindex(cube, y_name)
590 nx = utils.get_cube_coordindex(cube, x_name)
592 # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
593 # [latitude, longitude] for instances where rotated_pole=90.0
594 if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
595 coord_system = cube.coord("grid_latitude").coord_system
596 pole_lat = coord_system.grid_north_pole_latitude
597 if pole_lat == 90.0: 597 ↛ 598line 597 didn't jump to line 598 because the condition on line 597 was never true
598 lats = cube.coord("grid_latitude").points
599 lons = cube.coord("grid_longitude").points
601 cube.remove_coord("grid_latitude")
602 cube.add_dim_coord(
603 iris.coords.DimCoord(
604 lats,
605 standard_name="latitude",
606 var_name="latitude",
607 units="degrees",
608 coord_system=iris.coord_systems.GeogCS(6371229.0),
609 circular=True,
610 ),
611 ny,
612 )
613 y_name = "latitude"
614 cube.remove_coord("grid_longitude")
615 cube.add_dim_coord(
616 iris.coords.DimCoord(
617 lons,
618 standard_name="longitude",
619 var_name="longitude",
620 units="degrees",
621 coord_system=iris.coord_systems.GeogCS(6371229.0),
622 circular=True,
623 ),
624 nx,
625 )
626 x_name = "longitude"
628 # Create additional AuxCoord [grid_latitude, grid_longitude] with
629 # rotated pole attributes for cases with [lat, lon] inputs
630 if y_name in ["latitude"] and cube.coord(y_name).units in [
631 "degrees",
632 "degrees_north",
633 "degrees_south",
634 ]:
635 # Add grid_latitude AuxCoord
636 if "grid_latitude" not in [ 636 ↛ 649line 636 didn't jump to line 649 because the condition on line 636 was always true
637 coord.name() for coord in cube.coords(dim_coords=False)
638 ]:
639 cube.add_aux_coord(
640 iris.coords.AuxCoord(
641 cube.coord(y_name).points,
642 var_name="grid_latitude",
643 units="degrees",
644 ),
645 ny,
646 )
647 # Ensure input latitude DimCoord has CoordSystem
648 # This attribute is sometimes lost on iris.save
649 if not cube.coord(y_name).coord_system:
650 cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
652 if x_name in ["longitude"] and cube.coord(x_name).units in [
653 "degrees",
654 "degrees_west",
655 "degrees_east",
656 ]:
657 # Add grid_longitude AuxCoord
658 if "grid_longitude" not in [ 658 ↛ 672line 658 didn't jump to line 672 because the condition on line 658 was always true
659 coord.name() for coord in cube.coords(dim_coords=False)
660 ]:
661 cube.add_aux_coord(
662 iris.coords.AuxCoord(
663 cube.coord(x_name).points,
664 var_name="grid_longitude",
665 units="degrees",
666 ),
667 nx,
668 )
670 # Ensure input longitude DimCoord has CoordSystem
671 # This attribute is sometimes lost on iris.save
672 if not cube.coord(x_name).coord_system:
673 cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
676def _fix_pressure_coord_callback(cube: iris.cube.Cube):
677 """Rename pressure coordinate to "pressure" if it exists and ensure hPa units.
679 This problem was raised because the AIFS model data from ECMWF
680 defines the pressure coordinate with the name "pressure_level" rather
681 than compliant CF coordinate names.
683 Additionally, set the units of pressure to be hPa to be consistent with the UM,
684 and approach the coordinates in a unified way.
685 """
686 for coord in cube.dim_coords:
687 if coord.name() in ["pressure_level", "pressure_levels"]:
688 coord.rename("pressure")
690 if coord.name() == "pressure":
691 if str(cube.coord("pressure").units) != "hPa":
692 cube.coord("pressure").convert_units("hPa")
695def _fix_um_radtime(cube: iris.cube.Cube):
696 """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.
698 This callback does not have any effect for output diagnostics with
699 timestamps exactly 00 or 30 minutes past the hour. Only radiation
700 diagnostics are checked.
701 Note this callback does not interpolate the data in time, only adjust
702 timestamps to sit on the hour to enable time-to-time difference plotting
703 with models which may output radiation data on the hour.
704 """
705 try:
706 if cube.attributes["STASH"] in [
707 "m01s01i207",
708 "m01s01i208",
709 "m01s02i205",
710 "m01s02i201",
711 "m01s01i207",
712 "m01s02i207",
713 "m01s01i235",
714 ]:
715 time_coord = cube.coord("time")
717 # Convert time points to datetime objects
718 time_unit = time_coord.units
719 time_points = time_unit.num2date(time_coord.points)
720 # Skip if times don't need fixing.
721 if time_points[0].minute == 0 and time_points[0].second == 0:
722 return
723 if time_points[0].minute == 30 and time_points[0].second == 0: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true
724 return
726 # Subtract time difference from the hour from each time point
727 n_minute = time_points[0].minute
728 n_second = time_points[0].second
729 # If times closer to next hour, compute difference to add on to following hour
730 if n_minute > 30:
731 n_minute = n_minute - 60
732 # Compute new diagnostic time stamp
733 new_time_points = (
734 time_points
735 - datetime.timedelta(minutes=n_minute)
736 - datetime.timedelta(seconds=n_second)
737 )
739 # Convert back to numeric values using the original time unit.
740 new_time_values = time_unit.date2num(new_time_points)
742 # Replace the time coordinate with updated values.
743 time_coord.points = new_time_values
745 # Recompute forecast_period with corrected values.
746 if cube.coord("forecast_period"): 746 ↛ exitline 746 didn't return from function '_fix_um_radtime' because the condition on line 746 was always true
747 fcst_prd_points = cube.coord("forecast_period").points
748 new_fcst_points = (
749 time_unit.num2date(fcst_prd_points)
750 - datetime.timedelta(minutes=n_minute)
751 - datetime.timedelta(seconds=n_second)
752 )
753 cube.coord("forecast_period").points = time_unit.date2num(
754 new_fcst_points
755 )
756 except KeyError:
757 pass
760def _fix_cell_methods(cube: iris.cube.Cube):
761 """To fix the assumed cell_methods in accumulation STASH from UM.
763 Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
764 (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
765 over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
766 For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
767 "sum". This callback changes "mean" cube attribute cell_method to "sum",
768 enabling the cell_method constraint on reading to select correct input.
769 """
770 # Shift "mean" cell_method to "sum" for selected UM inputs.
771 if cube.attributes.get("STASH") in [
772 "m01s21i104",
773 "m01s04i201",
774 "m01s04i202",
775 "m01s05i201",
776 "m01s05i202",
777 ]:
778 # Check if input cell_method contains "mean" time-processing.
779 if set(cm.method for cm in cube.cell_methods) == {"mean"}: 779 ↛ exitline 779 didn't return from function '_fix_cell_methods' because the condition on line 779 was always true
780 # Retrieve interval and any comment information.
781 for cell_method in cube.cell_methods:
782 interval_str = cell_method.intervals
783 comment_str = cell_method.comments
785 # Remove input aggregation method.
786 cube.cell_methods = ()
788 # Replace "mean" with "sum" cell_method to indicate aggregation.
789 cube.add_cell_method(
790 iris.coords.CellMethod(
791 method="sum",
792 coords="time",
793 intervals=interval_str,
794 comments=comment_str,
795 )
796 )
799def _convert_cube_units_callback(cube: iris.cube.Cube):
800 """Adjust diagnostic units for specific variables.
802 Some precipitation diagnostics are output with unit kg m-2 s-1 and are
803 converted here to mm hr-1.
805 Visibility diagnostics are converted here from m to km to improve output
806 formatting.
807 """
808 # Convert precipitation diagnostic units if required.
809 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
810 if any("surface_microphysical" in name for name in varnames):
811 if cube.units == "kg m-2 s-1":
812 logging.debug(
813 "Converting precipitation rate units from kg m-2 s-1 to mm hr-1"
814 )
815 # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water.
816 # This is a 1:1 conversion, so we just change the units.
817 cube.units = "mm s-1"
818 # Convert the units to per hour.
819 cube.convert_units("mm hr-1")
820 elif cube.units == "kg m-2": 820 ↛ 827line 820 didn't jump to line 827 because the condition on line 820 was always true
821 logging.debug("Converting precipitation amount units from kg m-2 to mm")
822 # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water.
823 # This is a 1:1 conversion, so we just change the units.
824 cube.units = "mm"
826 # Convert visibility diagnostic units if required.
827 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
828 if any("visibility" in name for name in varnames):
829 if cube.units == "m": 829 ↛ 834line 829 didn't jump to line 834 because the condition on line 829 was always true
830 logging.debug("Converting visibility units m to km.")
831 # Convert the units to km.
832 cube.convert_units("km")
834 return cube
837def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
838 """Mask cloud_base_altitude diagnostic in regions with no cloud."""
839 varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
840 if any("cloud_base_altitude" in name for name in varnames):
841 # Mask cube where set > 144kft to catch default 144.35695538058164
842 cube.data = np.ma.masked_array(cube.data)
843 cube.data[cube.data > 144.0] = np.ma.masked
846def _fix_um_winds(cubes: iris.cube.CubeList):
847 """To make winds from the UM consistent with those from LFRic.
849 Diagnostics of wind are not always consistent between the UM
850 and LFric. Here, winds from the UM are adjusted to make them i
851 consistent with LFRic.
852 """
853 # Check whether we have components of the wind identified by STASH,
854 # (so this will apply only to cubes from the UM), but not the
855 # wind speed and calculate it if it is missing. Note that
856 # this will be biased low in general because the components will mostly
857 # be time averages. For simplicity, we do this only if there is just one
858 # cube of a component. A more complicated approach would be to consider
859 # the cell methods, but it may not be warranted.
860 u_constr = iris.AttributeConstraint(STASH="m01s03i225")
861 v_constr = iris.AttributeConstraint(STASH="m01s03i226")
862 speed_constr = iris.AttributeConstraint(STASH="m01s03i227")
863 try:
864 if cubes.extract(u_constr) and cubes.extract(v_constr): 864 ↛ 865line 864 didn't jump to line 865 because the condition on line 864 was never true
865 if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr):
866 _add_wind_speed_um(cubes)
867 # Convert winds in the UM to be relative to true east and true north.
868 _convert_wind_true_dirn_um(cubes)
869 except (KeyError, AttributeError):
870 pass
873def _add_wind_speed_um(cubes: iris.cube.CubeList):
874 """Add windspeeds to cubes from the UM."""
875 wspd10 = (
876 cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))[0] ** 2
877 + cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))[0] ** 2
878 ) ** 0.5
879 wspd10.attributes["STASH"] = "m01s03i227"
880 wspd10.standard_name = "wind_speed"
881 wspd10.long_name = "wind_speed_at_10m"
882 cubes.append(wspd10)
885def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
886 """To convert winds to true directions.
888 Convert from the components relative to the grid to true directions.
889 This functionality only handles the simplest case.
890 """
891 u_grid = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))
892 v_grid = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))
893 true_u, true_v = rotate_winds(u_grid, v_grid, iris.coord_systems.GeogCS(6371229.0))
894 u_grid.data = true_u.data
895 v_grid.data = true_v.data
898def _normalise_var0_varname(cube: iris.cube.Cube):
899 """Fix varnames for consistency to allow merging.
901 Some model data netCDF sometimes have a coordinate name end in
902 "_0" etc, where duplicate coordinates of same name are defined but
903 with different attributes. This can be inconsistently managed in
904 different model inputs and can cause cubes to fail to merge.
905 """
906 for coord in cube.coords():
907 if coord.var_name and coord.var_name.endswith("_0"):
908 coord.var_name = coord.var_name.removesuffix("_0")
909 if coord.var_name and coord.var_name.endswith("_1"):
910 coord.var_name = coord.var_name.removesuffix("_1")
911 if coord.var_name and coord.var_name.endswith("_2"): 911 ↛ 912line 911 didn't jump to line 912 because the condition on line 911 was never true
912 coord.var_name = coord.var_name.removesuffix("_2")
913 if coord.var_name and coord.var_name.endswith("_3"): 913 ↛ 914line 913 didn't jump to line 914 because the condition on line 913 was never true
914 coord.var_name = coord.var_name.removesuffix("_3")
916 if cube.var_name and cube.var_name.endswith("_0"):
917 cube.var_name = cube.var_name.removesuffix("_0")
920def _proleptic_gregorian_fix(cube: iris.cube.Cube):
921 """Convert the calendars of time units to use a standard calendar."""
922 try:
923 time_coord = cube.coord("time")
924 if time_coord.units.calendar == "proleptic_gregorian":
925 logging.debug(
926 "Changing proleptic Gregorian calendar to standard calendar for %s",
927 repr(time_coord.units),
928 )
929 time_coord.units = time_coord.units.change_calendar("standard")
930 except iris.exceptions.CoordinateNotFoundError:
931 pass
934def _lfric_time_callback(cube: iris.cube.Cube):
935 """Fix time coordinate metadata if missing dimensions.
937 Some model data does not contain forecast_reference_time or forecast_period as
938 expected coordinates, and so we cannot aggregate over case studies without this
939 metadata. This callback fixes these issues.
941 This callback also ensures all time coordinates are referenced as hours since
942 1970-01-01 00:00:00 for consistency across different model inputs.
944 Notes
945 -----
946 Some parts of the code have been adapted from Paul Earnshaw's scripts.
947 """
948 # Construct forecast_reference time if it doesn't exist.
949 try:
950 tcoord = cube.coord("time")
951 # Set time coordinate to common basis "hours since 1970"
952 try:
953 tcoord.convert_units("hours since 1970-01-01 00:00:00")
954 except ValueError:
955 logging.warning("Unrecognised base time unit: %s", tcoord.units)
957 if not cube.coords("forecast_reference_time"):
958 try:
959 init_time = datetime.datetime.fromisoformat(
960 tcoord.attributes["time_origin"]
961 )
962 frt_point = tcoord.units.date2num(init_time)
963 frt_coord = iris.coords.AuxCoord(
964 frt_point,
965 units=tcoord.units,
966 standard_name="forecast_reference_time",
967 long_name="forecast_reference_time",
968 )
969 cube.add_aux_coord(frt_coord)
970 except KeyError:
971 logging.warning(
972 "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
973 )
975 # Remove time_origin to allow multiple case studies to merge.
976 tcoord.attributes.pop("time_origin", None)
978 # Construct forecast_period axis (forecast lead time) if it doesn't exist.
979 if not cube.coords("forecast_period"):
980 try:
981 # Create array of forecast lead times.
982 init_coord = cube.coord("forecast_reference_time")
983 init_time_points_in_tcoord_units = tcoord.units.date2num(
984 init_coord.units.num2date(init_coord.points)
985 )
986 lead_times = tcoord.points - init_time_points_in_tcoord_units
988 # Get unit for lead time from time coordinate's unit.
989 # Convert all lead time to hours for consistency between models.
990 if "seconds" in str(tcoord.units): 990 ↛ 991line 990 didn't jump to line 991 because the condition on line 990 was never true
991 lead_times = lead_times / 3600.0
992 units = "hours"
993 elif "hours" in str(tcoord.units): 993 ↛ 996line 993 didn't jump to line 996 because the condition on line 993 was always true
994 units = "hours"
995 else:
996 raise ValueError(f"Unrecognised base time unit: {tcoord.units}")
998 # Create lead time coordinate.
999 lead_time_coord = iris.coords.AuxCoord(
1000 lead_times,
1001 standard_name="forecast_period",
1002 long_name="forecast_period",
1003 units=units,
1004 )
1006 # Associate lead time coordinate with time dimension.
1007 cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
1008 except iris.exceptions.CoordinateNotFoundError:
1009 logging.warning(
1010 "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
1011 )
1012 except iris.exceptions.CoordinateNotFoundError:
1013 logging.warning("No time coordinate on cube.")
1016def _lfric_forecast_period_standard_name_callback(cube: iris.cube.Cube):
1017 """Add forecast_period standard name if missing."""
1018 try:
1019 coord = cube.coord("forecast_period")
1020 if not coord.standard_name:
1021 coord.standard_name = "forecast_period"
1022 except iris.exceptions.CoordinateNotFoundError:
1023 pass