Coverage for src / CSET / operators / read.py: 90%
360 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 14:31 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for reading various types of files from disk."""
17import ast
18import datetime
19import functools
20import glob
21import itertools
22import logging
23from pathlib import Path
24from typing import Literal
26import iris
27import iris.coord_systems
28import iris.coords
29import iris.cube
30import iris.exceptions
31import iris.util
32import numpy as np
33from iris.analysis.cartography import rotate_pole, rotate_winds
35from CSET._common import iter_maybe
36from CSET.operators._stash_to_lfric import STASH_TO_LFRIC
37from CSET.operators._utils import (
38 get_cube_coordindex,
39 get_cube_yxcoordname,
40 is_spatialdim,
41)
class NoDataError(FileNotFoundError):
    """Raised when reading completed without loading any data."""
def read_cube(
    file_paths: list[str] | str,
    constraint: iris.Constraint = None,
    model_names: list[str] | str | None = None,
    subarea_type: str = None,
    subarea_extent: list[float] = None,
    **kwargs,
) -> iris.cube.Cube:
    """Read a single cube from files.

    Loads data from the given path(s) (shell-style globs allowed; directories
    are expanded to the files they contain) and filters it down with the given
    constraint, which must select exactly one cube.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: "gridcells" | "modelrelative" | "realworld", optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list, optional
        List of coordinates to constraint data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.Cube
        Cube loaded

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    ValueError
        If the constraint doesn't produce a single cube.
    """
    # Delegate the actual loading and filtering to read_cubes.
    loaded = read_cubes(
        file_paths=file_paths,
        constraint=constraint,
        model_names=model_names,
        subarea_type=subarea_type,
        subarea_extent=subarea_extent,
    )
    # Anything other than exactly one cube is an error for this operator.
    if len(loaded) != 1:
        raise ValueError(
            f"Constraint doesn't produce single cube: {constraint}\n{loaded}"
        )
    return loaded[0]
def read_cubes(
    file_paths: list[str] | str,
    constraint: iris.Constraint | None = None,
    model_names: str | list[str] | None = None,
    subarea_type: str | None = None,
    subarea_extent: list | None = None,
    **kwargs,
) -> iris.cube.CubeList:
    """Read cubes from files.

    Read operator that takes a path string (can include shell-style glob
    patterns), and loads the cubes matching the constraint. If any paths point
    to directory, all the files contained within are loaded.

    Ensemble data can also be loaded. If it has a realization coordinate
    already, it will be directly used. If not, it will have its member number
    guessed from the filename, based on one of several common patterns. For
    example the pattern *emXX*, where XX is the realization.

    Deterministic data will be loaded with a realization of 0, allowing it to be
    processed in the same way as ensemble data.

    Data output by XIOS (such as LFRic) has its per-file metadata removed so
    that the cubes merge across files.

    Arguments
    ---------
    file_paths: str | list[str]
        Path or paths to where .pp/.nc files are located. Can include globs.
    constraint: iris.Constraint | iris.ConstraintCombination, optional
        Constraints to filter data by. Defaults to unconstrained.
    model_names: str | list[str], optional
        Names of the models that correspond to respective paths in file_paths.
    subarea_type: str, optional
        Whether to constrain data by model relative coordinates or real world
        coordinates.
    subarea_extent: list[float], optional
        List of coordinates to constraint data by, in order lower latitude,
        upper latitude, lower longitude, upper longitude.

    Returns
    -------
    cubes: iris.cube.CubeList
        Cubes loaded after being merged and concatenated.

    Raises
    ------
    FileNotFoundError
        If the provided path does not exist
    NoDataError
        If no cubes remain after loading and filtering.
    """
    # Get iterable of paths. Each path corresponds to 1 model.
    paths = iter_maybe(file_paths)
    model_names = iter_maybe(model_names)

    # Check we have appropriate number of model names. The comparison against
    # (None,) shows iter_maybe wraps an unset model_names into a 1-tuple.
    if model_names != (None,) and len(model_names) != len(paths):
        raise ValueError(
            f"The number of model names ({len(model_names)}) should equal "
            f"the number of paths given ({len(paths)})."
        )

    # Load the data for each model into a CubeList per model. This is a lazy
    # generator: each model's files are read only when consumed below.
    model_cubes = (
        _load_model(path, name, constraint)
        for path, name in itertools.zip_longest(paths, model_names, fillvalue=None)
    )

    # Split out first model's cubes and mark it as the base for comparisons.
    cubes = next(model_cubes)
    for cube in cubes:
        # Use 1 to indicate True, as booleans can't be saved in NetCDF attributes.
        cube.attributes["cset_comparison_base"] = 1

    # Load the rest of the models.
    cubes.extend(itertools.chain.from_iterable(model_cubes))

    # Unify time units so different case studies can merge.
    iris.util.unify_time_units(cubes)

    # Select sub region.
    cubes = _cutout_cubes(cubes, subarea_type, subarea_extent)

    # Merge and concatenate cubes now metadata has been fixed.
    cubes = cubes.merge()
    cubes = cubes.concatenate()

    # Squeeze single valued coordinates into scalar coordinates.
    cubes = iris.cube.CubeList(iris.util.squeeze(cube) for cube in cubes)

    # Ensure dimension coordinates are bounded.
    for cube in cubes:
        for dim_coord in cube.coords(dim_coords=True):
            # Iris can't guess the bounds of a scalar coordinate.
            if not dim_coord.has_bounds() and dim_coord.shape[0] > 1:
                dim_coord.guess_bounds()

    logging.info("Loaded cubes: %s", cubes)
    if len(cubes) == 0:
        raise NoDataError("No cubes loaded, check your constraints!")
    return cubes
def _load_model(
    paths: str | list[str],
    model_name: str | None,
    constraint: iris.Constraint | None,
) -> iris.cube.CubeList:
    """Load one model's files into a CubeList, tagging cubes with its name."""
    files = _check_input_files(paths)
    # A constraint of None places no restriction on what gets loaded.
    logging.debug("Constraint: %s", constraint)
    loaded = iris.load(files, constraint, callback=_loading_callback)
    # Bring the UM's winds in line with LFRic's.
    _fix_um_winds(loaded)
    # Record the model name on every cube so later steps can read it back
    # without it being threaded through as a function parameter.
    if model_name is not None:
        for loaded_cube in loaded:
            loaded_cube.attributes["model_name"] = model_name
    return loaded
