Coverage for src/CSET/operators/constraints.py: 92%
84 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 12:53 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 12:53 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to generate constraints to filter with."""
17import numbers
18import re
19from collections.abc import Iterable
20from datetime import datetime
22import iris
23import iris.coords
24import iris.cube
26from CSET._common import iter_maybe
def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Build an iris constraint that matches a single STASH code.

    Operator whose output is meant to be handed to the read operator, so that
    only cubes carrying the requested STASH attribute are loaded. Keeping the
    loaded CubeList small speeds up reading.

    Arguments
    ---------
    stash: str
        STASH code to constrain on, such as "m01s03i236".

    Returns
    -------
    stash_constraint: iris.AttributeConstraint

    Notes
    -----
    Any additional keyword arguments are accepted but not used.
    """
    # A possible later extension: accept a list of STASH codes and build a
    # combined constraint from all of them.
    return iris.AttributeConstraint(STASH=stash)
def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
    """Build a constraint from either a variable name or a STASH code.

    Operator producing an iris constraint suitable for the read or filter
    operators. When ``varname`` has the shape of a UM STASH code the result
    constrains on the STASH attribute; otherwise it constrains on the cube
    name.

    Arguments
    ---------
    varname: str
        CF compliant name of variable, or a UM STASH code such as "m01s03i236".

    Returns
    -------
    varname_constraint: iris.Constraint

    Notes
    -----
    Any additional keyword arguments are accepted but not used.
    """
    # STASH codes look like mXXsXXiXXX, where each X is a decimal digit.
    stash_match = re.match(r"m[0-9]{2}s[0-9]{2}i[0-9]{3}$", varname)
    if stash_match is None:
        # Not a STASH code, so treat it as a variable name.
        return iris.Constraint(name=varname)
    return iris.AttributeConstraint(STASH=varname)
