Coverage for src / CSET / operators / constraints.py: 93%

105 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 09:52 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to generate constraints to filter with.""" 

16 

17import numbers 

18import re 

19from collections.abc import Iterable 

20from datetime import timedelta 

21 

22import iris 

23import iris.coords 

24import iris.cube 

25 

26import CSET.operators._utils as operator_utils 

27from CSET._common import iter_maybe 

28 

29 

def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Generate constraint from STASH code.

    Operator that takes a STASH code string and uses iris to build a
    constraint from it. Passing the constraint into the read operator keeps
    the loaded CubeList small and speeds up loading.

    Arguments
    ---------
    stash: str
        STASH code from which to build the iris constraint, such as
        "m01s03i236".

    Returns
    -------
    stash_constraint: iris.AttributeConstraint
    """
    # NOTE: a future extension could accept a list of STASH codes and build a
    # combined constraint out of them.
    return iris.AttributeConstraint(STASH=stash)

50 

51 

def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
    """Generate constraint from variable name or STASH code.

    Operator that takes a CF compliant variable name string, and generates an
    iris constraint to be passed into the read or filter operator. Can also be
    passed a STASH code to generate a STASH constraint.

    Arguments
    ---------
    varname: str
        CF compliant name of variable, or a UM STASH code such as "m01s03i236".

    Returns
    -------
    varname_constraint: iris.Constraint
    """
    # STASH codes look like "m01s03i236"; anything else is treated as a CF
    # compliant variable name.
    stash_pattern = r"m[0-9]{2}s[0-9]{2}i[0-9]{3}$"
    if re.match(stash_pattern, varname):
        return iris.AttributeConstraint(STASH=varname)
    return iris.Constraint(name=varname)

73 

74 

