Coverage for src/CSET/operators/constraints.py: 93%
107 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 15:22 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 15:22 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to generate constraints to filter with."""
17import numbers
18import re
19from collections.abc import Iterable
20from datetime import timedelta
22import iris
23import iris.coords
24import iris.cube
26import CSET.operators._utils as operator_utils
27from CSET._common import iter_maybe
def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Build an iris constraint matching a single STASH code.

    Operator that wraps the given STASH string in an attribute constraint on
    the ``STASH`` attribute. The result is intended to be handed to the read
    operator so that fewer cubes are loaded, which speeds up loading.

    Arguments
    ---------
    stash: str
        STASH code to constrain on, such as "m01s03i236".

    Returns
    -------
    stash_constraint: iris.AttributeConstraint
    """
    # Only one STASH code is handled here. Accepting a list of codes and
    # combining them into a single constraint is a possible later extension.
    return iris.AttributeConstraint(STASH=stash)
def generate_var_constraint(
    varname: str | list[str] | tuple[str, ...], **kwargs
) -> iris.Constraint:
    """Generate constraint from variable name(s) or a STASH code.

    Operator that takes a CF compliant variable name string, and generates an
    iris constraint to be passed into the read or filter operator. Can also be
    passed a STASH code to generate a STASH constraint, or a list/tuple of
    variable names to accept a cube matching any of them.

    Arguments
    ---------
    varname: str | list[str] | tuple[str, ...]
        CF compliant name of variable, a UM STASH code such as "m01s03i236",
        or a list/tuple of variable names (Cardington multi-input case).

    Returns
    -------
    An Iris constraint for either:
      - a single UM STASH code
      - a single variable name
      - a list of variable names (Cardington multi-input case)
    """
    # ---- CASE 1: list of variable names (e.g. Cardington multi-variable) ----
    if isinstance(varname, (list, tuple)):
        # Snapshot the names so that later mutation of the caller's list does
        # not silently change what the returned constraint accepts.
        names = tuple(varname)
        return iris.Constraint(
            cube_func=lambda cube: (
                cube.long_name in names
                or cube.standard_name in names
                or cube.var_name in names
            )
        )

    # ---- CASE 2: single UM STASH code ----
    # fullmatch requires the whole string to be a STASH code; a "$" anchored
    # match would also accept a value with a trailing newline.
    if re.fullmatch(r"m\d{2}s\d{2}i\d{3}", varname):
        return iris.AttributeConstraint(STASH=varname)

    # ---- CASE 3: single variable name ----
    return iris.Constraint(name=varname)
