Coverage for src / CSET / operators / constraints.py: 91%

105 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 14:01 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to generate constraints to filter with.""" 

16 

17import numbers 

18import re 

19from collections.abc import Iterable 

20from datetime import timedelta 

21 

22import iris 

23import iris.coords 

24import iris.cube 

25 

26import CSET.operators._utils as operator_utils 

27from CSET._common import iter_maybe 

28 

29 

30def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint: 

31 """Generate constraint from STASH code. 

32 

33 Operator that takes a stash string, and uses iris to generate a constraint 

34 to be passed into the read operator to minimize the CubeList the read 

35 operator loads and speed up loading. 

36 

37 Arguments 

38 --------- 

39 stash: str 

40 stash code to build iris constraint, such as "m01s03i236" 

41 

42 Returns 

43 ------- 

44 stash_constraint: iris.AttributeConstraint 

45 """ 

46 # At a later stage str list an option to combine constraints. Arguments 

47 # could be a list of stash codes that combined build the constraint. 

48 stash_constraint = iris.AttributeConstraint(STASH=stash) 

49 return stash_constraint 

50 

51 

52def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint: 

53 """Generate constraint from variable name or STASH code. 

54 

55 Operator that takes a CF compliant variable name string, and generates an 

56 iris constraint to be passed into the read or filter operator. Can also be 

57 passed a STASH code to generate a STASH constraint. 

58 

59 Arguments 

60 --------- 

61 varname: str 

62 CF compliant name of variable, or a UM STASH code such as "m01s03i236". 

63 

64 Returns 

65 ------- 

66 An Iris constraint for either: 

67 - a single UM STASH code 

68 - a single variable name 

69 - a list of variable names (Cardington multi-input case) 

70 """ 

71 _STASH_RE = re.compile(r"m\d{2}s\d{2}i\d{3}$") 

72 # ---- CASE 1: list of variable names (e.g. Cardington multi-variable) ---- 

73 if isinstance(varname, (list, tuple)): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true

74 return iris.Constraint( 

75 cube_func=lambda cube: ( 

76 cube.var_name in varname 

77 or cube.standard_name in varname 

78 or cube.name() in varname 

79 ) 

80 ) 

81 

82 # ---- CASE 2: single UM STASH code ---- 

83 if _STASH_RE.match(varname): 

84 return iris.AttributeConstraint(STASH=varname) 

85 

86 # ---- CASE 3: single variable name ---- 

87 return iris.Constraint(name=varname) 

88 

89 

90def generate_level_constraint( 

91 coordinate: str, levels: int | list[int] | str, **kwargs 

92) -> iris.Constraint: 

93 """Generate constraint for particular levels on the specified coordinate. 

94 

95 Operator that generates a constraint to constrain to specific model or 

96 pressure levels. If no levels are specified then any cube with the specified 

97 coordinate is rejected. 

98 

99 Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"`` 

100 for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic. 

101 

102 Arguments 

103 --------- 

104 coordinate: str 

105 Level coordinate name about which to constraint. 

106 levels: int | list[int] | str 

107 CF compliant level points, ``"*"`` for retrieving all levels, or 

108 ``[]`` for no levels. 

109 

110 Returns 

111 ------- 

112 constraint: iris.Constraint 

113 

114 Notes 

115 ----- 

116 Due to the specification of ``coordinate`` as an argument any iterable 

117 coordinate can be stratified with this function. Therefore, 

118 ``"realization"`` is a valid option. Subsequently, ``levels`` specifies the 

119 ensemble members, or group of ensemble members you wish to constrain your 

120 results over. 

121 """ 

122 # If asterisks, then return all levels for given coordinate. 

123 if levels == "*": 

124 return iris.Constraint(**{coordinate: lambda cell: True}) 

125 else: 

126 # Ensure is iterable. 

127 if not isinstance(levels, Iterable): 

128 levels = [levels] 

129 

130 # When no levels specified reject cube with level coordinate. 

131 if len(levels) == 0: 

132 

133 def no_levels(cube): 

134 # Reject cubes for which coordinate exists. 

135 return not cube.coords(coordinate) 

136 

137 return iris.Constraint(cube_func=no_levels) 

138 

139 # Filter the coordinate to the desired levels. 

140 # Dictionary unpacking is used to provide programmatic keyword arguments. 

141 return iris.Constraint(**{coordinate: levels}) 

142 

143 

144def generate_cell_methods_constraint( 

145 cell_methods: list, 

146 varname: str | None = None, 

147 coord: iris.coords.Coord | None = None, 

148 interval: str | None = None, 

149 comment: str | None = None, 

150 **kwargs, 

151) -> iris.Constraint: 

152 """Generate constraint from cell methods. 