def _check_input_files(input_paths: str | list[str]) -> list[Path]:
    """Resolve the given paths into a sorted list of existing files.

    Arguments
    ---------
    input_paths: list[str]
        List of paths to input files or directories. The path may itself contain
        glob patterns, but unlike in shells it will match directly first.

    Returns
    -------
    list[Path]
        A list of files to load.

    Raises
    ------
    FileNotFoundError:
        If the provided arguments don't resolve to at least one existing file.
    """
    resolved: list[Path] = []
    for pattern in iter_maybe(input_paths):
        # A literal path to an existing file wins over glob expansion.
        direct = Path(pattern)
        if direct.is_file():
            resolved.append(direct)
            continue
        for match in map(Path, glob.glob(pattern)):
            if match.is_dir():
                # A directory contributes the files directly inside it.
                logging.debug("Checking directory '%s' for files", match)
                resolved.extend(entry for entry in match.iterdir() if entry.is_file())
            else:
                resolved.append(match)
    resolved.sort()
    logging.info("Loading files:\n%s", "\n".join(str(f) for f in resolved))
    if not resolved:
        raise FileNotFoundError(f"No files found for {input_paths}")
    return resolved
def _cutout_cubes(
    cubes: iris.cube.CubeList,
    subarea_type: Literal["gridcells", "realworld", "modelrelative"] | None,
    subarea_extent: list[float] | None,
):
    """Cut out a subarea from a CubeList.

    For "gridcells" the extent gives the number of cells to trim from the
    lower, left, upper, and right edges. For "realworld"/"modelrelative" the
    extent gives latitude/longitude limits. A None subarea_type disables the
    cutout and the cubes are returned unchanged.
    """
    if subarea_type is None:
        logging.debug("Subarea selection is disabled.")
        return cubes

    # If selected, cutout according to number of grid cells to trim from each edge.
    cutout_cubes = iris.cube.CubeList()
    # Find spatial coordinates
    for cube in cubes:
        # Find dimension coordinates.
        lat_name, lon_name = get_cube_yxcoordname(cube)

        # Compute cutout based on number of cells to trim from edges.
        if subarea_type == "gridcells":
            logging.debug(
                "User requested LowerTrim: %s LeftTrim: %s UpperTrim: %s RightTrim: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            # Sort so indexing from either end of the points is well defined.
            lat_points = np.sort(cube.coord(lat_name).points)
            lon_points = np.sort(cube.coord(lon_name).points)
            # Define cutout region using user provided cell points.
            lats = [lat_points[subarea_extent[0]], lat_points[-subarea_extent[2] - 1]]
            lons = [lon_points[subarea_extent[1]], lon_points[-subarea_extent[3] - 1]]

        # Compute cutout based on specified coordinate values.
        elif subarea_type == "realworld" or subarea_type == "modelrelative":
            # If not gridcells, cutout by requested geographic area,
            logging.debug(
                "User requested LLat: %s ULat: %s LLon: %s ULon: %s",
                subarea_extent[0],
                subarea_extent[1],
                subarea_extent[2],
                subarea_extent[3],
            )
            # Define cutout region using user provided coordinates.
            lats = np.array(subarea_extent[0:2])
            lons = np.array(subarea_extent[2:4])
            # Ensure cutout longitudes are within +/- 180.0 bounds.
            while lons[0] < -180.0:
                lons += 360.0
            while lons[1] > 180.0:
                lons -= 360.0
            # If the coordinate system is rotated we convert coordinates into
            # model-relative coordinates to extract the appropriate cutout.
            coord_system = cube.coord(lat_name).coord_system
            if subarea_type == "realworld" and isinstance(
                coord_system, iris.coord_systems.RotatedGeogCS
            ):
                lons, lats = rotate_pole(
                    lons,
                    lats,
                    pole_lon=coord_system.grid_north_pole_longitude,
                    pole_lat=coord_system.grid_north_pole_latitude,
                )
        else:
            raise ValueError("Unknown subarea_type:", subarea_type)

        # Do cutout and add to cutout_cubes.
        intersection_args = {lat_name: lats, lon_name: lons}
        logging.debug("Cutting out coords: %s", intersection_args)
        try:
            cutout_cubes.append(cube.intersection(**intersection_args))
        except IndexError as err:
            # iris raises IndexError when the requested region misses the data.
            raise ValueError(
                "Region cutout error. Check and update SUBAREA_EXTENT."
                "Cutout region requested should be contained within data area. "
                "Also check if cutout region requested is smaller than input grid spacing."
            ) from err

    return cutout_cubes
def _loading_callback(cube: iris.cube.Cube, field, filename: str) -> iris.cube.Cube:
    """Compose together the needed callbacks into a single function.

    Runs each fix-up callback in sequence on a cube as iris loads it. Most
    operate in place; the few that return a replacement cube are reassigned.
    """
    # Most callbacks operate in-place, but save the cube when returned!
    _realization_callback(cube, field, filename)
    _um_normalise_callback(cube, field, filename)
    _lfric_normalise_callback(cube, field, filename)
    cube = _lfric_time_coord_fix_callback(cube, field, filename)
    _normalise_var0_varname(cube)
    _fix_spatial_coords_callback(cube)
    _fix_pressure_coord_callback(cube)
    _fix_um_radtime(cube)
    _fix_cell_methods(cube)
    cube = _convert_cube_units_callback(cube)
    cube = _grid_longitude_fix_callback(cube)
    _fix_lfric_cloud_base_altitude(cube)
    _proleptic_gregorian_fix(cube)
    _lfric_time_callback(cube)
    _lfric_forecast_period_standard_name_callback(cube)
    _normalise_ML_varname(cube)
    return cube
def _realization_callback(cube, field, filename):
    """Give deterministic cubes a realization of 0.

    Treating deterministic data as a one-member ensemble lets the rest of the
    code process both kinds of data identically.
    """
    # Leave cubes that already carry a realization coordinate alone.
    if cube.coords("realization"):
        return
    realization = iris.coords.DimCoord(0, standard_name="realization", units="1")
    cube.add_aux_coord(realization)
394@functools.lru_cache(None)
395def _warn_once(msg):
396 """Print a warning message, skipping recent duplicates."""
397 logging.warning(msg)
def _um_normalise_callback(cube: iris.cube.Cube, field, filename):
    """Normalise UM STASH variable long names to LFRic variable names.

    Note standard names will remain associated with cubes where different.
    Long name will be used consistently in output filename and titles.
    """
    # Convert STASH to LFRic variable name
    if "STASH" in cube.attributes:
        stash = cube.attributes["STASH"]
        try:
            # The mapping also records the grid; only the name is needed here.
            name, _ = STASH_TO_LFRIC[str(stash)]
            cube.long_name = name
        except KeyError:
            # Don't change cubes with unknown stash codes.
            _warn_once(
                f"Unknown STASH code: {stash}. Please check file stash_to_lfric.py to update."
            )
def _lfric_normalise_callback(cube: iris.cube.Cube, field, filename):
    """Normalise attributes that prevents LFRic cube from merging.

    The uuid and timeStamp relate to the output file, as saved by XIOS, and has
    no relation to the data contained. These attributes are removed.

    The um_stash_source is a list of STASH codes for when an LFRic field maps to
    multiple UM fields, however it can be encoded in any order. This attribute
    is sorted to prevent this. This attribute is only present in LFRic data that
    has been converted to look like UM data.
    """
    # Drop per-file attributes that would block merging across files.
    for unwanted in ("timeStamp", "uuid", "name"):
        cube.attributes.pop(unwanted, None)

    # um_stash_source may list the same codes in any order; canonicalise it
    # by parsing the string as a list, sorting, and re-encoding.
    stash_list = cube.attributes.get("um_stash_source")
    if stash_list:
        sorted_codes = sorted(ast.literal_eval(stash_list))
        cube.attributes["um_stash_source"] = str(sorted_codes)
def _lfric_time_coord_fix_callback(
    cube: iris.cube.Cube, field, filename
) -> iris.cube.Cube:
    """Ensure the time coordinate is a DimCoord rather than an AuxCoord.

    The coordinate is converted and replaced if not. SLAMed LFRic data has this
    issue, though the coordinate satisfies all the properties for a DimCoord.
    Scalar time values are left as AuxCoords.
    """
    # This issue seems to come from iris's handling of NetCDF files where time
    # always ends up as an AuxCoord.
    if cube.coords("time"):
        time_coord = cube.coord("time")
        if (
            not isinstance(time_coord, iris.coords.DimCoord)
            and len(cube.coord_dims(time_coord)) == 1
        ):
            # Fudge the bounds to foil checking for strict monotonicity.
            if time_coord.has_bounds():
                if (time_coord.bounds[-1][0] - time_coord.bounds[0][0]) < 1.0e-8:
                    # Nudge each lower bound by a tiny, index-proportional
                    # offset so the bounds become strictly increasing.
                    time_coord.bounds = [
                        [
                            time_coord.bounds[i][0] + 1.0e-8 * float(i),
                            time_coord.bounds[i][1],
                        ]
                        for i in range(len(time_coord.bounds))
                    ]
            iris.util.promote_aux_coord_to_dim_coord(cube, time_coord)
    return cube
def _grid_longitude_fix_callback(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Check grid_longitude coordinates are in the range -180 deg to 180 deg.

    This is necessary if comparing two models with different conventions --
    for example, models where the prime meridian is defined as 0 deg or
    360 deg. If not in the range -180 deg to 180 deg, we wrap the grid_longitude
    so that it falls in this range. Checks are for near-180 bounds given
    model data bounds may not extend exactly to 0. or 360.
    Input cubes on non-rotated grid coordinates are not impacted.
    """
    try:
        _, x_name = get_cube_yxcoordname(cube)
    except ValueError:
        # Non-spatial cubes are left untouched.
        return cube

    x_coord = cube.coord(x_name)
    # Only rotated pole grid_longitude coordinates get wrapped.
    rotated = isinstance(x_coord.coord_system, iris.coord_systems.RotatedGeogCS)
    if x_name == "grid_longitude" and rotated:
        points = x_coord.points.copy()
        centre = np.median(points)
        # Shift by whole turns until the median lands within [-175, 175).
        while centre < -175.0:
            centre += 360.0
            points += 360.0
        while centre >= 175.0:
            centre -= 360.0
            points -= 360.0
        x_coord.points = points

        # Bounds from before the shift are stale; regenerate them.
        if x_coord.has_bounds():
            x_coord.bounds = None
            x_coord.guess_bounds()

    return cube
def _fix_spatial_coords_callback(cube: iris.cube.Cube):
    """Check latitude and longitude coordinates name.

    This is necessary as some models define their grid as on rotated
    'grid_latitude' and 'grid_longitude' coordinates while others define
    the grid on non-rotated 'latitude' and 'longitude'.
    Cube dimensions need to be made consistent to avoid recipe failures,
    particularly where comparing multiple input models with differing spatial
    coordinates.
    """
    # Check if cube is spatial.
    if not is_spatialdim(cube):
        # Don't modify non-spatial cubes.
        return

    # Get spatial coords and dimension index.
    y_name, x_name = get_cube_yxcoordname(cube)
    ny = get_cube_coordindex(cube, y_name)
    nx = get_cube_coordindex(cube, x_name)

    # Remove spatial coords bounds if erroneous values detected.
    # Aims to catch some errors in input coord bounds by setting
    # invalid threshold of 10000.0
    if cube.coord(x_name).has_bounds() and cube.coord(y_name).has_bounds():
        bx_max = np.max(np.abs(cube.coord(x_name).bounds))
        by_max = np.max(np.abs(cube.coord(y_name).bounds))
        if bx_max > 10000.0 or by_max > 10000.0:
            cube.coord(x_name).bounds = None
            cube.coord(y_name).bounds = None

    # Translate [grid_latitude, grid_longitude] to an unrotated 1-d DimCoord
    # [latitude, longitude] for instances where rotated_pole=90.0
    if "grid_latitude" in [coord.name() for coord in cube.coords(dim_coords=True)]:
        coord_system = cube.coord("grid_latitude").coord_system
        pole_lat = getattr(coord_system, "grid_north_pole_latitude", None)
        if pole_lat == 90.0:
            lats = cube.coord("grid_latitude").points
            lons = cube.coord("grid_longitude").points

            # Replace the rotated coordinates with unrotated equivalents on
            # the same dimensions.
            cube.remove_coord("grid_latitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lats,
                    standard_name="latitude",
                    var_name="latitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                ny,
            )
            y_name = "latitude"
            cube.remove_coord("grid_longitude")
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    lons,
                    standard_name="longitude",
                    var_name="longitude",
                    units="degrees",
                    coord_system=iris.coord_systems.GeogCS(6371229.0),
                    circular=True,
                ),
                nx,
            )
            x_name = "longitude"

    # Create additional AuxCoord [grid_latitude, grid_longitude] with
    # rotated pole attributes for cases with [lat, lon] inputs
    if y_name in ["latitude"] and cube.coord(y_name).units in [
        "degrees",
        "degrees_north",
        "degrees_south",
    ]:
        # Add grid_latitude AuxCoord
        if "grid_latitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(y_name).points,
                    var_name="grid_latitude",
                    units="degrees",
                ),
                ny,
            )
        # Ensure input latitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(y_name).coord_system:
            cube.coord(y_name).coord_system = iris.coord_systems.GeogCS(6371229.0)

    if x_name in ["longitude"] and cube.coord(x_name).units in [
        "degrees",
        "degrees_west",
        "degrees_east",
    ]:
        # Add grid_longitude AuxCoord
        if "grid_longitude" not in [
            coord.name() for coord in cube.coords(dim_coords=False)
        ]:
            cube.add_aux_coord(
                iris.coords.AuxCoord(
                    cube.coord(x_name).points,
                    var_name="grid_longitude",
                    units="degrees",
                ),
                nx,
            )

        # Ensure input longitude DimCoord has CoordSystem
        # This attribute is sometimes lost on iris.save
        if not cube.coord(x_name).coord_system:
            cube.coord(x_name).coord_system = iris.coord_systems.GeogCS(6371229.0)
def _fix_pressure_coord_callback(cube: iris.cube.Cube):
    """Rename pressure coordinate to "pressure" if it exists and ensure hPa units.

    This problem was raised because the AIFS model data from ECMWF
    defines the pressure coordinate with the name "pressure_level" rather
    than compliant CF coordinate names.

    Additionally, set the units of pressure to be hPa to be consistent with the UM,
    and approach the coordinates in a unified way.
    """
    for dim_coord in cube.dim_coords:
        # Normalise non-CF names used by some models.
        if dim_coord.name() in ("pressure_level", "pressure_levels"):
            dim_coord.rename("pressure")

        if dim_coord.name() == "pressure":
            pressure = cube.coord("pressure")
            if str(pressure.units) != "hPa":
                pressure.convert_units("hPa")
def _fix_um_radtime(cube: iris.cube.Cube):
    """Move radiation diagnostics from timestamps which are output N minutes or seconds past every hour.

    This callback does not have any effect for output diagnostics with
    timestamps exactly 00 or 30 minutes past the hour. Only radiation
    diagnostics are checked.
    Note this callback does not interpolate the data in time, only adjust
    timestamps to sit on the hour to enable time-to-time difference plotting
    with models which may output radiation data on the hour.
    """
    try:
        # Radiation diagnostics of interest, identified by STASH code.
        if cube.attributes["STASH"] in [
            "m01s01i207",
            "m01s01i208",
            "m01s02i205",
            "m01s02i201",
            "m01s02i207",
            "m01s01i235",
        ]:
            time_coord = cube.coord("time")

            # Convert time points to datetime objects
            time_unit = time_coord.units
            time_points = time_unit.num2date(time_coord.points)
            # Skip if times don't need fixing.
            if time_points[0].minute == 0 and time_points[0].second == 0:
                return
            if time_points[0].minute == 30 and time_points[0].second == 0:
                return

            # Subtract time difference from the hour from each time point
            n_minute = time_points[0].minute
            n_second = time_points[0].second
            # If times closer to next hour, compute difference to add on to following hour
            if n_minute > 30:
                n_minute = n_minute - 60
            # Compute new diagnostic time stamp
            new_time_points = (
                time_points
                - datetime.timedelta(minutes=n_minute)
                - datetime.timedelta(seconds=n_second)
            )

            # Convert back to numeric values using the original time unit.
            new_time_values = time_unit.date2num(new_time_points)

            # Replace the time coordinate with updated values.
            time_coord.points = new_time_values

            # Recompute forecast_period with corrected values.
            if cube.coord("forecast_period"):
                fcst_prd_points = cube.coord("forecast_period").points
                new_fcst_points = (
                    time_unit.num2date(fcst_prd_points)
                    - datetime.timedelta(minutes=n_minute)
                    - datetime.timedelta(seconds=n_second)
                )
                cube.coord("forecast_period").points = time_unit.date2num(
                    new_fcst_points
                )
    except KeyError:
        # Cube has no STASH attribute, so is not UM data.
        pass
def _fix_cell_methods(cube: iris.cube.Cube):
    """To fix the assumed cell_methods in accumulation STASH from UM.

    Lightning (m01s21i104), rainfall amount (m01s04i201, m01s05i201) and snowfall amount
    (m01s04i202, m01s05i202) in UM is being output as a time accumulation,
    over each hour (TAcc1hr), but input cubes show cell_methods as "mean".
    For UM and LFRic inputs to be compatible, we assume accumulated cell_methods are
    "sum". This callback changes "mean" cube attribute cell_method to "sum",
    enabling the cell_method constraint on reading to select correct input.
    """
    # Shift "mean" cell_method to "sum" for selected UM inputs.
    if cube.attributes.get("STASH") in [
        "m01s21i104",
        "m01s04i201",
        "m01s04i202",
        "m01s05i201",
        "m01s05i202",
    ]:
        # Check if input cell_method contains "mean" time-processing.
        if set(cm.method for cm in cube.cell_methods) == {"mean"}:
            # Retrieve interval and any comment information.
            for cell_method in cube.cell_methods:
                interval_str = cell_method.intervals
                comment_str = cell_method.comments

            # Remove input aggregation method.
            cube.cell_methods = ()

            # Replace "mean" with "sum" cell_method to indicate aggregation.
            cube.add_cell_method(
                iris.coords.CellMethod(
                    method="sum",
                    coords="time",
                    intervals=interval_str,
                    comments=comment_str,
                )
            )
def _convert_cube_units_callback(cube: iris.cube.Cube):
    """Adjust diagnostic units for specific variables.

    Some precipitation diagnostics are output with unit kg m-2 s-1 and are
    converted here to mm hr-1.

    Visibility diagnostics are converted here from m to km to improve output
    formatting.
    """
    # Convert precipitation diagnostic units if required.
    varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
    if any("surface_microphysical" in name for name in varnames):
        if cube.units == "kg m-2 s-1":
            logging.debug(
                "Converting precipitation rate units from kg m-2 s-1 to mm hr-1"
            )
            # Convert from kg m-2 s-1 to mm s-1 assuming 1kg water = 1l water = 1dm^3 water.
            # This is a 1:1 conversion, so we just change the units.
            cube.units = "mm s-1"
            # Convert the units to per hour.
            cube.convert_units("mm hr-1")
        elif cube.units == "kg m-2":
            logging.debug("Converting precipitation amount units from kg m-2 to mm")
            # Convert from kg m-2 to mm assuming 1kg water = 1l water = 1dm^3 water.
            # This is a 1:1 conversion, so we just change the units.
            cube.units = "mm"

    # Convert visibility diagnostic units if required.
    varnames = filter(None, [cube.long_name, cube.standard_name, cube.var_name])
    if any("visibility" in name for name in varnames):
        if cube.units == "m":
            logging.debug("Converting visibility units m to km.")
            # Convert the units to km.
            cube.convert_units("km")

    return cube
def _fix_lfric_cloud_base_altitude(cube: iris.cube.Cube):
    """Mask cloud_base_altitude diagnostic in regions with no cloud."""
    names = [cube.long_name, cube.standard_name, cube.var_name]
    if any(name and "cloud_base_altitude" in name for name in names):
        # Mask everything above 144 kft to catch the no-cloud fill value,
        # which defaults to 144.35695538058164.
        masked = np.ma.masked_array(cube.data)
        masked[masked > 144.0] = np.ma.masked
        cube.data = masked
def _fix_um_winds(cubes: iris.cube.CubeList):
    """Make winds from the UM consistent with those from LFRic.

    Diagnostics of wind are not always consistent between the UM and LFRic.
    Here, winds from the UM are adjusted to make them consistent with LFRic.
    """
    # Check whether we have components of the wind identified by STASH,
    # (so this will apply only to cubes from the UM), but not the
    # wind speed and calculate it if it is missing. Note that
    # this will be biased low in general because the components will mostly
    # be time averages. For simplicity, we do this only if there is just one
    # cube of a component. A more complicated approach would be to consider
    # the cell methods, but it may not be warranted.
    u_constr = iris.AttributeConstraint(STASH="m01s03i225")
    v_constr = iris.AttributeConstraint(STASH="m01s03i226")
    speed_constr = iris.AttributeConstraint(STASH="m01s03i227")
    try:
        if cubes.extract(u_constr) and cubes.extract(v_constr):
            if len(cubes.extract(u_constr)) == 1 and not cubes.extract(speed_constr):
                _add_wind_speed_um(cubes)
            # Convert winds in the UM to be relative to true east and true north.
            _convert_wind_true_dirn_um(cubes)
    except (KeyError, AttributeError):
        # Best-effort: leave winds untouched if the cubes lack the expected
        # attributes or structure.
        pass
def _add_wind_speed_um(cubes: iris.cube.CubeList):
    """Add windspeeds to cubes from the UM."""
    u_cube = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i225"))
    v_cube = cubes.extract_cube(iris.AttributeConstraint(STASH="m01s03i226"))
    # Pythagorean combination of the first field of each component.
    speed = (u_cube[0] ** 2 + v_cube[0] ** 2) ** 0.5
    speed.attributes["STASH"] = "m01s03i227"
    speed.standard_name = "wind_speed"
    speed.long_name = "wind_speed_at_10m"
    cubes.append(speed)
def _convert_wind_true_dirn_um(cubes: iris.cube.CubeList):
    """To convert winds to true directions.

    Convert from the components relative to the grid to true directions.
    This functionality only handles the simplest case.
    """
    grid_us = cubes.extract(iris.AttributeConstraint(STASH="m01s03i225"))
    grid_vs = cubes.extract(iris.AttributeConstraint(STASH="m01s03i226"))
    target_cs = iris.coord_systems.GeogCS(6371229.0)
    # Overwrite each component's data with its true-direction equivalent.
    for grid_u, grid_v in zip(grid_us, grid_vs, strict=True):
        true_u, true_v = rotate_winds(grid_u, grid_v, target_cs)
        grid_u.data = true_u.data
        grid_v.data = true_v.data
def _normalise_var0_varname(cube: iris.cube.Cube):
    """Fix varnames for consistency to allow merging.

    Some model data netCDF sometimes have a coordinate name end in
    "_0" etc, where duplicate coordinates of same name are defined but
    with different attributes. This can be inconsistently managed in
    different model inputs and can cause cubes to fail to merge.
    """
    for coord in cube.coords():
        # Check the suffixes in order so cascaded names reduce the same way
        # as the original chained checks (e.g. "x_1_0" -> "x_1" -> "x").
        for suffix in ("_0", "_1", "_2", "_3"):
            if coord.var_name and coord.var_name.endswith(suffix):
                coord.var_name = coord.var_name.removesuffix(suffix)

    # Only the "_0" suffix is normalised on the cube's own var_name.
    if cube.var_name and cube.var_name.endswith("_0"):
        cube.var_name = cube.var_name.removesuffix("_0")
def _proleptic_gregorian_fix(cube: iris.cube.Cube):
    """Convert the calendars of time units to use a standard calendar."""
    # Cubes without a time coordinate need no fixing.
    try:
        tcoord = cube.coord("time")
    except iris.exceptions.CoordinateNotFoundError:
        return
    if tcoord.units.calendar == "proleptic_gregorian":
        logging.debug(
            "Changing proleptic Gregorian calendar to standard calendar for %s",
            repr(tcoord.units),
        )
        tcoord.units = tcoord.units.change_calendar("standard")
def _lfric_time_callback(cube: iris.cube.Cube):
    """Fix time coordinate metadata if missing dimensions.

    Some model data does not contain forecast_reference_time or forecast_period as
    expected coordinates, and so we cannot aggregate over case studies without this
    metadata. This callback fixes these issues.

    This callback also ensures all time coordinates are referenced as hours since
    1970-01-01 00:00:00 for consistency across different model inputs.

    Notes
    -----
    Some parts of the code have been adapted from Paul Earnshaw's scripts.
    """
    # Construct forecast_reference time if it doesn't exist.
    try:
        tcoord = cube.coord("time")
        # Set time coordinate to common basis "hours since 1970"
        try:
            tcoord.convert_units("hours since 1970-01-01 00:00:00")
        except ValueError:
            # Best-effort: leave the original units in place if conversion fails.
            logging.warning("Unrecognised base time unit: %s", tcoord.units)

        if not cube.coords("forecast_reference_time"):
            try:
                # Derive the initialisation time from the time coordinate's
                # `time_origin` attribute (ISO 8601 string); KeyError below
                # means the attribute is absent.
                init_time = datetime.datetime.fromisoformat(
                    tcoord.attributes["time_origin"]
                )
                # Express the init time in the (now common) time units.
                frt_point = tcoord.units.date2num(init_time)
                frt_coord = iris.coords.AuxCoord(
                    frt_point,
                    units=tcoord.units,
                    standard_name="forecast_reference_time",
                    long_name="forecast_reference_time",
                )
                cube.add_aux_coord(frt_coord)
            except KeyError:
                logging.warning(
                    "Cannot find forecast_reference_time, but no `time_origin` attribute to construct it from."
                )

        # Remove time_origin to allow multiple case studies to merge.
        tcoord.attributes.pop("time_origin", None)

        # Construct forecast_period axis (forecast lead time) if it doesn't exist.
        if not cube.coords("forecast_period"):
            try:
                # Create array of forecast lead times.
                init_coord = cube.coord("forecast_reference_time")
                # Convert the reference time points into the time coordinate's
                # units before subtracting, in case their units differ.
                init_time_points_in_tcoord_units = tcoord.units.date2num(
                    init_coord.units.num2date(init_coord.points)
                )
                lead_times = tcoord.points - init_time_points_in_tcoord_units

                # Get unit for lead time from time coordinate's unit.
                # Convert all lead time to hours for consistency between models.
                if "seconds" in str(tcoord.units):
                    lead_times = lead_times / 3600.0
                    units = "hours"
                elif "hours" in str(tcoord.units):
                    units = "hours"
                else:
                    # NOTE(review): this ValueError is not caught by either
                    # except clause below (they only handle
                    # CoordinateNotFoundError), so it propagates to the caller.
                    raise ValueError(f"Unrecognised base time unit: {tcoord.units}")

                # Create lead time coordinate.
                lead_time_coord = iris.coords.AuxCoord(
                    lead_times,
                    standard_name="forecast_period",
                    long_name="forecast_period",
                    units=units,
                )

                # Associate lead time coordinate with time dimension.
                cube.add_aux_coord(lead_time_coord, cube.coord_dims("time"))
            except iris.exceptions.CoordinateNotFoundError:
                logging.warning(
                    "Cube does not have both time and forecast_reference_time coordinate, so cannot construct forecast_period"
                )
    except iris.exceptions.CoordinateNotFoundError:
        # cube.coord("time") raised: nothing to fix without a time coordinate.
        logging.warning("No time coordinate on cube.")
def _lfric_forecast_period_standard_name_callback(cube: iris.cube.Cube):
    """Add forecast_period standard name if missing."""
    # Cubes lacking the coordinate entirely are left untouched.
    try:
        fp_coord = cube.coord("forecast_period")
    except iris.exceptions.CoordinateNotFoundError:
        return
    if not fp_coord.standard_name:
        fp_coord.standard_name = "forecast_period"
def _normalise_ML_varname(cube: iris.cube.Cube):
    """Fix variable names in ML models to standard names."""
    # Only variables on pressure levels are renamed.
    if cube.coords("pressure"):
        # Setting long_name leaves the standard_name untouched; it adds a
        # more descriptive name consistent across model inputs.
        if cube.name() == "x_wind":
            cube.long_name = "zonal_wind_at_pressure_levels"
        if cube.name() == "y_wind":
            cube.long_name = "meridional_wind_at_pressure_levels"
        if cube.name() == "air_temperature":
            cube.long_name = "temperature_at_pressure_levels"