Coverage for src/CSET/operators/constraints.py: 93%

107 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-27 15:22 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to generate constraints to filter with.""" 

16 

17import numbers 

18import re 

19from collections.abc import Iterable 

20from datetime import timedelta 

21 

22import iris 

23import iris.coords 

24import iris.cube 

25 

26import CSET.operators._utils as operator_utils 

27from CSET._common import iter_maybe 

28 

29 

def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Build an attribute constraint matching a single UM STASH code.

    The resulting constraint is intended to be handed to the read operator so
    that only cubes carrying the requested STASH attribute are loaded, keeping
    the loaded CubeList small and speeding up reading.

    Arguments
    ---------
    stash: str
        STASH code to match, such as "m01s03i236".

    Returns
    -------
    stash_constraint: iris.AttributeConstraint
        Constraint on the ``STASH`` cube attribute.
    """
    # Only a single code is handled here. Accepting a list of STASH codes that
    # are combined into one constraint is a possible later extension.
    return iris.AttributeConstraint(STASH=stash)

50 

51 

def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
    """Build a constraint from a variable name, a list of names, or a STASH code.

    The returned constraint can be passed to the read or filter operators.

    Arguments
    ---------
    varname: str
        CF compliant name of variable, or a UM STASH code such as
        "m01s03i236". A list or tuple of variable names is also accepted, in
        which case a cube matches when any of its ``long_name``,
        ``standard_name`` or ``var_name`` is in the collection.

    Returns
    -------
    An Iris constraint for either:
        - a list of variable names (Cardington multi-input case)
        - a single UM STASH code
        - a single variable name
    """
    # A collection of names: accept a cube if any of its names is listed.
    if isinstance(varname, (list, tuple)):
        return iris.Constraint(
            cube_func=lambda candidate: (
                candidate.long_name in varname
                or candidate.standard_name in varname
                or candidate.var_name in varname
            )
        )

    # A single string shaped like a UM STASH code (mXXsXXiXXX) is matched
    # against the STASH attribute rather than the cube name.
    stash_pattern = re.compile(r"m\d{2}s\d{2}i\d{3}$")
    if stash_pattern.match(varname):
        return iris.AttributeConstraint(STASH=varname)

    # Anything else is treated as a plain variable name.
    return iris.Constraint(name=varname)

88 

89 

