Coverage for src/CSET/operators/constraints.py: 93%

107 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-30 13:16 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to generate constraints to filter with.""" 

16 

17import numbers 

18import re 

19from collections.abc import Iterable 

20from datetime import timedelta 

21 

22import iris 

23import iris.coords 

24import iris.cube 

25 

26import CSET.operators._utils as operator_utils 

27from CSET._common import iter_maybe 

28 

29 

def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Build an iris constraint that matches a single STASH code.

    The resulting constraint is intended to be handed to the read operator so
    that fewer cubes end up in the loaded CubeList, which speeds up loading.

    Arguments
    ---------
    stash: str
        STASH code to match, such as "m01s03i236".

    Returns
    -------
    stash_constraint: iris.AttributeConstraint
    """
    # Only a single STASH code is supported for now. Accepting a list of codes
    # that together build the constraint is a possible later extension.
    return iris.AttributeConstraint(STASH=stash)

50 

51 

def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
    """Build a constraint selecting cubes by variable name or STASH code.

    A CF compliant variable name produces a name constraint, while a string
    shaped like a UM STASH code produces a STASH attribute constraint. The
    result can be passed to the read or filter operators.

    Arguments
    ---------
    varname: str
        CF compliant name of variable, or a UM STASH code such as "m01s03i236".

    Returns
    -------
    varname_constraint: iris.Constraint
    """
    # Computed field: the wind speed is requested, so select the cubes holding
    # its vector components (matched on long_name) instead of the name itself.
    if varname == "wind_speed_at_10m":
        component_names = [
            "wind_at_10m",
            "eastward_wind_at_10m",
            "northward_wind_at_10m",
        ]
        return iris.Constraint(
            cube_func=lambda cube: cube.long_name in component_names
        )

    # STASH codes look like m01s03i236: model, section and item numbers.
    looks_like_stash = re.match(r"m[0-9]{2}s[0-9]{2}i[0-9]{3}$", varname)
    if looks_like_stash:
        return iris.AttributeConstraint(STASH=varname)

    return iris.Constraint(name=varname)

83 

84 