153 

154 Operator that takes a list of cell methods and generates a constraint from 

155 that. Use [] to specify non-aggregated data. 

156 

157 Arguments 

158 --------- 

159 cell_methods: list 

160 cube.cell_methods for filtering. 

161 varname: str, optional 

162 CF compliant name of variable. 

163 coord: iris.coords.Coord, optional 

164 iris.coords.Coord to which the cell method is applied to. 

165 interval: str, optional 

166 interval over which the cell method is applied to (e.g. 1 hour). 

167 comment: str, optional 

168 any comments in Cube meta data associated with the cell method. 

169 

170 Returns 

171 ------- 

172 cell_method_constraint: iris.Constraint 

173 """ 

174 if len(cell_methods) == 0: 

175 

176 def check_no_aggregation(cube: iris.cube.Cube) -> bool: 

177 """Check that any cell methods are "point", meaning no aggregation.""" 

178 return set(cm.method for cm in cube.cell_methods) <= {"point"} 

179 

180 def check_cell_sum(cube: iris.cube.Cube) -> bool: 

181 """Check that any cell methods are "sum".""" 

182 return set(cm.method for cm in cube.cell_methods) == {"sum"} 

183 

184 def check_cell_mean(cube: iris.cube.Cube) -> bool: 

185 """Check that any cell methods are "mean".""" 

186 return set(cm.method for cm in cube.cell_methods) == {"mean"} 

187 

188 if varname: 

189 # Require number_of_lightning_flashes to be "sum" cell_method input. 

190 # Require surface_microphyisical_rainfall_amount and surface_microphysical_snowfall_amount to be "sum" cell_method inputs. 

191 if ("lightning" in varname) or ( 

192 "surface_microphysical" in varname and "amount" in varname 

193 ): 

194 cell_methods_constraint = iris.Constraint(cube_func=check_cell_sum) 

195 return cell_methods_constraint 

196 # Require climatological ancillary as time-average mean. 

197 if ("albedo" in varname) or ( 197 ↛ 204line 197 didn't jump to line 204 because the condition on line 197 was always true

198 "ocean" in varname and "chlorophyll" in varname 

199 ): 

200 cell_methods_constraint = iris.Constraint(cube_func=check_cell_mean) 

201 return cell_methods_constraint 

202 

203 # If no variable name set, assume require instantaneous cube. 

204 cell_methods_constraint = iris.Constraint(cube_func=check_no_aggregation) 

205 

206 else: 

207 # If cell_method constraint set in recipe, check for required input. 

208 def check_cell_methods(cube: iris.cube.Cube) -> bool: 

209 return all( 

210 iris.coords.CellMethod( 

211 method=cm, coords=coord, intervals=interval, comments=comment 

212 ) 

213 in cube.cell_methods 

214 for cm in cell_methods 

215 ) 

216 

217 cell_methods_constraint = iris.Constraint(cube_func=check_cell_methods) 

218 

219 return cell_methods_constraint 

220 

221 

222def generate_time_constraint( 

223 time_start: str, time_end: str = None, **kwargs 

224) -> iris.Constraint: 

225 """Generate constraint between times. 

226 

227 Operator that takes one or two ISO 8601 date strings, and returns a 

228 constraint that selects values between those dates (inclusive). 

229 

230 Arguments 

231 --------- 

232 time_start: str | datetime.datetime | cftime.datetime 

233 ISO date for lower bound 

234 

235 time_end: str | datetime.datetime | cftime.datetime 

236 ISO date for upper bound. If omitted it defaults to the same as 

237 time_start 

238 

239 Returns 

240 ------- 

241 time_constraint: iris.Constraint 

