Coverage for src/CSET/operators/constraints.py: 92%

93 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 15:17 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to generate constraints to filter with.""" 

16 

17import numbers 

18import re 

19from collections.abc import Iterable 

20from datetime import timedelta 

21 

22import iris 

23import iris.coords 

24import iris.cube 

25 

26import CSET.operators._utils as operator_utils 

27from CSET._common import iter_maybe 

28 

29 

30def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint: 

31 """Generate constraint from STASH code. 

32 

33 Operator that takes a stash string, and uses iris to generate a constraint 

34 to be passed into the read operator to minimize the CubeList the read 

35 operator loads and speed up loading. 

36 

37 Arguments 

38 --------- 

39 stash: str 

40 stash code to build iris constraint, such as "m01s03i236" 

41 

42 Returns 

43 ------- 

44 stash_constraint: iris.AttributeConstraint 

45 """ 

46 # At a later stage str list an option to combine constraints. Arguments 

47 # could be a list of stash codes that combined build the constraint. 

48 stash_constraint = iris.AttributeConstraint(STASH=stash) 

49 return stash_constraint 

50 

51 

52def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint: 

53 """Generate constraint from variable name or STASH code. 

54 

55 Operator that takes a CF compliant variable name string, and generates an 

56 iris constraint to be passed into the read or filter operator. Can also be 

57 passed a STASH code to generate a STASH constraint. 

58 

59 Arguments 

60 --------- 

61 varname: str 

62 CF compliant name of variable, or a UM STASH code such as "m01s03i236". 

63 

64 Returns 

65 ------- 

66 varname_constraint: iris.Constraint 

67 """ 

68 if re.match(r"m[0-9]{2}s[0-9]{2}i[0-9]{3}$", varname): 

69 varname_constraint = iris.AttributeConstraint(STASH=varname) 

70 else: 

71 varname_constraint = iris.Constraint(name=varname) 

72 return varname_constraint 

73 

74 

75def generate_level_constraint( 

76 coordinate: str, levels: int | list[int] | str, **kwargs 

77) -> iris.Constraint: 

78 """Generate constraint for particular levels on the specified coordinate. 

79 

80 Operator that generates a constraint to constrain to specific model or 

81 pressure levels. If no levels are specified then any cube with the specified 

82 coordinate is rejected. 

83 

84 Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"`` 

85 for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic. 

86 

87 Arguments 

88 --------- 

89 coordinate: str 

90 Level coordinate name about which to constraint. 

91 levels: int | list[int] | str 

92 CF compliant level points, ``"*"`` for retrieving all levels, or 

93 ``[]`` for no levels. 

94 

95 Returns 

96 ------- 

97 constraint: iris.Constraint 

98 

99 Notes 

100 ----- 

101 Due to the specification of ``coordinate`` as an argument any iterable 

102 coordinate can be stratified with this function. Therefore, 

103 ``"realization"`` is a valid option. Subsequently, ``levels`` specifies the 

104 ensemble members, or group of ensemble members you wish to constrain your 

105 results over. 

106 """ 

107 # If asterisks, then return all levels for given coordinate. 

108 if levels == "*": 

109 return iris.Constraint(**{coordinate: lambda cell: True}) 

110 else: 

111 # Ensure is iterable. 

112 if not isinstance(levels, Iterable): 

113 levels = [levels] 

114 

115 # When no levels specified reject cube with level coordinate. 

116 if len(levels) == 0: 

117 

118 def no_levels(cube): 

119 # Reject cubes for which coordinate exists. 

120 return not cube.coords(coordinate) 

121 

122 return iris.Constraint(cube_func=no_levels) 

123 

124 # Filter the coordinate to the desired levels. 

125 # Dictionary unpacking is used to provide programmatic keyword arguments. 

126 return iris.Constraint(**{coordinate: levels}) 

127 

128 

129def generate_cell_methods_constraint( 

130 cell_methods: list, 

131 varname: str | None = None, 

132 coord: iris.coords.Coord | None = None, 

133 interval: str | None = None, 

134 comment: str | None = None, 

135 **kwargs, 

136) -> iris.Constraint: 

137 """Generate constraint from cell methods. 

138 

139 Operator that takes a list of cell methods and generates a constraint from 

140 that. Use [] to specify non-aggregated data. 

141 

142 Arguments 

143 --------- 

144 cell_methods: list 

145 cube.cell_methods for filtering. 

146 varname: str, optional 

147 CF compliant name of variable. 

148 coord: iris.coords.Coord, optional 

149 iris.coords.Coord to which the cell method is applied to. 

150 interval: str, optional 

151 interval over which the cell method is applied to (e.g. 1 hour). 

152 comment: str, optional 

153 any comments in Cube meta data associated with the cell method. 

154 

155 Returns 

156 ------- 

157 cell_method_constraint: iris.Constraint 