def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Generate constraint for particular levels on the specified coordinate.

    Operator that generates a constraint to constrain to specific model or
    pressure levels. If no levels are specified then any cube with the specified
    coordinate is rejected.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Level coordinate name about which to constraint.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` for retrieving all levels, or
        ``[]`` for no levels.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Due to the specification of ``coordinate`` as an argument any iterable
    coordinate can be stratified with this function. Therefore,
    ``"realization"`` is a valid option. Subsequently, ``levels`` specifies the
    ensemble members, or group of ensemble members you wish to constrain your
    results over.
    """
    # The wildcard accepts every point on the coordinate.
    if levels == "*":
        return iris.Constraint(**{coordinate: lambda cell: True})

    # Promote a bare scalar into a single-element list.
    if not isinstance(levels, Iterable):
        levels = [levels]

    if len(levels) == 0:
        # An empty list means only cubes WITHOUT this coordinate are wanted.
        def lacks_coordinate(cube):
            # Reject cubes for which the coordinate exists.
            return not cube.coords(coordinate)

        return iris.Constraint(cube_func=lacks_coordinate)

    # Constrain the coordinate to exactly the requested points. Dictionary
    # unpacking supplies the coordinate name as a programmatic keyword.
    return iris.Constraint(**{coordinate: levels})

127 

128 

def generate_remove_single_level_constraint(
    coord: str, level: int = 0, **kwargs
) -> iris.Constraint:
    """
    Generate a constraint to remove a single model level number.

    Operator that returns a constraint excluding the given level. By default
    the first level (assumed to be level zero) is removed, though any level
    may be chosen.

    Arguments
    ---------
    coord: str
        The coordinate for which the level is to be removed.
    level: int
        Default is 0. The model level number to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    This operator is primarily used to ensure the levels are consistent
    as some level sets (e.g. specific humidity) will be on the same level set
    but have a different number of levels (e.g 71 instead of expected 70).
    """
    # Keep every cell whose point differs from the unwanted level.
    return iris.Constraint(**{coord: lambda cell: cell.point != level})

157 

158 

def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Generate constraint from cell methods.

    Operator that takes a list of cell methods and generates a constraint from
    that. Use [] to specify non-aggregated data.

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering.
    varname: str, optional
        CF compliant name of variable.
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied to.
    interval: str, optional
        interval over which the cell method is applied to (e.g. 1 hour).
    comment: str, optional
        any comments in Cube meta data associated with the cell method.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    if len(cell_methods) == 0:

        def check_no_aggregation(cube: iris.cube.Cube) -> bool:
            """Check that any cell methods are "point", meaning no aggregation."""
            return set(cm.method for cm in cube.cell_methods) <= {"point"}

        def check_cell_sum(cube: iris.cube.Cube) -> bool:
            """Check that any cell methods are "sum"."""
            return set(cm.method for cm in cube.cell_methods) == {"sum"}

        def check_cell_mean(cube: iris.cube.Cube) -> bool:
            """Check that any cell methods are "mean"."""
            return set(cm.method for cm in cube.cell_methods) == {"mean"}

        if varname:
            # Require number_of_lightning_flashes to be "sum" cell_method input.
            # Require surface_microphysical_rainfall_amount and
            # surface_microphysical_snowfall_amount to be "sum" cell_method
            # inputs.
            if ("lightning" in varname) or (
                "surface_microphysical" in varname and "amount" in varname
            ):
                cell_methods_constraint = iris.Constraint(cube_func=check_cell_sum)
                return cell_methods_constraint
            # Require climatological ancillary as time-average mean.
            if ("albedo" in varname) or (
                "ocean" in varname and "chlorophyll" in varname
            ):
                cell_methods_constraint = iris.Constraint(cube_func=check_cell_mean)
                return cell_methods_constraint

        # If no variable name set, assume require instantaneous cube.
        cell_methods_constraint = iris.Constraint(cube_func=check_no_aggregation)

    else:
        # If cell_method constraint set in recipe, check for required input.
        def check_cell_methods(cube: iris.cube.Cube) -> bool:
            return all(
                iris.coords.CellMethod(
                    method=cm, coords=coord, intervals=interval, comments=comment
                )
                in cube.cell_methods
                for cm in cell_methods
            )

        cell_methods_constraint = iris.Constraint(cube_func=check_cell_methods)

    return cell_methods_constraint

235 

236 

def generate_time_constraint(
    time_start: str, time_end: str | None = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings, and returns a
    constraint that selects values between those dates (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime | cftime.datetime
        ISO date for lower bound

    time_end: str | datetime.datetime | cftime.datetime
        ISO date for upper bound. If omitted it defaults to the same as
        time_start

    Returns
    -------
    time_constraint: iris.Constraint
    """
    # Strings are parsed into a (partial datetime, UTC offset) pair; other
    # datetime-like values pass through with a zero offset.
    if isinstance(time_start, str):
        pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start)
    else:
        pdt_start, offset_start = time_start, timedelta(0)

    if time_end is None:
        # Single instant: reuse the PARSED lower bound as the upper bound.
        # (Previously the raw `time_start` was reused, leaving an unparsed
        # string as the upper bound when a string was supplied.)
        pdt_end, offset_end = pdt_start, offset_start
    elif isinstance(time_end, str):
        pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end)
    else:
        pdt_end, offset_end = time_end, timedelta(0)

    # The parser can yield a None offset; normalise to zero so the arithmetic
    # below is always valid.
    if offset_start is None:
        offset_start = timedelta(0)
    if offset_end is None:
        offset_end = timedelta(0)

    # Shift each cell point by its bound's offset before comparing, keeping
    # the comparison in a common reference frame.
    time_constraint = iris.Constraint(
        time=lambda t: (
            (pdt_start <= (t.point - offset_start))
            and ((t.point - offset_end) <= pdt_end)
        )
    )

    return time_constraint

285 

286 

def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Generate an area constraint between latitude/longitude limits.

    Operator that takes a set of latitude and longitude limits and returns a
    constraint that selects grid values only inside that area. Works with the
    data's native grid so is defined within the rotated pole CRS.

    Alternatively, all arguments may be None to indicate the area should not be
    constrained. This is useful to allow making subsetting an optional step in a
    processing pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are not all real numbers, or all None.
    """
    # Check all arguments are defined, or all are None.
    bounds = (lat_start, lat_end, lon_start, lon_end)
    all_real = all(isinstance(bound, numbers.Real) for bound in bounds)
    all_none = all(bound is None for bound in bounds)
    if not (all_real or all_none):
        raise TypeError("Bounds must be real numbers, or all None.")

    # Don't constrain area if all arguments are None.
    if lat_start is None:  # Only need to check once, as they will be the same.
        # An empty constraint allows everything.
        return iris.Constraint()

    # Handle bounds crossing the date line by shifting the upper bound into
    # the next revolution.
    if lon_end < lon_start:
        lon_end = lon_end + 360

    def bound_lat(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def bound_lon(cell: iris.coords.Cell) -> bool:
        # Adjust cell values to handle crossing the date line.
        if cell < lon_start:
            cell = cell + 360
        return lon_start < cell < lon_end

    area_constraint = iris.Constraint(
        coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
    )
    return area_constraint

355 

356 

def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """
    Generate a constraint to remove a single ensemble member.

    Operator that returns a constraint excluding the given ensemble member.
    By default the member removed is the control member (assumed to have a
    realization of zero), but any member can be removed, allowing a non-zero
    control member to be excluded when the control is a different member.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    This operator is primarily used to remove the control member to allow
    ensemble metrics to be calculated without the control member. For
    example, the ensemble mean is not normally calculated including the
    control member. It is particularly useful to remove the control member
    when it is not an equally-likely member of the ensemble.
    """
    # Accept every realization except the one being removed.
    return iris.Constraint(realization=lambda cell: cell.point != ensemble_member)

387 

388 

def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """
    Generate a constraint to subset ensemble members.

    Operator that is given a list of ensemble members and returns a constraint
    to select those ensemble members. This operator is particularly useful for
    subsetting ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.

    Returns
    -------
    iris.Constraint
    """
    # A bare int is promoted into a single-element iterable.
    members = iter_maybe(ensemble_members)
    return iris.Constraint(realization=members)

411 

412 

def generate_hour_constraint(
    hour_start: int,
    hour_end: int | None = None,
    **kwargs,
) -> iris.Constraint:
    """Generate an hour constraint between hour of day limits.

    Operator that takes a set of hour of day limits and returns a constraint that
    selects only hours within that time frame regardless of day.

    Alternatively, the result can be constrained to a single hour by just entering
    a starting hour.

    Should any sub-hourly data be given these will have the same hour coordinate
    (e.g., 12:00 and 12:05 both have an hour coordinate of 12) all
    times will be selected with this constraint.

    Arguments
    ---------
    hour_start: int
        The hour of day for the lower bound, within 0 to 23.
    hour_end: int | None
        The hour of day for the upper bound, within 0 to 23. Alternatively,
        set to None if only one hour required.

    Returns
    -------
    hour_constraint: iris.Constraint

    Raises
    ------
    ValueError
        If the provided arguments are outside of the range 0 to 23.
    """
    # A single hour is a degenerate range with equal bounds.
    if hour_end is None:
        hour_end = hour_start

    if not (0 <= hour_start <= 23) or not (0 <= hour_end <= 23):
        raise ValueError("Hours must be between 0 and 23 inclusive.")

    hour_constraint = iris.Constraint(hour=lambda h: hour_start <= h.point <= hour_end)
    return hour_constraint

455 

456 

def combine_constraints(
    constraint: iris.Constraint | None = None, **kwargs
) -> iris.Constraint:
    """
    Operator that combines multiple constraints into one.

    Arguments
    ---------
    constraint: iris.Constraint
        First constraint to combine.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        There can be any number of additional constraint, they just need unique
        names.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the provided arguments are not constraints.
    """
    # If the first argument is not a constraint, it is ignored. This handles the
    # automatic passing of the previous step's output.
    if isinstance(constraint, iris.Constraint):
        combined_constraint = constraint
    else:
        combined_constraint = iris.Constraint()

    # AND together every additional constraint supplied by keyword.
    for constr in kwargs.values():
        combined_constraint = combined_constraint & constr
    return combined_constraint

493 

494 

def generate_attribute_constraint(
    attribute: str, value: str | None = None, **kwargs
) -> iris.AttributeConstraint:
    """Generate constraint on cube attributes.

    Constrains based on the presence of an attribute, and that attribute having
    a particular value.

    Arguments
    ---------
    attribute: str
        Attribute to constraint on.

    value: str, optional
        Attribute value to constrain on. If omitted the constraint merely checks
        for the presence of an attribute.

    Returns
    -------
    attribute_constraint: iris.Constraint
    """
    if value is None:
        # Presence-only check: accept any cube carrying the attribute.
        attribute_constraint = iris.Constraint(
            cube_func=lambda cube: attribute in cube.attributes
        )
    else:
        attribute_constraint = iris.AttributeConstraint(**{attribute: value})
    return attribute_constraint