242 """ 

243 if isinstance(time_start, str): 

244 pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start) 

245 else: 

246 pdt_start, offset_start = time_start, timedelta(0) 

247 

248 if time_end is None: 

249 pdt_end, offset_end = time_start, offset_start 

250 elif isinstance(time_end, str): 

251 pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end) 

252 print(pdt_end) 

253 print(offset_end) 

254 else: 

255 pdt_end, offset_end = time_end, timedelta(0) 

256 

257 if offset_start is None: 

258 offset_start = timedelta(0) 

259 if offset_end is None: 

260 offset_end = timedelta(0) 

261 

262 time_constraint = iris.Constraint( 

263 time=lambda t: ( 

264 (pdt_start <= (t.point - offset_start)) 

265 and ((t.point - offset_end) <= pdt_end) 

266 ) 

267 ) 

268 

269 return time_constraint 

270 

271 

272def generate_area_constraint( 

273 lat_start: float | None, 

274 lat_end: float | None, 

275 lon_start: float | None, 

276 lon_end: float | None, 

277 **kwargs, 

278) -> iris.Constraint: 

279 """Generate an area constraint between latitude/longitude limits. 

280 

281 Operator that takes a set of latitude and longitude limits and returns a 

282 constraint that selects grid values only inside that area. Works with the 

283 data's native grid so is defined within the rotated pole CRS. 

284 

285 Alternatively, all arguments may be None to indicate the area should not be 

286 constrained. This is useful to allow making subsetting an optional step in a 

287 processing pipeline. 

288 

289 Arguments 

290 --------- 

291 lat_start: float | None 

292 Latitude value for lower bound 

293 lat_end: float | None 

294 Latitude value for top bound 

295 lon_start: float | None 

296 Longitude value for left bound 

297 lon_end: float | None 

298 Longitude value for right bound 

299 

300 Returns 

301 ------- 

302 area_constraint: iris.Constraint 

303 """ 

304 # Check all arguments are defined, or all are None. 

305 if not ( 

306 all( 

307 ( 

308 isinstance(lat_start, numbers.Real), 

309 isinstance(lat_end, numbers.Real), 

310 isinstance(lon_start, numbers.Real), 

311 isinstance(lon_end, numbers.Real), 

312 ) 

313 ) 

314 or all((lat_start is None, lat_end is None, lon_start is None, lon_end is None)) 

315 ): 

316 raise TypeError("Bounds must real numbers, or all None.") 

317 

318 # Don't constrain area if all arguments are None. 

319 if lat_start is None: # Only need to check once, as they will be the same. 

320 # An empty constraint allows everything. 

321 return iris.Constraint() 

322 

323 # Handle bounds crossing the date line. 

324 if lon_end < lon_start: 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true

325 lon_end = lon_end + 360 

326 

327 def bound_lat(cell: iris.coords.Cell) -> bool: 

328 return lat_start < cell < lat_end 

329 

330 def bound_lon(cell: iris.coords.Cell) -> bool: 

331 # Adjust cell values to handle crossing the date line. 

332 if cell < lon_start: 

333 cell = cell + 360 

334 return lon_start < cell < lon_end 

335 

336 area_constraint = iris.Constraint( 

337 coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon} 

338 ) 

339 return area_constraint 

340 

341 

342def generate_remove_single_ensemble_member_constraint( 

343 ensemble_member: int = 0, **kwargs 

344) -> iris.Constraint: 

345 """ 

346 Generate a constraint to remove a single ensemble member. 

347 

348 Operator that returns a constraint to remove the given ensemble member. By 

349 default the ensemble member removed is the control member (assumed to have 

350 a realization of zero). However, any ensemble member can be removed, thus 

351 allowing a non-zero control member to be removed if the control is a 

352 different member. 

353 

354 Arguments 

355 --------- 

356 ensemble_member: int 

357 Default is 0. The ensemble member realization to remove. 

358 

359 Returns 

360 ------- 

361 iris.Constraint 

362 

363 Notes 

364 ----- 

365 This operator is primarily used to remove the control member to allow 

366 ensemble metrics to be calculated without the control member. For 

367 example, the ensemble mean is not normally calculated including the 

368 control member. It is particularly useful to remove the control member 

369 when it is not an equally-likely member of the ensemble. 

370 """ 

371 return iris.Constraint(realization=lambda m: m.point != ensemble_member) 

372 

373 

374def generate_realization_constraint( 

375 ensemble_members: int | list[int], **kwargs 

376) -> iris.Constraint: 

377 """ 