def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Generate a constraint selecting given levels of a coordinate.

    Operator that builds a constraint restricting the named coordinate to
    specific model or pressure levels. When an empty collection of levels is
    given, any cube that has the coordinate is rejected.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Name of the level coordinate to constrain on.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` for retrieving all levels, or
        ``[]`` for no levels.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Because ``coordinate`` is an argument, any iterable coordinate can be
    stratified with this function. ``"realization"`` is therefore a valid
    option, in which case ``levels`` names the ensemble member, or group of
    ensemble members, to constrain the results over.
    """
    # An asterisk asks for every level: accept any cell of the coordinate.
    if levels == "*":
        return iris.Constraint(**{coordinate: lambda cell: True})

    # Wrap a lone value so the remaining logic always sees a collection.
    wanted = levels if isinstance(levels, Iterable) else [levels]

    # No levels requested: only cubes without the coordinate may pass.
    if len(wanted) == 0:

        def no_levels(cube):
            # cube.coords() is empty (falsy) when the coordinate is absent.
            return not cube.coords(coordinate)

        return iris.Constraint(cube_func=no_levels)

    # The coordinate name is only known at runtime, so the keyword argument
    # is supplied through dictionary unpacking.
    return iris.Constraint(**{coordinate: wanted})
def generate_remove_single_level_constraint(
    coord: str, level: int = 0, **kwargs
) -> iris.Constraint:
    """Generate a constraint that drops one level from a coordinate.

    Operator that returns a constraint keeping every cell of ``coord`` whose
    point is not equal to ``level``. By default level zero (assumed to be the
    first level) is the one removed, but any level can be given.

    Arguments
    ---------
    coord: str
        The coordinate for which the level is to be removed.
    level: int
        Default is 0. The model level number to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    This operator is primarily used to make level sets consistent, as some
    fields (e.g. specific humidity) sit on the same level set but carry a
    different number of levels (e.g. 71 instead of the expected 70).
    """
    # The coordinate name is dynamic, so build the keyword mapping first and
    # unpack it into the constraint.
    keep_other_levels = {coord: lambda cell: cell.point != level}
    return iris.Constraint(**keep_other_levels)
def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Generate constraint from cell methods.

    Operator that takes a list of cell methods and generates a constraint from
    that. Use [] to specify non-aggregated data; in that case ``varname`` may
    switch the requirement to "sum" or "mean" for particular variables (see
    the inline comments).

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering.
    varname: str, optional
        CF compliant name of variable.
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied to.
    interval: str, optional
        interval over which the cell method is applied to (e.g. 1 hour).
    comment: str, optional
        any comments in Cube meta data associated with the cell method.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    # Explicit cell methods given in the recipe: every one of them must be
    # present on the cube with the requested coord/interval/comment.
    if len(cell_methods) != 0:

        def check_cell_methods(cube: iris.cube.Cube) -> bool:
            for method in cell_methods:
                required = iris.coords.CellMethod(
                    method=method, coords=coord, intervals=interval, comments=comment
                )
                if required not in cube.cell_methods:
                    return False
            return True

        return iris.Constraint(cube_func=check_cell_methods)

    def check_no_aggregation(cube: iris.cube.Cube) -> bool:
        """Accept cubes with no cell methods, or only "point" ones."""
        return {cm.method for cm in cube.cell_methods} <= {"point"}

    def check_cell_sum(cube: iris.cube.Cube) -> bool:
        """Accept cubes whose cell methods are all "sum" (at least one)."""
        return {cm.method for cm in cube.cell_methods} == {"sum"}

    def check_cell_mean(cube: iris.cube.Cube) -> bool:
        """Accept cubes whose cell methods are all "mean" (at least one)."""
        return {cm.method for cm in cube.cell_methods} == {"mean"}

    if varname:
        # Lightning flash counts and the surface microphysical rainfall and
        # snowfall amounts are required to be "sum" cell method inputs.
        needs_sum = "lightning" in varname or (
            "surface_microphysical" in varname and "amount" in varname
        )
        if needs_sum:
            return iris.Constraint(cube_func=check_cell_sum)

        # Climatological ancillaries are required as a time-average mean.
        needs_mean = "albedo" in varname or (
            "ocean" in varname and "chlorophyll" in varname
        )
        if needs_mean:
            return iris.Constraint(cube_func=check_cell_mean)

    # No variable name, or none of the special cases above: require an
    # instantaneous (non-aggregated) cube.
    return iris.Constraint(cube_func=check_no_aggregation)
def generate_time_constraint(
    time_start: str, time_end: str | None = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings, and returns a
    constraint that selects values between those dates (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime | cftime.datetime
        ISO date for lower bound

    time_end: str | datetime.datetime | cftime.datetime
        ISO date for upper bound. If omitted it defaults to the same as
        time_start

    Returns
    -------
    time_constraint: iris.Constraint
    """
    # Strings are parsed into a (date-time, offset) pair by the shared helper;
    # any other value is used as the bound directly with a zero offset.
    if isinstance(time_start, str):
        pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start)
    else:
        pdt_start, offset_start = time_start, timedelta(0)

    if time_end is None:
        # Reuse the *parsed* start bound. Using the raw time_start here would
        # leave a string as the upper bound when time_start is a string.
        pdt_end, offset_end = pdt_start, offset_start
    elif isinstance(time_end, str):
        pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end)
    else:
        pdt_end, offset_end = time_end, timedelta(0)

    # The helper may report no offset as None; treat that as a zero offset so
    # the subtraction in the constraint is always valid.
    if offset_start is None:
        offset_start = timedelta(0)
    if offset_end is None:
        offset_end = timedelta(0)

    # Each bound's offset is removed from the cell's point before comparing.
    time_constraint = iris.Constraint(
        time=lambda t: (
            (pdt_start <= (t.point - offset_start))
            and ((t.point - offset_end) <= pdt_end)
        )
    )

    return time_constraint
def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Generate an area constraint between latitude/longitude limits.

    Operator that takes a set of latitude and longitude limits and returns a
    constraint that selects grid values only inside that area. The limits are
    exclusive: points lying exactly on a bound are not selected. Works with
    the data's native grid (``grid_latitude``/``grid_longitude``) so is
    defined within the rotated pole CRS.

    Alternatively, all arguments may be None to indicate the area should not be
    constrained. This is useful to allow making subsetting an optional step in a
    processing pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound. If smaller than ``lon_start`` the
        area is taken to cross the date line.

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are not either all real numbers or all None.
    """
    bounds = (lat_start, lat_end, lon_start, lon_end)
    all_real = all(isinstance(bound, numbers.Real) for bound in bounds)
    all_none = all(bound is None for bound in bounds)

    # Check all arguments are defined, or all are None.
    if not (all_real or all_none):
        raise TypeError("Bounds must be real numbers, or all None.")

    # Don't constrain area if all arguments are None.
    if all_none:
        # An empty constraint allows everything.
        return iris.Constraint()

    # Handle bounds crossing the date line: move the right bound one full
    # turn further on so that lon_start < lon_end holds again.
    if lon_end < lon_start:
        lon_end = lon_end + 360

    def bound_lat(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def bound_lon(cell: iris.coords.Cell) -> bool:
        # Adjust cell values to handle crossing the date line: cells left of
        # the start bound are shifted by a full turn before comparing.
        if cell < lon_start:
            cell = cell + 360
        return lon_start < cell < lon_end

    area_constraint = iris.Constraint(
        coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
    )
    return area_constraint
def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """Generate a constraint that drops one ensemble member.

    Operator that returns a constraint keeping every ``realization`` whose
    point differs from ``ensemble_member``. The default removes realization
    zero, which is assumed to be the control member; pass another value to
    remove a different member, for instance when the control is not member
    zero.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    The main use is removing the control member so that ensemble metrics can
    be computed without it. The ensemble mean, for example, is not normally
    calculated with the control included, and removing it matters most when
    the control is not an equally-likely member of the ensemble.
    """
    constraint = iris.Constraint(
        realization=lambda cell: cell.point != ensemble_member
    )
    return constraint
def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """Generate a constraint selecting a subset of ensemble members.

    Operator that is given one ensemble member or a list of them and returns
    a constraint on ``realization`` selecting those members. Useful for
    subsetting ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.

    Returns
    -------
    iris.Constraint
    """
    # Normalise through iter_maybe so the value handed to iris is iterable.
    members = iter_maybe(ensemble_members)
    return iris.Constraint(realization=members)
def generate_hour_constraint(
    hour_start: int,
    hour_end: int = None,
    **kwargs,
) -> iris.Constraint:
    """Generate a constraint on the hour of day.

    Operator that takes lower and upper hour-of-day limits and returns a
    constraint selecting only hours within that range (inclusive), whatever
    the day.

    Giving only a starting hour constrains the result to that single hour.

    Sub-hourly data share an hour coordinate (e.g., 12:00 and 12:05 both have
    an hour coordinate of 12), so all such times are selected together by
    this constraint.

    Arguments
    ---------
    hour_start: int
        The hour of day for the lower bound, within 0 to 23.
    hour_end: int | None
        The hour of day for the upper bound, within 0 to 23. Alternatively,
        set to None if only one hour required.

    Returns
    -------
    hour_constraint: iris.Constraint

    Raises
    ------
    ValueError
        If the provided arguments are outside of the range 0 to 23.
    """
    # A single hour is expressed as a range whose bounds coincide.
    if hour_end is None:
        hour_end = hour_start

    # Validate both bounds, start first, against the 24 hour clock.
    if any(hour < 0 or hour > 23 for hour in (hour_start, hour_end)):
        raise ValueError("Hours must be between 0 and 23 inclusive.")

    return iris.Constraint(hour=lambda h: hour_start <= h.point <= hour_end)
def combine_constraints(
    constraint: iris.Constraint = None, **kwargs
) -> iris.Constraint:
    """
    Operator that merges several constraints into a single one.

    Arguments
    ---------
    constraint: iris.Constraint
        First constraint to combine.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        Any number of further constraints may be given, provided each has a
        unique name.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the provided arguments are not constraints.
    """
    # A first argument that is not a constraint is ignored and replaced by an
    # empty constraint. This copes with the previous step's output being
    # passed in automatically.
    combined_constraint = (
        constraint if isinstance(constraint, iris.Constraint) else iris.Constraint()
    )

    # Fold every named constraint in with "&", in keyword order.
    for extra in kwargs.values():
        combined_constraint = combined_constraint & extra
    return combined_constraint
def generate_attribute_constraint(
    attribute: str, value: str = None, **kwargs
) -> iris.AttributeConstraint:
    """Generate a constraint on a cube attribute.

    Constrains on an attribute being present and, when a value is supplied,
    on the attribute having that value.

    Arguments
    ---------
    attribute: str
        Attribute to constrain on.

    value: str
        Attribute value to constrain on. If omitted the constraint merely
        checks for the presence of the attribute.

    Returns
    -------
    attribute_constraint: iris.Constraint
    """
    # With a value, defer to iris' attribute matching; the attribute name is
    # dynamic so it is passed via dictionary unpacking.
    if value is not None:
        return iris.AttributeConstraint(**{attribute: value})

    # Without a value, only require that the attribute exists on the cube.
    return iris.Constraint(cube_func=lambda cube: attribute in cube.attributes)