Coverage for src / CSET / operators / constraints.py: 91%
105 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 14:01 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 14:01 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to generate constraints to filter with."""
17import numbers
18import re
19from collections.abc import Iterable
20from datetime import timedelta
22import iris
23import iris.coords
24import iris.cube
26import CSET.operators._utils as operator_utils
27from CSET._common import iter_maybe
30def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
31 """Generate constraint from STASH code.
33 Operator that takes a stash string, and uses iris to generate a constraint
34 to be passed into the read operator to minimize the CubeList the read
35 operator loads and speed up loading.
37 Arguments
38 ---------
39 stash: str
40 stash code to build iris constraint, such as "m01s03i236"
42 Returns
43 -------
44 stash_constraint: iris.AttributeConstraint
45 """
46 # At a later stage str list an option to combine constraints. Arguments
47 # could be a list of stash codes that combined build the constraint.
48 stash_constraint = iris.AttributeConstraint(STASH=stash)
49 return stash_constraint
52def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
53 """Generate constraint from variable name or STASH code.
55 Operator that takes a CF compliant variable name string, and generates an
56 iris constraint to be passed into the read or filter operator. Can also be
57 passed a STASH code to generate a STASH constraint.
59 Arguments
60 ---------
61 varname: str
62 CF compliant name of variable, or a UM STASH code such as "m01s03i236".
64 Returns
65 -------
66 An Iris constraint for either:
67 - a single UM STASH code
68 - a single variable name
69 - a list of variable names (Cardington multi-input case)
70 """
71 _STASH_RE = re.compile(r"m\d{2}s\d{2}i\d{3}$")
72 # ---- CASE 1: list of variable names (e.g. Cardington multi-variable) ----
73 if isinstance(varname, (list, tuple)): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 return iris.Constraint(
75 cube_func=lambda cube: (
76 cube.var_name in varname
77 or cube.standard_name in varname
78 or cube.name() in varname
79 )
80 )
82 # ---- CASE 2: single UM STASH code ----
83 if _STASH_RE.match(varname):
84 return iris.AttributeConstraint(STASH=varname)
86 # ---- CASE 3: single variable name ----
87 return iris.Constraint(name=varname)
90def generate_level_constraint(
91 coordinate: str, levels: int | list[int] | str, **kwargs
92) -> iris.Constraint:
93 """Generate constraint for particular levels on the specified coordinate.
95 Operator that generates a constraint to constrain to specific model or
96 pressure levels. If no levels are specified then any cube with the specified
97 coordinate is rejected.
99 Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
100 for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.
102 Arguments
103 ---------
104 coordinate: str
105 Level coordinate name about which to constraint.
106 levels: int | list[int] | str
107 CF compliant level points, ``"*"`` for retrieving all levels, or
108 ``[]`` for no levels.
110 Returns
111 -------
112 constraint: iris.Constraint
114 Notes
115 -----
116 Due to the specification of ``coordinate`` as an argument any iterable
117 coordinate can be stratified with this function. Therefore,
118 ``"realization"`` is a valid option. Subsequently, ``levels`` specifies the
119 ensemble members, or group of ensemble members you wish to constrain your
120 results over.
121 """
122 # If asterisks, then return all levels for given coordinate.
123 if levels == "*":
124 return iris.Constraint(**{coordinate: lambda cell: True})
125 else:
126 # Ensure is iterable.
127 if not isinstance(levels, Iterable):
128 levels = [levels]
130 # When no levels specified reject cube with level coordinate.
131 if len(levels) == 0:
133 def no_levels(cube):
134 # Reject cubes for which coordinate exists.
135 return not cube.coords(coordinate)
137 return iris.Constraint(cube_func=no_levels)
139 # Filter the coordinate to the desired levels.
140 # Dictionary unpacking is used to provide programmatic keyword arguments.
141 return iris.Constraint(**{coordinate: levels})
144def generate_cell_methods_constraint(
145 cell_methods: list,
146 varname: str | None = None,
147 coord: iris.coords.Coord | None = None,
148 interval: str | None = None,
149 comment: str | None = None,
150 **kwargs,
151) -> iris.Constraint:
152 """Generate constraint from cell methods.
154 Operator that takes a list of cell methods and generates a constraint from
155 that. Use [] to specify non-aggregated data.
157 Arguments
158 ---------
159 cell_methods: list
160 cube.cell_methods for filtering.
161 varname: str, optional
162 CF compliant name of variable.
163 coord: iris.coords.Coord, optional
164 iris.coords.Coord to which the cell method is applied to.
165 interval: str, optional
166 interval over which the cell method is applied to (e.g. 1 hour).
167 comment: str, optional
168 any comments in Cube meta data associated with the cell method.
170 Returns
171 -------
172 cell_method_constraint: iris.Constraint
173 """
174 if len(cell_methods) == 0:
176 def check_no_aggregation(cube: iris.cube.Cube) -> bool:
177 """Check that any cell methods are "point", meaning no aggregation."""
178 return set(cm.method for cm in cube.cell_methods) <= {"point"}
180 def check_cell_sum(cube: iris.cube.Cube) -> bool:
181 """Check that any cell methods are "sum"."""
182 return set(cm.method for cm in cube.cell_methods) == {"sum"}
184 def check_cell_mean(cube: iris.cube.Cube) -> bool:
185 """Check that any cell methods are "mean"."""
186 return set(cm.method for cm in cube.cell_methods) == {"mean"}
188 if varname:
189 # Require number_of_lightning_flashes to be "sum" cell_method input.
190 # Require surface_microphyisical_rainfall_amount and surface_microphysical_snowfall_amount to be "sum" cell_method inputs.
191 if ("lightning" in varname) or (
192 "surface_microphysical" in varname and "amount" in varname
193 ):
194 cell_methods_constraint = iris.Constraint(cube_func=check_cell_sum)
195 return cell_methods_constraint
196 # Require climatological ancillary as time-average mean.
197 if ("albedo" in varname) or ( 197 ↛ 204line 197 didn't jump to line 204 because the condition on line 197 was always true
198 "ocean" in varname and "chlorophyll" in varname
199 ):
200 cell_methods_constraint = iris.Constraint(cube_func=check_cell_mean)
201 return cell_methods_constraint
203 # If no variable name set, assume require instantaneous cube.
204 cell_methods_constraint = iris.Constraint(cube_func=check_no_aggregation)
206 else:
207 # If cell_method constraint set in recipe, check for required input.
208 def check_cell_methods(cube: iris.cube.Cube) -> bool:
209 return all(
210 iris.coords.CellMethod(
211 method=cm, coords=coord, intervals=interval, comments=comment
212 )
213 in cube.cell_methods
214 for cm in cell_methods
215 )
217 cell_methods_constraint = iris.Constraint(cube_func=check_cell_methods)
219 return cell_methods_constraint
222def generate_time_constraint(
223 time_start: str, time_end: str = None, **kwargs
224) -> iris.Constraint:
225 """Generate constraint between times.
227 Operator that takes one or two ISO 8601 date strings, and returns a
228 constraint that selects values between those dates (inclusive).
230 Arguments
231 ---------
232 time_start: str | datetime.datetime | cftime.datetime
233 ISO date for lower bound
235 time_end: str | datetime.datetime | cftime.datetime
236 ISO date for upper bound. If omitted it defaults to the same as
237 time_start
239 Returns
240 -------
241 time_constraint: iris.Constraint
242 """
243 if isinstance(time_start, str):
244 pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start)
245 else:
246 pdt_start, offset_start = time_start, timedelta(0)
248 if time_end is None:
249 pdt_end, offset_end = time_start, offset_start
250 elif isinstance(time_end, str):
251 pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end)
252 print(pdt_end)
253 print(offset_end)
254 else:
255 pdt_end, offset_end = time_end, timedelta(0)
257 if offset_start is None:
258 offset_start = timedelta(0)
259 if offset_end is None:
260 offset_end = timedelta(0)
262 time_constraint = iris.Constraint(
263 time=lambda t: (
264 (pdt_start <= (t.point - offset_start))
265 and ((t.point - offset_end) <= pdt_end)
266 )
267 )
269 return time_constraint
272def generate_area_constraint(
273 lat_start: float | None,
274 lat_end: float | None,
275 lon_start: float | None,
276 lon_end: float | None,
277 **kwargs,
278) -> iris.Constraint:
279 """Generate an area constraint between latitude/longitude limits.
281 Operator that takes a set of latitude and longitude limits and returns a
282 constraint that selects grid values only inside that area. Works with the
283 data's native grid so is defined within the rotated pole CRS.
285 Alternatively, all arguments may be None to indicate the area should not be
286 constrained. This is useful to allow making subsetting an optional step in a
287 processing pipeline.
289 Arguments
290 ---------
291 lat_start: float | None
292 Latitude value for lower bound
293 lat_end: float | None
294 Latitude value for top bound
295 lon_start: float | None
296 Longitude value for left bound
297 lon_end: float | None
298 Longitude value for right bound
300 Returns
301 -------
302 area_constraint: iris.Constraint
303 """
304 # Check all arguments are defined, or all are None.
305 if not (
306 all(
307 (
308 isinstance(lat_start, numbers.Real),
309 isinstance(lat_end, numbers.Real),
310 isinstance(lon_start, numbers.Real),
311 isinstance(lon_end, numbers.Real),
312 )
313 )
314 or all((lat_start is None, lat_end is None, lon_start is None, lon_end is None))
315 ):
316 raise TypeError("Bounds must real numbers, or all None.")
318 # Don't constrain area if all arguments are None.
319 if lat_start is None: # Only need to check once, as they will be the same.
320 # An empty constraint allows everything.
321 return iris.Constraint()
323 # Handle bounds crossing the date line.
324 if lon_end < lon_start: 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true
325 lon_end = lon_end + 360
327 def bound_lat(cell: iris.coords.Cell) -> bool:
328 return lat_start < cell < lat_end
330 def bound_lon(cell: iris.coords.Cell) -> bool:
331 # Adjust cell values to handle crossing the date line.
332 if cell < lon_start:
333 cell = cell + 360
334 return lon_start < cell < lon_end
336 area_constraint = iris.Constraint(
337 coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
338 )
339 return area_constraint
342def generate_remove_single_ensemble_member_constraint(
343 ensemble_member: int = 0, **kwargs
344) -> iris.Constraint:
345 """
346 Generate a constraint to remove a single ensemble member.
348 Operator that returns a constraint to remove the given ensemble member. By
349 default the ensemble member removed is the control member (assumed to have
350 a realization of zero). However, any ensemble member can be removed, thus
351 allowing a non-zero control member to be removed if the control is a
352 different member.
354 Arguments
355 ---------
356 ensemble_member: int
357 Default is 0. The ensemble member realization to remove.
359 Returns
360 -------
361 iris.Constraint
363 Notes
364 -----
365 This operator is primarily used to remove the control member to allow
366 ensemble metrics to be calculated without the control member. For
367 example, the ensemble mean is not normally calculated including the
368 control member. It is particularly useful to remove the control member
369 when it is not an equally-likely member of the ensemble.
370 """
371 return iris.Constraint(realization=lambda m: m.point != ensemble_member)
374def generate_realization_constraint(
375 ensemble_members: int | list[int], **kwargs
376) -> iris.Constraint:
377 """
378 Generate a constraint to subset ensemble members.
380 Operator that is given a list of ensemble members and returns a constraint
381 to select those ensemble members. This operator is particularly useful for
382 subsetting ensembles.
384 Arguments
385 ---------
386 ensemble_members: int | list[int]
387 The ensemble members to be subsetted over.
389 Returns
390 -------
391 iris.Constraint
392 """
393 # Ensure ensemble_members is iterable.
394 ensemble_members = iter_maybe(ensemble_members)
395 return iris.Constraint(realization=ensemble_members)
398def generate_hour_constraint(
399 hour_start: int,
400 hour_end: int = None,
401 **kwargs,
402) -> iris.Constraint:
403 """Generate an hour constraint between hour of day limits.
405 Operator that takes a set of hour of day limits and returns a constraint that
406 selects only hours within that time frame regardless of day.
408 Alternatively, the result can be constrained to a single hour by just entering
409 a starting hour.
411 Should any sub-hourly data be given these will have the same hour coordinate
412 (e.g., 12:00 and 12:05 both have an hour coordinate of 12) all
413 times will be selected with this constraint.
415 Arguments
416 ---------
417 hour_start: int
418 The hour of day for the lower bound, within 0 to 23.
419 hour_end: int | None
420 The hour of day for the upper bound, within 0 to 23. Alternatively,
421 set to None if only one hour required.
423 Returns
424 -------
425 hour_constraint: iris.Constraint
427 Raises
428 ------
429 ValueError
430 If the provided arguments are outside of the range 0 to 23.
431 """
432 if hour_end is None:
433 hour_end = hour_start
435 if (hour_start < 0) or (hour_start > 23) or (hour_end < 0) or (hour_end > 23):
436 raise ValueError("Hours must be between 0 and 23 inclusive.")
438 hour_constraint = iris.Constraint(hour=lambda h: hour_start <= h.point <= hour_end)
439 return hour_constraint
442def combine_constraints(
443 constraint: iris.Constraint = None, **kwargs
444) -> iris.Constraint:
445 """
446 Operator that combines multiple constraints into one.
448 Arguments
449 ---------
450 constraint: iris.Constraint
451 First constraint to combine.
452 additional_constraint_1: iris.Constraint
453 Second constraint to combine. This must be a named argument.
454 additional_constraint_2: iris.Constraint
455 There can be any number of additional constraint, they just need unique
456 names.
457 ...
459 Returns
460 -------
461 combined_constraint: iris.Constraint
463 Raises
464 ------
465 TypeError
466 If the provided arguments are not constraints.
467 """
468 # If the first argument is not a constraint, it is ignored. This handles the
469 # automatic passing of the previous step's output.
470 if isinstance(constraint, iris.Constraint):
471 combined_constraint = constraint
472 else:
473 combined_constraint = iris.Constraint()
475 for constr in kwargs.values():
476 combined_constraint = combined_constraint & constr
477 return combined_constraint
480def generate_attribute_constraint(
481 attribute: str, value: str = None, **kwargs
482) -> iris.AttributeConstraint:
483 """Generate constraint on cube attributes.
485 Constrains based on the presence of an attribute, and that attribute having
486 a particular value.
488 Arguments
489 ---------
490 attribute: str
491 Attribute to constraint on.
493 value: str
494 Attribute value to constrain on. If omitted the constraint merely checks
495 for the presence of an attribute.
497 Returns
498 -------
499 attribute_constraint: iris.Constraint
500 """
501 if value is None:
502 attribute_constraint = iris.Constraint(
503 cube_func=lambda cube: attribute in cube.attributes
504 )
505 else:
506 attribute_constraint = iris.AttributeConstraint(**{attribute: value})
507 return attribute_constraint