Coverage for src/CSET/operators/constraints.py: 92%
93 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 15:17 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 15:17 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators to generate constraints to filter with."""
17import numbers
18import re
19from collections.abc import Iterable
20from datetime import timedelta
22import iris
23import iris.coords
24import iris.cube
26import CSET.operators._utils as operator_utils
27from CSET._common import iter_maybe
def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Build a constraint that matches a single STASH code.

    Operator that wraps a STASH string in an iris attribute constraint. The
    constraint is intended to be handed to the read operator so that fewer
    cubes are loaded, which speeds up loading.

    Arguments
    ---------
    stash: str
        STASH code to match, such as "m01s03i236".
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    stash_constraint: iris.AttributeConstraint
    """
    # Possible future extension: accept a list of STASH codes and combine
    # them into a single constraint.
    return iris.AttributeConstraint(STASH=stash)
def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
    """Build a constraint from a variable name or a STASH code.

    Operator that inspects ``varname``: when it has the shape of a UM STASH
    code a STASH attribute constraint is returned, otherwise the string is
    used as a cube name constraint. The result can be passed to the read or
    filter operators.

    Arguments
    ---------
    varname: str
        CF compliant name of variable, or a UM STASH code such as "m01s03i236".
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    varname_constraint: iris.Constraint
    """
    # STASH codes look like m<2 digits>s<2 digits>i<3 digits>.
    looks_like_stash = re.match(r"m[0-9]{2}s[0-9]{2}i[0-9]{3}$", varname)
    if looks_like_stash:
        return iris.AttributeConstraint(STASH=varname)
    return iris.Constraint(name=varname)