def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Build a constraint selecting given levels of a named coordinate.

    Used to restrict data to specific model or pressure levels. Passing an
    empty list of levels produces a constraint that rejects every cube that
    has the coordinate at all.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Name of the level coordinate to constrain on.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` for retrieving all levels, or
        ``[]`` for no levels.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Because ``coordinate`` is an argument, any iterable coordinate can be
    stratified with this function. ``"realization"`` is therefore a valid
    option, in which case ``levels`` gives the ensemble member, or group of
    ensemble members, to constrain the results over.
    """
    # Dictionary unpacking is used throughout to supply the coordinate name as
    # a programmatic keyword argument.

    # Wildcard: accept every point on the coordinate.
    if levels == "*":

        def accept_any_cell(cell):
            return True

        return iris.Constraint(**{coordinate: accept_any_cell})

    # Wrap a lone value so the rest of the function always sees an iterable.
    wanted_levels = levels if isinstance(levels, Iterable) else [levels]

    # No levels requested: only let through cubes lacking the coordinate.
    if len(wanted_levels) == 0:
        return iris.Constraint(cube_func=lambda cube: not cube.coords(coordinate))

    # Otherwise restrict the coordinate to the requested points.
    return iris.Constraint(**{coordinate: wanted_levels})

137 

138 

def generate_remove_single_level_constraint(
    coord: str, level: int = 0, **kwargs
) -> iris.Constraint:
    """
    Build a constraint that drops one level from a coordinate.

    The returned constraint keeps every point on ``coord`` except the one
    equal to ``level``. By default that is level zero, assumed to be the
    first level, but any level may be given.

    Arguments
    ---------
    coord: str
        The coordinate for which the level is to be removed.
    level: int
        Default is 0. The model level number to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    The main use is making level sets consistent: some fields (e.g. specific
    humidity) sit on the same level set but carry a different number of
    levels (e.g. 71 instead of the expected 70).
    """

    def is_not_removed_level(cell):
        # Keep every cell whose point differs from the level being removed.
        return cell.point != level

    # Dictionary unpacking supplies the coordinate name as the keyword.
    return iris.Constraint(**{coord: is_not_removed_level})

167 

168 

def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint that filters cubes by their cell methods.

    With a non-empty ``cell_methods`` list, a cube passes only if it carries
    every listed method (with the given coord, interval and comment). With an
    empty list, non-aggregated data is requested, except for a few variable
    names that must be a "sum" or a "mean" instead.

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering. Use [] for non-aggregated data.
    varname: str, optional
        CF compliant name of variable.
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied.
    interval: str, optional
        Interval over which the cell method is applied (e.g. 1 hour).
    comment: str, optional
        Any comments in the cube metadata associated with the cell method.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    # Explicit cell methods given in the recipe: require all of them.
    if len(cell_methods) != 0:

        def has_all_cell_methods(cube: iris.cube.Cube) -> bool:
            for method in cell_methods:
                required = iris.coords.CellMethod(
                    method=method, coords=coord, intervals=interval, comments=comment
                )
                if required not in cube.cell_methods:
                    return False
            return True

        return iris.Constraint(cube_func=has_all_cell_methods)

    def method_names(cube: iris.cube.Cube) -> set:
        """Collect the distinct cell method names present on the cube."""
        return {cm.method for cm in cube.cell_methods}

    if varname:
        # number_of_lightning_flashes, surface_microphysical_rainfall_amount
        # and surface_microphysical_snowfall_amount must be "sum" inputs.
        wants_sum = "lightning" in varname or (
            "surface_microphysical" in varname and "amount" in varname
        )
        if wants_sum:
            return iris.Constraint(
                cube_func=lambda cube: method_names(cube) == {"sum"}
            )

        # Climatological ancillaries are required as a time-average mean.
        wants_mean = "albedo" in varname or (
            "ocean" in varname and "chlorophyll" in varname
        )
        if wants_mean:
            return iris.Constraint(
                cube_func=lambda cube: method_names(cube) == {"mean"}
            )

    # No special variable: require an instantaneous cube, meaning its cell
    # methods are either absent or all "point".
    return iris.Constraint(cube_func=lambda cube: method_names(cube) <= {"point"})

245 

246 

def generate_time_constraint(
    time_start: str, time_end: str = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings, and returns a
    constraint that selects values between those dates (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime | cftime.datetime
        ISO date for lower bound. Strings are parsed with
        ``operator_utils.pdt_fromisoformat``; any other value is used as-is
        with a zero offset.

    time_end: str | datetime.datetime | cftime.datetime
        ISO date for upper bound, handled like ``time_start``. If omitted it
        defaults to the same as time_start.

    Returns
    -------
    time_constraint: iris.Constraint
    """
    # Strings are parsed into a (datetime-like, offset) pair. The offset may
    # come back as None, which is normalised to a zero offset further down.
    if isinstance(time_start, str):
        pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start)
    else:
        pdt_start, offset_start = time_start, timedelta(0)

    if time_end is None:
        # Reuse the *parsed* start bound. Using the raw time_start here would
        # leave a plain string as the upper bound whenever time_start was
        # given as an ISO string.
        pdt_end, offset_end = pdt_start, offset_start
    elif isinstance(time_end, str):
        pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end)
    else:
        pdt_end, offset_end = time_end, timedelta(0)

    if offset_start is None:
        offset_start = timedelta(0)
    if offset_end is None:
        offset_end = timedelta(0)

    # Each bound's offset is subtracted from the cell's time point before it
    # is compared with that bound; both comparisons are inclusive.
    time_constraint = iris.Constraint(
        time=lambda t: (
            (pdt_start <= (t.point - offset_start))
            and ((t.point - offset_end) <= pdt_end)
        )
    )

    return time_constraint

295 

296 

def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint selecting a latitude/longitude box.

    The returned constraint keeps only grid values strictly inside the given
    limits. It is applied to the ``grid_latitude`` and ``grid_longitude``
    coordinates, so it works on the data's native grid and is defined within
    the rotated pole CRS.

    All four arguments may instead be None, meaning the area should not be
    constrained. This lets subsetting be an optional step in a processing
    pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are neither all real numbers nor all None.
    """
    limits = (lat_start, lat_end, lon_start, lon_end)
    every_limit_real = all(isinstance(limit, numbers.Real) for limit in limits)
    every_limit_none = all(limit is None for limit in limits)

    # A mixture of numbers and None (or any other type) is rejected.
    if not (every_limit_real or every_limit_none):
        raise TypeError("Bounds must real numbers, or all None.")

    # Nothing to constrain: an empty constraint allows everything.
    if every_limit_none:
        return iris.Constraint()

    # A right bound below the left bound means the box crosses the date line,
    # so the right bound is pushed round by a full turn.
    lon_upper = lon_end + 360 if lon_end < lon_start else lon_end

    def bound_lat(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def bound_lon(cell: iris.coords.Cell) -> bool:
        # Cells left of the start are shifted by a full turn so that boxes
        # crossing the date line compare correctly.
        shifted = cell + 360 if cell < lon_start else cell
        return lon_start < shifted < lon_upper

    return iris.Constraint(
        coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
    )

365 

366 

def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """
    Build a constraint that drops one ensemble member.

    The returned constraint keeps every realization except the one given. By
    default the control member is dropped, assumed to have a realization of
    zero. Any other member can be given instead, which also covers the case
    where the control is a non-zero member.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    The main use is removing the control member so that ensemble metrics can
    be calculated without it. The ensemble mean, for example, is not normally
    calculated including the control member. Removal is particularly useful
    when the control is not an equally-likely member of the ensemble.
    """

    def is_other_member(cell):
        # Keep every realization whose point differs from the removed member.
        return cell.point != ensemble_member

    return iris.Constraint(realization=is_other_member)

397 

398 

def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """
    Build a constraint that selects the given ensemble members.

    Takes one ensemble member or a list of them and returns a constraint on
    the ``realization`` coordinate. Particularly useful for subsetting
    ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.

    Returns
    -------
    iris.Constraint
    """
    # iter_maybe is used so a single member is handled like a list of members.
    selected_members = iter_maybe(ensemble_members)
    realization_constraint = iris.Constraint(realization=selected_members)
    return realization_constraint

421 

422 

def generate_hour_constraint(
    hour_start: int,
    hour_end: int = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint selecting a range of hours of the day.

    The returned constraint keeps only times whose ``hour`` coordinate lies
    between the two limits (inclusive), regardless of day. Giving only a
    starting hour constrains the result to that single hour.

    Sub-hourly data share an hour coordinate (e.g., 12:00 and 12:05 both have
    an hour coordinate of 12), so all such times are selected together.

    Arguments
    ---------
    hour_start: int
        The hour of day for the lower bound, within 0 to 23.
    hour_end: int | None
        The hour of day for the upper bound, within 0 to 23. Alternatively,
        set to None if only one hour required.

    Returns
    -------
    hour_constraint: iris.Constraint

    Raises
    ------
    ValueError
        If the provided arguments are outside of the range 0 to 23.
    """
    # A missing upper bound collapses the range to the single starting hour.
    last_hour = hour_start if hour_end is None else hour_end

    if any(hour < 0 or hour > 23 for hour in (hour_start, last_hour)):
        raise ValueError("Hours must be between 0 and 23 inclusive.")

    def within_hours(cell):
        return hour_start <= cell.point <= last_hour

    return iris.Constraint(hour=within_hours)

465 

466 

def combine_constraints(
    constraint: iris.Constraint = None, **kwargs
) -> iris.Constraint:
    """
    Merge several constraints into a single constraint.

    Arguments
    ---------
    constraint: iris.Constraint
        First constraint to combine.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        Any number of further constraints may be given, as long as each is
        passed under a unique name.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the provided arguments are not constraints.
    """
    # A first argument that is not a constraint is ignored and replaced with
    # an empty constraint. This copes with the previous step's output being
    # passed in automatically.
    merged = constraint if isinstance(constraint, iris.Constraint) else iris.Constraint()

    # AND each named constraint onto the running result.
    for extra_constraint in kwargs.values():
        merged = merged & extra_constraint

    return merged

503 

504 

def generate_attribute_constraint(
    attribute: str, value: str = None, **kwargs
) -> iris.AttributeConstraint:
    """Build a constraint on a cube attribute.

    With a value, the constraint requires the attribute to have that value.
    Without one, it only requires the attribute to be present on the cube.

    Arguments
    ---------
    attribute: str
        Attribute to constrain on.

    value: str
        Attribute value to constrain on. If omitted the constraint merely checks
        for the presence of an attribute.

    Returns
    -------
    attribute_constraint: iris.Constraint
    """
    if value is not None:
        # Dictionary unpacking supplies the attribute name as the keyword.
        return iris.AttributeConstraint(**{attribute: value})

    def has_attribute(cube):
        # Presence check only; the attribute's value is not inspected.
        return attribute in cube.attributes

    return iris.Constraint(cube_func=has_attribute)

532 return attribute_constraint