def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Build a constraint selecting particular levels of a coordinate.

    Used to constrain to specific model or pressure levels. When an empty
    collection of levels is given, any cube that has the specified coordinate
    is rejected.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Name of the level coordinate to constrain on.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` for retrieving all levels, or
        ``[]`` for no levels.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Because ``coordinate`` is an argument, any iterable coordinate can be
    stratified with this function. ``"realization"`` is therefore a valid
    option, in which case ``levels`` specifies the ensemble member, or group
    of ensemble members, to constrain the results over.
    """
    # Wildcard: every cell of the coordinate is accepted.
    if levels == "*":
        return iris.Constraint(**{coordinate: lambda point: True})

    # Wrap a bare value so the remaining logic always sees a collection.
    if not isinstance(levels, Iterable):
        levels = [levels]

    # Non-empty collection: filter the coordinate to the requested points.
    # Dictionary unpacking supplies the coordinate name as a keyword argument.
    if len(levels) != 0:
        return iris.Constraint(**{coordinate: levels})

    # Empty collection: only cubes without the coordinate pass.
    def no_levels(cube):
        # cube.coords() gives the matching coordinates; none means accept.
        return not cube.coords(coordinate)

    return iris.Constraint(cube_func=no_levels)

142 

143 

def generate_remove_single_level_constraint(
    coord: str, level: int = 0, **kwargs
) -> iris.Constraint:
    """
    Build a constraint that drops one level from a coordinate.

    By default the level removed is level zero (assumed to be the first
    level), but any level value can be given.

    Arguments
    ---------
    coord: str
        The coordinate for which the level is to be removed.
    level: int
        Default is 0. The model level number to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    This operator is primarily used to make level sets consistent, as some
    fields (e.g. specific humidity) are on the same level set but carry a
    different number of levels (e.g. 71 instead of the expected 70).
    """
    # Keep every cell whose point differs from the level being removed. The
    # coordinate name is only known at run time, hence the dict unpacking.
    coord_filter = {coord: lambda cell: cell.point != level}
    return iris.Constraint(**coord_filter)

172 

173 

def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint on the cell methods of a cube.

    With a non-empty ``cell_methods`` list, a cube matches when every listed
    method (built with the given ``coord``, ``interval`` and ``comment``) is
    present in the cube's cell methods. With ``[]``, non-aggregated data is
    requested, except for a few variables selected by name that are required
    to be a "sum" or a "mean" instead.

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering. Use [] for non-aggregated data.
    varname: str, optional
        CF compliant name of variable. Only consulted when ``cell_methods``
        is empty.
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied to.
    interval: str, optional
        interval over which the cell method is applied to (e.g. 1 hour).
    comment: str, optional
        any comments in Cube meta data associated with the cell method.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    if len(cell_methods) != 0:
        # Cell methods were requested explicitly (e.g. in the recipe): every
        # one of them must be found on the cube.
        def check_cell_methods(cube: iris.cube.Cube) -> bool:
            for method in cell_methods:
                required = iris.coords.CellMethod(
                    method=method, coords=coord, intervals=interval, comments=comment
                )
                if required not in cube.cell_methods:
                    return False
            return True

        return iris.Constraint(cube_func=check_cell_methods)

    def check_no_aggregation(cube: iris.cube.Cube) -> bool:
        """Check that any cell methods are "point", meaning no aggregation."""
        return {cm.method for cm in cube.cell_methods} <= {"point"}

    def check_cell_sum(cube: iris.cube.Cube) -> bool:
        """Check that the cell methods are exactly "sum"."""
        return {cm.method for cm in cube.cell_methods} == {"sum"}

    def check_cell_mean(cube: iris.cube.Cube) -> bool:
        """Check that the cell methods are exactly "mean"."""
        return {cm.method for cm in cube.cell_methods} == {"mean"}

    if varname:
        # Lightning flash counts and the surface microphysical rainfall and
        # snowfall amounts must come from a "sum" cell method.
        wants_sum = ("lightning" in varname) or (
            "surface_microphysical" in varname and "amount" in varname
        )
        if wants_sum:
            return iris.Constraint(cube_func=check_cell_sum)

        # Climatological ancillaries (albedo, ocean chlorophyll) are required
        # as a time-average "mean".
        wants_mean = ("albedo" in varname) or (
            "ocean" in varname and "chlorophyll" in varname
        )
        if wants_mean:
            return iris.Constraint(cube_func=check_cell_mean)

    # No variable name, or one without a special case: require an
    # instantaneous (non-aggregated) cube.
    return iris.Constraint(cube_func=check_no_aggregation)

250 

251 

def generate_time_constraint(
    time_start: str, time_end: str | None = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings, and returns a
    constraint that selects values between those dates (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime | cftime.datetime
        ISO date for lower bound. Strings are parsed with
        ``operator_utils.pdt_fromisoformat``; any other object is used as
        given, with a zero offset.

    time_end: str | datetime.datetime | cftime.datetime
        ISO date for upper bound, handled like ``time_start``. If omitted it
        defaults to the same as time_start.

    Returns
    -------
    time_constraint: iris.Constraint
    """
    if isinstance(time_start, str):
        pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start)
    else:
        pdt_start, offset_start = time_start, timedelta(0)

    if time_end is None:
        # Reuse the *parsed* start bound. Using the raw ``time_start`` here
        # would leave an unparsed string as the upper bound whenever the start
        # was given as an ISO string, breaking the comparison below.
        pdt_end, offset_end = pdt_start, offset_start
    elif isinstance(time_end, str):
        pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end)
    else:
        pdt_end, offset_end = time_end, timedelta(0)

    # The parsed offset may be None (presumably when the ISO string carries no
    # UTC offset - confirm against pdt_fromisoformat); treat that as zero.
    if offset_start is None:
        offset_start = timedelta(0)
    if offset_end is None:
        offset_end = timedelta(0)

    # Each bound is compared against the cell's point shifted by that bound's
    # own offset. Both ends are inclusive.
    time_constraint = iris.Constraint(
        time=lambda t: (
            (pdt_start <= (t.point - offset_start))
            and ((t.point - offset_end) <= pdt_end)
        )
    )

    return time_constraint

300 

301 

def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Build an area constraint from latitude/longitude limits.

    The constraint selects grid values strictly inside the given limits. It
    is applied to the ``grid_latitude`` and ``grid_longitude`` coordinates, so
    it works on the data's native grid and is defined within the rotated pole
    CRS.

    Alternatively, all arguments may be None to indicate the area should not
    be constrained. This allows subsetting to be an optional step in a
    processing pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are neither all real numbers nor all None.
    """
    limits = (lat_start, lat_end, lon_start, lon_end)
    every_limit_real = all(isinstance(limit, numbers.Real) for limit in limits)
    every_limit_none = all(limit is None for limit in limits)

    # A mixture of numbers and None (or any other type) is not acceptable.
    if not every_limit_real and not every_limit_none:
        raise TypeError("Bounds must real numbers, or all None.")

    # With no limits given, return an empty constraint, which allows
    # everything.
    if every_limit_none:
        return iris.Constraint()

    # An end longitude below the start means the box crosses the date line;
    # move the end forward by a full turn so that start < end holds.
    if lon_end < lon_start:
        lon_end = lon_end + 360

    def bound_lat(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def bound_lon(cell: iris.coords.Cell) -> bool:
        # Cells below the start are shifted by a full turn, mirroring the
        # date line adjustment made to the end bound.
        shifted = cell + 360 if cell < lon_start else cell
        return lon_start < shifted < lon_end

    return iris.Constraint(
        coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
    )

370 

371 

def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """
    Build a constraint that drops one ensemble member.

    By default the member removed is the control member (assumed to have a
    realization of zero). Any other member can be removed instead, which
    allows a control member with a non-zero realization to be dropped.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    This operator is primarily used to remove the control member so that
    ensemble metrics can be calculated without it. For example, the ensemble
    mean is not normally calculated including the control member. Removing
    the control is particularly useful when it is not an equally-likely
    member of the ensemble.
    """
    # Keep every realization cell whose point differs from the removed member.
    member_filter = iris.Constraint(
        realization=lambda cell: cell.point != ensemble_member
    )
    return member_filter

402 

403 

def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """
    Build a constraint that selects a subset of ensemble members.

    Given one ensemble member or a list of them, returns a constraint on the
    ``realization`` coordinate selecting those members. Particularly useful
    for subsetting ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.

    Returns
    -------
    iris.Constraint
    """
    # iter_maybe is used so that a single member is handled like a list.
    selected_members = iter_maybe(ensemble_members)
    realization_constraint = iris.Constraint(realization=selected_members)
    return realization_constraint

426 

427 

def generate_hour_constraint(
    hour_start: int,
    hour_end: int = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint selecting a range of hours of the day.

    The constraint acts on the ``hour`` coordinate and keeps hours between
    the two limits (inclusive), regardless of day. Giving only a starting
    hour constrains the result to that single hour.

    Sub-hourly data share an hour coordinate value (e.g., 12:00 and 12:05
    both have an hour coordinate of 12), so all such times are selected by
    this constraint.

    Arguments
    ---------
    hour_start: int
        The hour of day for the lower bound, within 0 to 23.
    hour_end: int | None
        The hour of day for the upper bound, within 0 to 23. Alternatively,
        set to None if only one hour required.

    Returns
    -------
    hour_constraint: iris.Constraint

    Raises
    ------
    ValueError
        If the provided arguments are outside of the range 0 to 23.
    """
    # A missing upper bound means a single hour is wanted.
    last_hour = hour_start if hour_end is None else hour_end

    # Both bounds have to be valid hours of the day.
    if any(hour < 0 or hour > 23 for hour in (hour_start, last_hour)):
        raise ValueError("Hours must be between 0 and 23 inclusive.")

    return iris.Constraint(hour=lambda cell: hour_start <= cell.point <= last_hour)

470 

471 

def combine_constraints(
    constraint: iris.Constraint = None, **kwargs
) -> iris.Constraint:
    """
    Merge several constraints into a single one.

    Arguments
    ---------
    constraint: iris.Constraint
        First constraint to combine. Anything that is not an
        ``iris.Constraint`` is ignored.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        There can be any number of additional constraint, they just need unique
        names.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the provided arguments are not constraints. Nothing in this
        function raises it directly; presumably it comes from the ``&``
        operation.
    """
    # The first positional argument may be the automatically passed output of
    # the previous step. If it is not a constraint, start from an empty
    # constraint instead.
    if isinstance(constraint, iris.Constraint):
        result = constraint
    else:
        result = iris.Constraint()

    # Fold every keyword-supplied constraint in with "&"; the keyword names
    # themselves are not used.
    for extra in kwargs.values():
        result = result & extra
    return result

508 

509 

def generate_attribute_constraint(
    attribute: str, value: str = None, **kwargs
) -> iris.AttributeConstraint:
    """Build a constraint on a cube attribute.

    Constrains on the attribute being present and, when a value is given, on
    the attribute having that value.

    Arguments
    ---------
    attribute: str
        Name of the attribute to constrain on.

    value: str
        Attribute value to constrain on. If omitted the constraint merely
        checks for the presence of the attribute.

    Returns
    -------
    attribute_constraint: iris.Constraint
    """
    # A value was supplied: match the attribute against it. The attribute name
    # is only known at run time, hence the dictionary unpacking.
    if value is not None:
        return iris.AttributeConstraint(**{attribute: value})

    # No value: only require that the attribute exists on the cube.
    return iris.Constraint(cube_func=lambda candidate: attribute in candidate.attributes)