Coverage for src / CSET / operators / constraints.py: 93%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 15:48 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to generate constraints to filter with.""" 

16 

17import numbers 

18import re 

19from collections.abc import Iterable 

20from datetime import timedelta 

21 

22import iris 

23import iris.coords 

24import iris.cube 

25 

26import CSET.operators._utils as operator_utils 

27from CSET._common import iter_maybe 

28 

29 

def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Build an iris constraint that selects cubes by STASH code.

    The resulting constraint is intended to be handed to the read operator so
    that only matching cubes are loaded, keeping the loaded CubeList small and
    loading fast.

    Arguments
    ---------
    stash: str
        STASH code to constrain on, such as "m01s03i236".

    Returns
    -------
    stash_constraint: iris.AttributeConstraint
    """
    # Possible future extension: accept a list of STASH codes and combine them
    # into a single constraint.
    return iris.AttributeConstraint(STASH=stash)

50 

51 

def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
    """Generate constraint from variable name or STASH code.

    Operator that takes a CF compliant variable name string, and generates an
    iris constraint to be passed into the read or filter operator. Can also be
    passed a STASH code to generate a STASH constraint.

    Arguments
    ---------
    varname: str
        CF compliant name of variable, or a UM STASH code such as "m01s03i236".

    Returns
    -------
    varname_constraint: iris.Constraint
        An ``iris.AttributeConstraint`` on ``STASH`` when ``varname`` has the
        form of a STASH code, otherwise a name constraint.
    """
    # fullmatch anchors both ends of the string. re.match with a trailing "$"
    # would also accept a value that ends in a newline.
    if re.fullmatch(r"m[0-9]{2}s[0-9]{2}i[0-9]{3}", varname):
        return iris.AttributeConstraint(STASH=varname)
    return iris.Constraint(name=varname)

73 

74 