def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Build a constraint selecting particular levels of a coordinate.

    Operator that constrains to specific points on ``coordinate``, for example
    model or pressure levels. When an empty collection of levels is given, any
    cube that has the coordinate is rejected.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Name of the level coordinate to constrain on.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` for retrieving all levels, or
        ``[]`` for no levels.
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Because ``coordinate`` is an argument, any coordinate can be subset with
    this function, not only vertical ones. For instance ``"realization"`` is a
    valid choice, in which case ``levels`` names the ensemble member(s) to
    keep.
    """
    # A lone asterisk accepts every point on the coordinate.
    if levels == "*":
        return iris.Constraint(**{coordinate: lambda cell: True})

    # Wrap a bare scalar so it is handled like a collection of levels.
    wanted = levels if isinstance(levels, Iterable) else [levels]

    # An empty selection keeps only cubes that lack the coordinate entirely.
    if len(wanted) == 0:

        def lacks_coordinate(cube):
            return not cube.coords(coordinate)

        return iris.Constraint(cube_func=lacks_coordinate)

    # The coordinate name is only known at runtime, so the keyword argument
    # is supplied through dictionary unpacking.
    return iris.Constraint(**{coordinate: wanted})
def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint from a list of cell methods.

    Operator that filters cubes on their cell methods. Passing ``[]`` asks for
    non-aggregated data, except for a few accumulated variables (identified by
    ``varname``) for which a "sum" cell method is required instead.

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering.
    varname: str, optional
        CF compliant name of variable.
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied to.
    interval: str, optional
        interval over which the cell method is applied to (e.g. 1 hour).
    comment: str, optional
        any comments in Cube meta data associated with the cell method.
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    # Explicit cell methods from the recipe: every one must be on the cube.
    if len(cell_methods) != 0:

        def has_all_cell_methods(cube: iris.cube.Cube) -> bool:
            for method in cell_methods:
                wanted = iris.coords.CellMethod(
                    method=method,
                    coords=coord,
                    intervals=interval,
                    comments=comment,
                )
                if wanted not in cube.cell_methods:
                    return False
            return True

        return iris.Constraint(cube_func=has_all_cell_methods)

    # number_of_lightning_flashes, surface_microphysical_rainfall_amount and
    # surface_microphysical_snowfall_amount must come with a "sum" cell method.
    needs_sum = bool(varname) and (
        "lightning" in varname
        or ("surface_microphysical" in varname and "amount" in varname)
    )
    if needs_sum:

        def only_sum(cube: iris.cube.Cube) -> bool:
            """Check that the cube's cell methods are exactly "sum"."""
            return {cm.method for cm in cube.cell_methods} == {"sum"}

        return iris.Constraint(cube_func=only_sum)

    # Otherwise assume an instantaneous (non-aggregated) cube is required.
    def only_point(cube: iris.cube.Cube) -> bool:
        """Check that any cell methods are "point", meaning no aggregation."""
        return {cm.method for cm in cube.cell_methods} <= {"point"}

    return iris.Constraint(cube_func=only_point)
def generate_time_constraint(
    time_start: str, time_end: str | None = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings, and returns a
    constraint that selects values between those dates (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime | cftime.datetime
        ISO date for lower bound. Strings are parsed with
        ``operator_utils.pdt_fromisoformat``; other values are used as given.

    time_end: str | datetime.datetime | cftime.datetime
        ISO date for upper bound. If omitted it defaults to the same as
        time_start
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    time_constraint: iris.Constraint
    """
    if isinstance(time_start, str):
        pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start)
    else:
        pdt_start, offset_start = time_start, timedelta(0)

    if time_end is None:
        # Reuse the already parsed start. Using the raw time_start here would
        # leave a plain string as the upper bound whenever time_start was
        # given as an ISO string.
        pdt_end, offset_end = pdt_start, offset_start
    elif isinstance(time_end, str):
        pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end)
    else:
        pdt_end, offset_end = time_end, timedelta(0)

    # A missing (None) offset is treated as no offset at all.
    if offset_start is None:
        offset_start = timedelta(0)
    if offset_end is None:
        offset_end = timedelta(0)

    # Both bounds are inclusive; each is compared against the time point
    # shifted by the offset that accompanied that bound.
    time_constraint = iris.Constraint(
        time=lambda t: (
            (pdt_start <= (t.point - offset_start))
            and ((t.point - offset_end) <= pdt_end)
        )
    )

    return time_constraint
def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint selecting a latitude/longitude box.

    Operator that takes latitude and longitude limits and returns a constraint
    keeping only grid values strictly inside them. The limits are applied to
    the ``grid_latitude`` and ``grid_longitude`` coordinates, so they are
    expressed on the data's native grid (the rotated pole CRS).

    Alternatively, all arguments may be None to indicate the area should not be
    constrained. This makes subsetting an optional step in a processing
    pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are neither all real numbers nor all None.
    """
    limits = (lat_start, lat_end, lon_start, lon_end)
    every_limit_real = all(isinstance(limit, numbers.Real) for limit in limits)
    every_limit_none = all(limit is None for limit in limits)
    if not (every_limit_real or every_limit_none):
        raise TypeError("Bounds must real numbers, or all None.")

    # After the check above one None means all are None: nothing to constrain,
    # and an empty constraint allows everything.
    if lat_start is None:
        return iris.Constraint()

    # An end longitude below the start means the box crosses the date line.
    lon_upper = lon_end + 360 if lon_end < lon_start else lon_end

    def bound_lat(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def bound_lon(cell: iris.coords.Cell) -> bool:
        # Shift values left of the start by a full turn so that boxes
        # crossing the date line compare correctly.
        shifted = cell + 360 if cell < lon_start else cell
        return lon_start < shifted < lon_upper

    return iris.Constraint(
        coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
    )
def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """
    Build a constraint that drops one ensemble member.

    Operator that returns a constraint keeping every realization except the
    given one. By default the member dropped is realization zero, which is
    assumed to be the control member. Any other realization can be named
    instead, for example when the control is a different member.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    The main use is removing the control member so that ensemble metrics can
    be calculated without it. For example, the ensemble mean is not normally
    calculated including the control member. Removing the control is
    particularly useful when it is not an equally-likely member of the
    ensemble.
    """

    def is_other_member(cell) -> bool:
        # Keep every realization whose value differs from the one to remove.
        return cell.point != ensemble_member

    return iris.Constraint(realization=is_other_member)
def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """
    Build a constraint that selects the given ensemble members.

    Operator that takes one ensemble member or a list of them and returns a
    constraint on the ``realization`` coordinate. This is particularly useful
    for subsetting ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    iris.Constraint
    """
    # iter_maybe is relied on to make the value iterable, so a single member
    # and a list of members are handled alike.
    return iris.Constraint(realization=iter_maybe(ensemble_members))
def generate_hour_constraint(
    hour_start: int,
    hour_end: int = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint selecting a range of hours of the day.

    Operator that takes hour of day limits and returns a constraint on the
    ``hour`` coordinate that keeps only hours inside those limits (inclusive),
    regardless of day.

    Giving only a starting hour constrains the result to that single hour.

    Sub-hourly data share an hour coordinate value (e.g., 12:00 and 12:05 both
    have an hour coordinate of 12), so all such times are selected together.

    Arguments
    ---------
    hour_start: int
        The hour of day for the lower bound, within 0 to 23.
    hour_end: int | None
        The hour of day for the upper bound, within 0 to 23. Alternatively,
        set to None if only one hour required.
    **kwargs
        Accepted for operator compatibility; not used here.

    Returns
    -------
    hour_constraint: iris.Constraint

    Raises
    ------
    ValueError
        If the provided arguments are outside of the range 0 to 23.
    """
    first_hour = hour_start
    # Without an upper bound the range collapses to the single starting hour.
    last_hour = hour_start if hour_end is None else hour_end

    for hour in (first_hour, last_hour):
        if hour < 0 or hour > 23:
            raise ValueError("Hours must be between 0 and 23 inclusive.")

    def within_hours(cell) -> bool:
        return first_hour <= cell.point <= last_hour

    return iris.Constraint(hour=within_hours)
def combine_constraints(
    constraint: iris.Constraint = None, **kwargs
) -> iris.Constraint:
    """
    Operator that merges several constraints into a single one.

    Arguments
    ---------
    constraint: iris.Constraint
        First constraint to combine. Anything that is not a constraint is
        ignored and replaced by an empty constraint.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        Any number of further constraints may be given, as long as each uses
        a unique keyword name.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        Expected when a keyword argument is not a constraint. Nothing is
        raised explicitly here; the error would come from the ``&``
        combination itself.
    """
    # The first positional value may be the previous step's output, passed
    # in automatically. When it is not a constraint, start from an empty one.
    combined = (
        constraint if isinstance(constraint, iris.Constraint) else iris.Constraint()
    )

    # Fold each keyword constraint in with a logical AND.
    for extra in kwargs.values():
        combined = combined & extra
    return combined