158 """ 

159 if len(cell_methods) == 0: 

160 

161 def check_no_aggregation(cube: iris.cube.Cube) -> bool: 

162 """Check that any cell methods are "point", meaning no aggregation.""" 

163 return set(cm.method for cm in cube.cell_methods) <= {"point"} 

164 

165 def check_cell_sum(cube: iris.cube.Cube) -> bool: 

166 """Check that any cell methods are "sum".""" 

167 return set(cm.method for cm in cube.cell_methods) == {"sum"} 

168 

169 if varname: 

170 # Require number_of_lightning_flashes to be "sum" cell_method input. 

171 # Require surface_microphyisical_rainfall_amount and surface_microphysical_snowfall_amount to be "sum" cell_method inputs. 

172 if ("lightning" in varname) or ( 172 ↛ 179line 172 didn't jump to line 179 because the condition on line 172 was always true

173 "surface_microphysical" in varname and "amount" in varname 

174 ): 

175 cell_methods_constraint = iris.Constraint(cube_func=check_cell_sum) 

176 return cell_methods_constraint 

177 

178 # If no variable name set, assume require instantaneous cube. 

179 cell_methods_constraint = iris.Constraint(cube_func=check_no_aggregation) 

180 

181 else: 

182 # If cell_method constraint set in recipe, check for required input. 

183 def check_cell_methods(cube: iris.cube.Cube) -> bool: 

184 return all( 

185 iris.coords.CellMethod( 

186 method=cm, coords=coord, intervals=interval, comments=comment 

187 ) 

188 in cube.cell_methods 

189 for cm in cell_methods 

190 ) 

191 

192 cell_methods_constraint = iris.Constraint(cube_func=check_cell_methods) 

193 

194 return cell_methods_constraint 

195 

196 

197def generate_time_constraint( 

198 time_start: str, time_end: str = None, **kwargs 

199) -> iris.Constraint: 

200 """Generate constraint between times. 

201 

202 Operator that takes one or two ISO 8601 date strings, and returns a 

203 constraint that selects values between those dates (inclusive). 

204 

205 Arguments 

206 --------- 

207 time_start: str | datetime.datetime | cftime.datetime 

208 ISO date for lower bound 

209 

210 time_end: str | datetime.datetime | cftime.datetime 

211 ISO date for upper bound. If omitted it defaults to the same as 

212 time_start 

213 

214 Returns 

215 ------- 

216 time_constraint: iris.Constraint 

217 """ 

218 if isinstance(time_start, str): 

219 pdt_start, offset_start = operator_utils.pdt_fromisoformat(time_start) 

220 else: 

221 pdt_start, offset_start = time_start, timedelta(0) 

222 

223 if time_end is None: 

224 pdt_end, offset_end = time_start, offset_start 

225 elif isinstance(time_end, str): 

226 pdt_end, offset_end = operator_utils.pdt_fromisoformat(time_end) 

227 print(pdt_end) 

228 print(offset_end) 

229 else: 

230 pdt_end, offset_end = time_end, timedelta(0) 

231 

232 if offset_start is None: 

233 offset_start = timedelta(0) 

234 if offset_end is None: 

235 offset_end = timedelta(0) 

236 

237 time_constraint = iris.Constraint( 

238 time=lambda t: ( 

239 (pdt_start <= (t.point - offset_start)) 

240 and ((t.point - offset_end) <= pdt_end) 

241 ) 

242 ) 

243 

244 return time_constraint 

245 

246 

247def generate_area_constraint( 

248 lat_start: float | None, 

249 lat_end: float | None, 

250 lon_start: float | None, 

251 lon_end: float | None, 

252 **kwargs, 

253) -> iris.Constraint: 

254 """Generate an area constraint between latitude/longitude limits. 

255 

256 Operator that takes a set of latitude and longitude limits and returns a 

257 constraint that selects grid values only inside that area. Works with the 

258 data's native grid so is defined within the rotated pole CRS. 

259 

260 Alternatively, all arguments may be None to indicate the area should not be 

261 constrained. This is useful to allow making subsetting an optional step in a 

262 processing pipeline. 

263 

264 Arguments 

265 --------- 

266 lat_start: float | None 

267 Latitude value for lower bound 

268 lat_end: float | None 

269 Latitude value for top bound 

270 lon_start: float | None 

271 Longitude value for left bound 

272 lon_end: float | None 

273 Longitude value for right bound 

274 

275 Returns 

276 ------- 

277 area_constraint: iris.Constraint 

278 """ 

279 # Check all arguments are defined, or all are None. 

280 if not ( 

281 all( 

282 ( 

283 isinstance(lat_start, numbers.Real), 

284 isinstance(lat_end, numbers.Real), 

285 isinstance(lon_start, numbers.Real), 

286 isinstance(lon_end, numbers.Real), 

287 ) 

288 ) 

289 or all((lat_start is None, lat_end is None, lon_start is None, lon_end is None)) 

290 ): 

291 raise TypeError("Bounds must real numbers, or all None.") 

292 

293 # Don't constrain area if all arguments are None. 

294 if lat_start is None: # Only need to check once, as they will be the same. 

295 # An empty constraint allows everything. 

296 return iris.Constraint() 

297 

298 # Handle bounds crossing the date line. 

299 if lon_end < lon_start: 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true

300 lon_end = lon_end + 360 

301 

302 def bound_lat(cell: iris.coords.Cell) -> bool: 

303 return lat_start < cell < lat_end 

304 

305 def bound_lon(cell: iris.coords.Cell) -> bool: 

306 # Adjust cell values to handle crossing the date line. 

307 if cell < lon_start: 

308 cell = cell + 360 

309 return lon_start < cell < lon_end 

310 

311 area_constraint = iris.Constraint( 

312 coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon} 

313 ) 

314 return area_constraint 

315 

316 

317def generate_remove_single_ensemble_member_constraint( 

318 ensemble_member: int = 0, **kwargs 

319) -> iris.Constraint: 

320 """ 