def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Build a constraint selecting particular levels of a coordinate.

    The constraint restricts cubes to the requested model or pressure levels.
    When an empty collection of levels is given, any cube that has the
    coordinate at all is rejected.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Name of the level coordinate to constrain on.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` to keep every level, or ``[]`` to
        keep no levels.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Because ``coordinate`` is an argument, this function is not limited to
    vertical levels. For example ``"realization"`` is a valid choice, in which
    case ``levels`` names the ensemble member, or members, to constrain to.
    """
    # A lone asterisk is the wildcard: accept every point on the coordinate.
    if levels == "*":
        return iris.Constraint(**{coordinate: lambda cell: True})

    # Wrap a scalar so the remaining logic always deals with a collection.
    if not isinstance(levels, Iterable):
        levels = [levels]

    # No levels requested: only cubes without the coordinate are accepted.
    if len(levels) == 0:

        def no_levels(cube):
            return not cube.coords(coordinate)

        return iris.Constraint(cube_func=no_levels)

    # The coordinate name is only known at run time, so it has to be supplied
    # as a keyword argument through dictionary unpacking.
    return iris.Constraint(**{coordinate: levels})

127 

128 

def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint that selects cubes by their cell methods.

    With a non-empty ``cell_methods`` list, a cube is accepted only if every
    listed method (combined with ``coord``, ``interval`` and ``comment``) is
    among the cube's cell methods. With an empty list the expected method is
    chosen from ``varname``: "sum" for lightning and surface microphysical
    amount variables, "mean" for albedo and ocean chlorophyll variables, and
    otherwise non-aggregated data (no cell methods other than "point").

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering. Use [] for non-aggregated data.
    varname: str, optional
        CF compliant name of variable.
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied.
    interval: str, optional
        Interval over which the cell method is applied (e.g. 1 hour).
    comment: str, optional
        Any comments in the cube metadata associated with the cell method.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    if len(cell_methods) > 0:
        # Cell methods were requested explicitly: all of them must be present.
        def check_cell_methods(cube: iris.cube.Cube) -> bool:
            for requested in cell_methods:
                wanted = iris.coords.CellMethod(
                    method=requested,
                    coords=coord,
                    intervals=interval,
                    comments=comment,
                )
                if wanted not in cube.cell_methods:
                    return False
            return True

        return iris.Constraint(cube_func=check_cell_methods)

    def check_no_aggregation(cube: iris.cube.Cube) -> bool:
        """Check that any cell methods are "point", meaning no aggregation."""
        return {cm.method for cm in cube.cell_methods} <= {"point"}

    def check_cell_sum(cube: iris.cube.Cube) -> bool:
        """Check that the cube has cell methods and all of them are "sum"."""
        return {cm.method for cm in cube.cell_methods} == {"sum"}

    def check_cell_mean(cube: iris.cube.Cube) -> bool:
        """Check that the cube has cell methods and all of them are "mean"."""
        return {cm.method for cm in cube.cell_methods} == {"mean"}

    if varname:
        # number_of_lightning_flashes, surface_microphysical_rainfall_amount
        # and surface_microphysical_snowfall_amount are required as "sum".
        needs_sum = "lightning" in varname or (
            "surface_microphysical" in varname and "amount" in varname
        )
        if needs_sum:
            return iris.Constraint(cube_func=check_cell_sum)

        # Climatological ancillaries are required as a time-average mean.
        needs_mean = "albedo" in varname or (
            "ocean" in varname and "chlorophyll" in varname
        )
        if needs_mean:
            return iris.Constraint(cube_func=check_cell_mean)

    # No variable name, or one without a special case: require an
    # instantaneous (non-aggregated) cube.
    return iris.Constraint(cube_func=check_no_aggregation)

205 

206 

def generate_time_constraint(
    time_start: str, time_end: str = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings, and returns a
    constraint that selects values between those dates (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime | cftime.datetime
        ISO date for lower bound. Strings are parsed with
        ``operator_utils.pdt_fromisoformat``; other values are used as given
        with a zero offset.

    time_end: str | datetime.datetime | cftime.datetime
        ISO date for upper bound, handled in the same way as ``time_start``.
        If omitted it defaults to the same as time_start.

    Returns
    -------
    time_constraint: iris.Constraint
    """
    if isinstance(time_start, str):
        pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start)
    else:
        pdt_start, offset_start = time_start, timedelta(0)

    if time_end is None:
        # Reuse the parsed start bound. The raw time_start may still be an ISO
        # string, which is not comparable with datetime-like time points.
        pdt_end, offset_end = pdt_start, offset_start
    elif isinstance(time_end, str):
        pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end)
    else:
        pdt_end, offset_end = time_end, timedelta(0)

    # A missing offset is treated as no offset (presumably returned by
    # pdt_fromisoformat when the string carries none; confirm against helper).
    if offset_start is None:
        offset_start = timedelta(0)
    if offset_end is None:
        offset_end = timedelta(0)

    def within_bounds(cell) -> bool:
        # Each bound is compared after removing its own offset from the point.
        # Both comparisons are inclusive.
        return (pdt_start <= (cell.point - offset_start)) and (
            (cell.point - offset_end) <= pdt_end
        )

    return iris.Constraint(time=within_bounds)

255 

256 

def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Generate an area constraint between latitude/longitude limits.

    Operator that takes a set of latitude and longitude limits and returns a
    constraint that selects grid values only inside that area. The limits are
    applied to the ``grid_latitude`` and ``grid_longitude`` coordinates, so
    they are expressed on the data's native (rotated pole) grid. Both limits
    are exclusive: points lying exactly on a bound are not selected.

    Alternatively, all arguments may be None to indicate the area should not be
    constrained. This is useful to allow making subsetting an optional step in a
    processing pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound. May be smaller than ``lon_start`` to
        describe an area that crosses the date line.

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are neither all real numbers nor all None.
    """
    bounds = (lat_start, lat_end, lon_start, lon_end)
    all_real = all(isinstance(bound, numbers.Real) for bound in bounds)
    all_none = all(bound is None for bound in bounds)
    if not (all_real or all_none):
        raise TypeError("Bounds must be real numbers, or all None.")

    # Don't constrain area if all arguments are None. An empty constraint
    # allows everything.
    if all_none:
        return iris.Constraint()

    # Handle bounds crossing the date line by moving the right-hand bound one
    # full revolution to the east.
    if lon_end < lon_start:
        lon_end = lon_end + 360

    def bound_lat(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def bound_lon(cell: iris.coords.Cell) -> bool:
        # Shift cells west of the left bound by 360 degrees so that they are
        # compared in the same range as the (possibly shifted) right bound.
        if cell < lon_start:
            cell = cell + 360
        return lon_start < cell < lon_end

    return iris.Constraint(
        coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
    )

325 

326 

def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """
    Build a constraint that drops one ensemble member.

    The returned constraint keeps every realization except the one given. By
    default the member dropped is the control member, which is assumed to have
    a realization of zero. Any other realization number may be passed, so a
    control member with a non-zero realization can be removed as well.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    This operator is mainly used to exclude the control member so that
    ensemble metrics can be calculated without it. For example, the ensemble
    mean is not normally calculated including the control member. Removing
    the control is particularly useful when it is not an equally-likely
    member of the ensemble.
    """
    # Keep a cell only when its realization differs from the excluded member.
    member_constraint = iris.Constraint(
        realization=lambda cell: cell.point != ensemble_member
    )
    return member_constraint

357 

358 

def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """
    Build a constraint that keeps only the chosen ensemble members.

    Given one ensemble member or a list of them, return a constraint on the
    ``realization`` coordinate selecting those members. This is particularly
    useful for subsetting ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.

    Returns
    -------
    iris.Constraint
    """
    # iter_maybe is relied on to make a single member number iterable, so the
    # constraint always receives a collection of realizations.
    return iris.Constraint(realization=iter_maybe(ensemble_members))

381 

382 

def generate_hour_constraint(
    hour_start: int,
    hour_end: int = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint selecting a range of hours of the day.

    The constraint keeps only points whose ``hour`` coordinate lies between
    the two limits (inclusive), regardless of the day. Supplying only a
    starting hour constrains the result to that single hour.

    Sub-hourly data share an hour coordinate (e.g., 12:00 and 12:05 both have
    an hour coordinate of 12), so all such times are selected together.

    Arguments
    ---------
    hour_start: int
        The hour of day for the lower bound, within 0 to 23.
    hour_end: int | None
        The hour of day for the upper bound, within 0 to 23. Alternatively,
        set to None if only one hour required.

    Returns
    -------
    hour_constraint: iris.Constraint

    Raises
    ------
    ValueError
        If the provided arguments are outside of the range 0 to 23.
    """
    # A single hour is expressed as a range whose two ends coincide.
    if hour_end is None:
        hour_end = hour_start

    if any((hour < 0) or (hour > 23) for hour in (hour_start, hour_end)):
        raise ValueError("Hours must be between 0 and 23 inclusive.")

    return iris.Constraint(hour=lambda cell: hour_start <= cell.point <= hour_end)

425 

426 

def combine_constraints(
    constraint: iris.Constraint = None, **kwargs
) -> iris.Constraint:
    """
    Merge several constraints into a single constraint.

    Arguments
    ---------
    constraint: iris.Constraint
        First constraint to combine. Anything that is not an
        ``iris.Constraint`` is ignored.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        There can be any number of additional constraints, they just need
        unique names.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the provided arguments are not constraints.
    """
    # The first positional argument may be the previous step's output, which
    # is passed in automatically. If it is not a constraint, start from an
    # empty one instead.
    merged = constraint if isinstance(constraint, iris.Constraint) else iris.Constraint()

    # Fold every keyword-supplied constraint in with a logical AND.
    for extra in kwargs.values():
        merged = merged & extra
    return merged

463 

464 

def generate_attribute_constraint(
    attribute: str, value: str | None = None, **kwargs
) -> iris.Constraint:
    """Generate constraint on cube attributes.

    Constrains based on the presence of an attribute, and optionally on that
    attribute having a particular value.

    Arguments
    ---------
    attribute: str
        Attribute to constrain on.

    value: str
        Attribute value to constrain on. If omitted the constraint merely checks
        for the presence of an attribute.

    Returns
    -------
    attribute_constraint: iris.Constraint
        A plain ``iris.Constraint`` when only presence is checked, otherwise
        an ``iris.AttributeConstraint``.
    """
    if value is None:
        # Presence-only check: any value of the attribute is accepted.
        return iris.Constraint(cube_func=lambda cube: attribute in cube.attributes)

    # The attribute name is only known at run time, so it has to be supplied
    # as a keyword argument through dictionary unpacking.
    return iris.AttributeConstraint(**{attribute: value})