def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Build a constraint selecting given levels of a named coordinate.

    Operator used to restrict data to specific model or pressure levels. When
    an empty collection of levels is given, cubes that have the named
    coordinate are rejected.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Name of the level coordinate to constrain on.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` for retrieving all levels, or
        ``[]`` for no levels.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Because ``coordinate`` is passed in as an argument, this function is not
    limited to vertical levels. For instance ``"realization"`` is a valid
    choice, in which case ``levels`` names the ensemble member, or group of
    ensemble members, to keep.
    """
    # The wildcard accepts every value of the coordinate.
    if levels == "*":
        return iris.Constraint(**{coordinate: lambda cell: True})

    # Wrap a bare value so it can be handled like a collection.
    if not isinstance(levels, Iterable):
        levels = [levels]

    # With nothing requested, only cubes lacking the coordinate pass.
    if len(levels) == 0:

        def coordinate_absent(cube):
            # cube.coords() gives an empty result when there is no match.
            return not cube.coords(coordinate)

        return iris.Constraint(cube_func=coordinate_absent)

    # The keyword name is only known at runtime, hence the dictionary
    # unpacking.
    return iris.Constraint(**{coordinate: levels})
def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint that filters cubes on their cell methods.

    Operator that takes a list of cell methods and produces a matching
    constraint. Pass ``[]`` to request non-aggregated data.

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering.
    varname: str, optional
        CF compliant name of variable. Only consulted when ``cell_methods`` is
        empty, to pick out variables that must be a "sum".
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied to.
    interval: str, optional
        interval over which the cell method is applied to (e.g. 1 hour).
    comment: str, optional
        any comments in Cube meta data associated with the cell method.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    if len(cell_methods) != 0:
        # Cell methods were requested explicitly: each must be on the cube.
        def has_requested_methods(cube: iris.cube.Cube) -> bool:
            """Check that every requested cell method is on the cube."""
            for method in cell_methods:
                wanted = iris.coords.CellMethod(
                    method=method, coords=coord, intervals=interval, comments=comment
                )
                if wanted not in cube.cell_methods:
                    return False
            return True

        return iris.Constraint(cube_func=has_requested_methods)

    # number_of_lightning_flashes, surface_microphysical_rainfall_amount and
    # surface_microphysical_snowfall_amount need the "sum" cell method.
    if varname:
        wants_sum = "lightning" in varname or (
            "surface_microphysical" in varname and "amount" in varname
        )
        if wants_sum:

            def only_sum(cube: iris.cube.Cube) -> bool:
                """Check that the cell methods present are exactly "sum"."""
                return {cm.method for cm in cube.cell_methods} == {"sum"}

            return iris.Constraint(cube_func=only_sum)

    # Otherwise require an instantaneous (non-aggregated) cube.
    def only_point(cube: iris.cube.Cube) -> bool:
        """Check that any cell methods are "point", meaning no aggregation."""
        return {cm.method for cm in cube.cell_methods} <= {"point"}

    return iris.Constraint(cube_func=only_point)
def generate_time_constraint(
    time_start: str | datetime, time_end: str | datetime | None = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings (or datetime
    objects), and returns a constraint that selects values between those dates
    (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime
        ISO date for lower bound. Strings are parsed with
        ``datetime.fromisoformat``; other values are used as given.
    time_end: str | datetime.datetime | None
        ISO date for upper bound. If omitted it defaults to the same as
        time_start, so a single time is selected.

    Returns
    -------
    time_constraint: iris.Constraint

    Notes
    -----
    Any additional keyword arguments are accepted but not used.
    """
    if isinstance(time_start, str):
        time_start = datetime.fromisoformat(time_start)
    if time_end is None:
        # No upper bound given: collapse the range onto the start time.
        time_end = time_start
    elif isinstance(time_end, str):
        time_end = datetime.fromisoformat(time_end)
    # Both ends of the comparison are inclusive.
    time_constraint = iris.Constraint(time=lambda t: time_start <= t.point <= time_end)
    return time_constraint
def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Build an area constraint from latitude/longitude limits.

    Operator that takes latitude and longitude limits and returns a constraint
    keeping only grid values strictly inside that area. It works with the
    data's native grid so is defined within the rotated pole CRS.

    Alternatively, all arguments may be None to indicate the area should not be
    constrained. This makes subsetting an optional step in a processing
    pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are neither all real numbers nor all None.
    """
    bounds = (lat_start, lat_end, lon_start, lon_end)
    every_bound_real = all(isinstance(bound, numbers.Real) for bound in bounds)
    every_bound_none = all(bound is None for bound in bounds)
    # A mixture of numbers and None (or any other type) is rejected.
    if not (every_bound_real or every_bound_none):
        raise TypeError("Bounds must real numbers, or all None.")

    # After validation, one None means all are None: leave the area
    # unconstrained. An empty constraint allows everything.
    if lat_start is None:
        return iris.Constraint()

    # An end longitude west of the start means the box crosses the date line;
    # shift the end round by a full circle so it sits after the start.
    if lon_end < lon_start:
        lon_end = lon_end + 360

    def within_latitudes(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def within_longitudes(cell: iris.coords.Cell) -> bool:
        # Cells west of the start are shifted by a full circle too, so they
        # are compared in the same range as the adjusted end bound.
        shifted = cell + 360 if cell < lon_start else cell
        return lon_start < shifted < lon_end

    return iris.Constraint(
        coord_values={
            "grid_latitude": within_latitudes,
            "grid_longitude": within_longitudes,
        }
    )
def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """
    Build a constraint that drops one ensemble member.

    Operator returning a constraint that keeps every realization except the
    given one. By default the member removed is the control member (assumed to
    have a realization of zero), but any member can be named, which allows a
    control with a non-zero realization to be removed.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    The main use is removing the control member so that ensemble metrics can
    be calculated without it. For example, the ensemble mean is not normally
    calculated including the control member. Removal is particularly useful
    when the control is not an equally-likely member of the ensemble.
    """

    def is_other_member(cell) -> bool:
        # Keep a realization only when it differs from the one being removed.
        return cell.point != ensemble_member

    return iris.Constraint(realization=is_other_member)
def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """
    Build a constraint that keeps only the chosen ensemble members.

    Operator that is given one ensemble member, or a list of them, and returns
    a constraint on the ``realization`` coordinate selecting those members.
    Particularly useful for subsetting ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.

    Returns
    -------
    iris.Constraint
    """
    # iter_maybe makes sure a lone member is still handed over as an iterable.
    return iris.Constraint(realization=iter_maybe(ensemble_members))
def generate_hour_constraint(
    hour_start: int,
    hour_end: int | None = None,
    **kwargs,
) -> iris.Constraint:
    """Generate an hour constraint between hour of day limits.

    Operator that takes a set of hour of day limits and returns a constraint that
    selects only hours within that time frame (inclusive) regardless of day.

    Alternatively, the result can be constrained to a single hour by just entering
    a starting hour.

    Should any sub-hourly data be given these will have the same hour coordinate
    (e.g., 12:00 and 12:05 both have an hour coordinate of 12) all
    times will be selected with this constraint.

    Arguments
    ---------
    hour_start: int
        The hour of day for the lower bound, within 0 to 23.
    hour_end: int | None
        The hour of day for the upper bound, within 0 to 23. Alternatively,
        set to None if only one hour required.

    Returns
    -------
    hour_constraint: iris.Constraint

    Raises
    ------
    ValueError
        If the provided arguments are outside of the range 0 to 23.

    Notes
    -----
    The range does not wrap around midnight: when ``hour_end`` is less than
    ``hour_start`` no hour can satisfy the comparison, so nothing is selected.
    """
    if hour_end is None:
        # Single-hour selection: the range collapses onto the start hour.
        hour_end = hour_start

    if (hour_start < 0) or (hour_start > 23) or (hour_end < 0) or (hour_end > 23):
        raise ValueError("Hours must be between 0 and 23 inclusive.")

    # Both ends of the comparison are inclusive.
    hour_constraint = iris.Constraint(hour=lambda h: hour_start <= h.point <= hour_end)
    return hour_constraint
def combine_constraints(
    constraint: iris.Constraint | None = None, **kwargs
) -> iris.Constraint:
    """
    Operator that combines multiple constraints into one.

    The constraints are joined with ``&``, so a cube has to satisfy all of
    them.

    Arguments
    ---------
    constraint: iris.Constraint | None
        First constraint to combine. If this is not an ``iris.Constraint``
        (for example None) it is ignored and an empty constraint is used as
        the starting point instead.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        There can be any number of additional constraint, they just need unique
        names.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the provided arguments are not constraints.
    """
    # If the first argument is not a constraint, it is ignored. This handles the
    # automatic passing of the previous step's output.
    combined_constraint = (
        constraint if isinstance(constraint, iris.Constraint) else iris.Constraint()
    )

    # Only the keyword values matter; the names just keep the arguments unique.
    for additional_constraint in kwargs.values():
        combined_constraint = combined_constraint & additional_constraint
    return combined_constraint