321 Generate a constraint to remove a single ensemble member. 

322 

323 Operator that returns a constraint to remove the given ensemble member. By 

324 default the ensemble member removed is the control member (assumed to have 

325 a realization of zero). However, any ensemble member can be removed, thus 

326 allowing a non-zero control member to be removed if the control is a 

327 different member. 

328 

329 Arguments 

330 --------- 

331 ensemble_member: int 

332 Default is 0. The ensemble member realization to remove. 

333 

334 Returns 

335 ------- 

336 iris.Constraint 

337 

338 Notes 

339 ----- 

340 This operator is primarily used to remove the control member to allow 

341 ensemble metrics to be calculated without the control member. For 

342 example, the ensemble mean is not normally calculated including the 

343 control member. It is particularly useful to remove the control member 

344 when it is not an equally-likely member of the ensemble. 

345 """ 

346 return iris.Constraint(realization=lambda m: m.point != ensemble_member) 

347 

348 

349def generate_realization_constraint( 

350 ensemble_members: int | list[int], **kwargs 

351) -> iris.Constraint: 

352 """ 

353 Generate a constraint to subset ensemble members. 

354 

355 Operator that is given a list of ensemble members and returns a constraint 

356 to select those ensemble members. This operator is particularly useful for 

357 subsetting ensembles. 

358 

359 Arguments 

360 --------- 

361 ensemble_members: int | list[int] 

362 The ensemble members to be subsetted over. 

363 

364 Returns 

365 ------- 

366 iris.Constraint 

367 """ 

368 # Ensure ensemble_members is iterable. 

369 ensemble_members = iter_maybe(ensemble_members) 

370 return iris.Constraint(realization=ensemble_members) 

371 

372 

373def generate_hour_constraint( 

374 hour_start: int, 

375 hour_end: int = None, 

376 **kwargs, 

377) -> iris.Constraint: 

378 """Generate an hour constraint between hour of day limits. 

379 

380 Operator that takes a set of hour of day limits and returns a constraint that 

381 selects only hours within that time frame regardless of day. 

382 

383 Alternatively, the result can be constrained to a single hour by just entering 

384 a starting hour. 

385 

386 Should any sub-hourly data be given these will have the same hour coordinate 

387 (e.g., 12:00 and 12:05 both have an hour coordinate of 12) all 

388 times will be selected with this constraint. 

389 

390 Arguments 

391 --------- 

392 hour_start: int 

393 The hour of day for the lower bound, within 0 to 23. 

394 hour_end: int | None 

395 The hour of day for the upper bound, within 0 to 23. Alternatively, 

396 set to None if only one hour required. 

397 

398 Returns 

399 ------- 

400 hour_constraint: iris.Constraint 

401 

402 Raises 

403 ------ 

404 ValueError 

405 If the provided arguments are outside of the range 0 to 23. 

406 """ 

407 if hour_end is None: 

408 hour_end = hour_start 

409 

410 if (hour_start < 0) or (hour_start > 23) or (hour_end < 0) or (hour_end > 23): 

411 raise ValueError("Hours must be between 0 and 23 inclusive.") 

412 

413 hour_constraint = iris.Constraint(hour=lambda h: hour_start <= h.point <= hour_end) 

414 return hour_constraint 

415 

416 

417def combine_constraints( 

418 constraint: iris.Constraint = None, **kwargs 

419) -> iris.Constraint: 

420 """ 

421 Operator that combines multiple constraints into one. 

422 

423 Arguments 

424 --------- 

425 constraint: iris.Constraint 

426 First constraint to combine. 

427 additional_constraint_1: iris.Constraint 

428 Second constraint to combine. This must be a named argument. 

429 additional_constraint_2: iris.Constraint 

430 There can be any number of additional constraint, they just need unique 

431 names. 

432 ... 

433 

434 Returns 

435 ------- 

436 combined_constraint: iris.Constraint 

437 

438 Raises 

439 ------ 

440 TypeError 

441 If the provided arguments are not constraints. 

442 """ 

443 # If the first argument is not a constraint, it is ignored. This handles the 

444 # automatic passing of the previous step's output. 

445 if isinstance(constraint, iris.Constraint): 

446 combined_constraint = constraint 

447 else: 

448 combined_constraint = iris.Constraint() 

449 

450 for constr in kwargs.values(): 

451 combined_constraint = combined_constraint & constr 

452 return combined_constraint