378 Generate a constraint to subset ensemble members. 

379 

380 Operator that is given a list of ensemble members and returns a constraint 

381 to select those ensemble members. This operator is particularly useful for 

382 subsetting ensembles. 

383 

384 Arguments 

385 --------- 

386 ensemble_members: int | list[int] 

387 The ensemble members to be subsetted over. 

388 

389 Returns 

390 ------- 

391 iris.Constraint 

392 """ 

393 # Ensure ensemble_members is iterable. 

394 ensemble_members = iter_maybe(ensemble_members) 

395 return iris.Constraint(realization=ensemble_members) 

396 

397 

398def generate_hour_constraint( 

399 hour_start: int, 

400 hour_end: int = None, 

401 **kwargs, 

402) -> iris.Constraint: 

403 """Generate an hour constraint between hour of day limits. 

404 

405 Operator that takes a set of hour of day limits and returns a constraint that 

406 selects only hours within that time frame regardless of day. 

407 

408 Alternatively, the result can be constrained to a single hour by just entering 

409 a starting hour. 

410 

411 Should any sub-hourly data be given these will have the same hour coordinate 

412 (e.g., 12:00 and 12:05 both have an hour coordinate of 12) all 

413 times will be selected with this constraint. 

414 

415 Arguments 

416 --------- 

417 hour_start: int 

418 The hour of day for the lower bound, within 0 to 23. 

419 hour_end: int | None 

420 The hour of day for the upper bound, within 0 to 23. Alternatively, 

421 set to None if only one hour required. 

422 

423 Returns 

424 ------- 

425 hour_constraint: iris.Constraint 

426 

427 Raises 

428 ------ 

429 ValueError 

430 If the provided arguments are outside of the range 0 to 23. 

431 """ 

432 if hour_end is None: 

433 hour_end = hour_start 

434 

435 if (hour_start < 0) or (hour_start > 23) or (hour_end < 0) or (hour_end > 23): 

436 raise ValueError("Hours must be between 0 and 23 inclusive.") 

437 

438 hour_constraint = iris.Constraint(hour=lambda h: hour_start <= h.point <= hour_end) 

439 return hour_constraint 

440 

441 

442def combine_constraints( 

443 constraint: iris.Constraint = None, **kwargs 

444) -> iris.Constraint: 

445 """ 

446 Operator that combines multiple constraints into one. 

447 

448 Arguments 

449 --------- 

450 constraint: iris.Constraint 

451 First constraint to combine. 

452 additional_constraint_1: iris.Constraint 

453 Second constraint to combine. This must be a named argument. 

454 additional_constraint_2: iris.Constraint 

455 There can be any number of additional constraint, they just need unique 

456 names. 

457 ... 

458 

459 Returns 

460 ------- 

461 combined_constraint: iris.Constraint 

462 

463 Raises 

464 ------ 

465 TypeError 

466 If the provided arguments are not constraints. 

467 """ 

468 # If the first argument is not a constraint, it is ignored. This handles the 

469 # automatic passing of the previous step's output. 

470 if isinstance(constraint, iris.Constraint): 

471 combined_constraint = constraint 

472 else: 

473 combined_constraint = iris.Constraint() 

474 

475 for constr in kwargs.values(): 

476 combined_constraint = combined_constraint & constr 

477 return combined_constraint 

478 

479 

480def generate_attribute_constraint( 

481 attribute: str, value: str = None, **kwargs 

482) -> iris.AttributeConstraint: 

483 """Generate constraint on cube attributes. 

484 

485 Constrains based on the presence of an attribute, and that attribute having 

486 a particular value. 

487 

488 Arguments 

489 --------- 

490 attribute: str 

491 Attribute to constraint on. 

492 

493 value: str 

494 Attribute value to constrain on. If omitted the constraint merely checks 

495 for the presence of an attribute. 

496 

497 Returns 

498 ------- 

499 attribute_constraint: iris.Constraint 

500 """ 

501 if value is None: 

502 attribute_constraint = iris.Constraint( 

503 cube_func=lambda cube: attribute in cube.attributes 

504 ) 

505 else: 

506 attribute_constraint = iris.AttributeConstraint(**{attribute: value}) 

507 return attribute_constraint