Coverage for src/CSET/operators/constraints.py: 91%

77 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 21:08 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to generate constraints to filter with.""" 

16 

17import numbers 

18import re 

19from collections.abc import Iterable 

20from datetime import datetime 

21 

22import iris 

23import iris.coords 

24import iris.cube 

25 

26from CSET._common import iter_maybe 

27 

28 

def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
    """Build an iris constraint that selects cubes by STASH code.

    The constraint is intended to be passed to the read operator so that only
    cubes carrying the requested STASH attribute are loaded, keeping the
    loaded CubeList small and speeding up loading.

    Arguments
    ---------
    stash: str
        STASH code to match, such as "m01s03i236".

    Returns
    -------
    stash_constraint: iris.AttributeConstraint
    """
    # Possible future extension: accept a list of STASH codes and combine
    # them into a single constraint.
    return iris.AttributeConstraint(STASH=stash)

49 

50 

def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
    """Generate constraint from variable name or STASH code.

    Operator that takes a CF compliant variable name string, and generates an
    iris constraint to be passed into the read or filter operator. If the
    string has the form of a UM STASH code, a STASH attribute constraint is
    generated instead of a name constraint.

    Arguments
    ---------
    varname: str
        CF compliant name of variable, or a UM STASH code such as "m01s03i236".

    Returns
    -------
    varname_constraint: iris.Constraint
    """
    # fullmatch anchors both ends of the string. match() with a trailing "$"
    # would also accept a STASH code followed by a newline.
    if re.fullmatch(r"m[0-9]{2}s[0-9]{2}i[0-9]{3}", varname):
        return iris.AttributeConstraint(STASH=varname)
    return iris.Constraint(name=varname)

72 

73 

def generate_level_constraint(
    coordinate: str, levels: int | list[int] | str, **kwargs
) -> iris.Constraint:
    """Build a constraint selecting particular levels of a coordinate.

    Operator used to constrain to specific model or pressure levels. Passing
    an empty collection of levels produces a constraint that rejects every
    cube which has the named coordinate.

    Typically ``coordinate`` will be ``"pressure"`` or ``"model_level_number"``
    for UM, or ``"full_levels"`` or ``"half_levels"`` for LFRic.

    Arguments
    ---------
    coordinate: str
        Name of the level coordinate to constrain on.
    levels: int | list[int] | str
        CF compliant level points, ``"*"`` for retrieving all levels, or
        ``[]`` for no levels.

    Returns
    -------
    constraint: iris.Constraint

    Notes
    -----
    Because ``coordinate`` is an argument, any iterable coordinate can be
    stratified with this function. ``"realization"`` is therefore a valid
    option, in which case ``levels`` gives the ensemble member, or group of
    ensemble members, to constrain the results to.
    """
    # A lone asterisk keeps every point of the coordinate.
    if levels == "*":
        return iris.Constraint(**{coordinate: lambda cell: True})

    # Wrap a scalar so the length check below is valid.
    wanted = levels if isinstance(levels, Iterable) else [levels]

    # No levels requested: only accept cubes without the coordinate.
    if len(wanted) == 0:

        def lacks_coordinate(cube):
            return not cube.coords(coordinate)

        return iris.Constraint(cube_func=lacks_coordinate)

    # The coordinate name is only known at run time, so the keyword argument
    # is supplied through dictionary unpacking.
    return iris.Constraint(**{coordinate: wanted})

126 

127 

def generate_cell_methods_constraint(
    cell_methods: list,
    varname: str | None = None,
    coord: iris.coords.Coord | None = None,
    interval: str | None = None,
    comment: str | None = None,
    **kwargs,
) -> iris.Constraint:
    """Build a constraint that filters cubes on their cell methods.

    Operator that takes a list of cell methods and generates a constraint
    from it. Use [] to specify non-aggregated data.

    Arguments
    ---------
    cell_methods: list
        cube.cell_methods for filtering.
    varname: str, optional
        CF compliant name of variable.
    coord: iris.coords.Coord, optional
        iris.coords.Coord to which the cell method is applied to.
    interval: str, optional
        interval over which the cell method is applied to (e.g. 1 hour).
    comment: str, optional
        any comments in Cube meta data associated with the cell method.

    Returns
    -------
    cell_method_constraint: iris.Constraint
    """
    if len(cell_methods) != 0:
        # Cell methods were requested in the recipe: every one of them must
        # be present on the cube.
        def has_requested_methods(cube: iris.cube.Cube) -> bool:
            for method_name in cell_methods:
                wanted = iris.coords.CellMethod(
                    method=method_name,
                    coords=coord,
                    intervals=interval,
                    comments=comment,
                )
                if wanted not in cube.cell_methods:
                    return False
            return True

        return iris.Constraint(cube_func=has_requested_methods)

    def only_point_methods(cube: iris.cube.Cube) -> bool:
        """Accept cubes whose cell methods, if any, are all "point"."""
        return {cm.method for cm in cube.cell_methods} <= {"point"}

    def only_sum_methods(cube: iris.cube.Cube) -> bool:
        """Accept cubes whose cell methods are exactly "sum"."""
        return {cm.method for cm in cube.cell_methods} == {"sum"}

    # Lightning variables, and surface_microphysical ... amount variables
    # (rainfall and snowfall amounts), must be "sum" cell method inputs.
    needs_sum = bool(varname) and (
        "lightning" in varname
        or ("surface_microphysical" in varname and "amount" in varname)
    )

    # Otherwise assume an instantaneous (non-aggregated) cube is required.
    chosen_check = only_sum_methods if needs_sum else only_point_methods
    return iris.Constraint(cube_func=chosen_check)

194 

195 

def generate_time_constraint(
    time_start: str | datetime, time_end: str | datetime | None = None, **kwargs
) -> iris.Constraint:
    """Generate constraint between times.

    Operator that takes one or two ISO 8601 date strings (or datetime
    objects), and returns a constraint that selects values between those
    dates (inclusive).

    Arguments
    ---------
    time_start: str | datetime.datetime
        ISO date for lower bound. Strings are parsed with
        ``datetime.fromisoformat``.

    time_end: str | datetime.datetime, optional
        ISO date for upper bound. If omitted it defaults to the same as
        time_start, so only that single time is selected.

    Returns
    -------
    time_constraint: iris.Constraint
    """
    if isinstance(time_start, str):
        time_start = datetime.fromisoformat(time_start)
    if time_end is None:
        # Use the already-parsed start so both bounds have the same type.
        time_end = time_start
    elif isinstance(time_end, str):
        time_end = datetime.fromisoformat(time_end)
    # Both bounds are inclusive.
    time_constraint = iris.Constraint(time=lambda t: time_start <= t.point <= time_end)
    return time_constraint

225 

226 

def generate_area_constraint(
    lat_start: float | None,
    lat_end: float | None,
    lon_start: float | None,
    lon_end: float | None,
    **kwargs,
) -> iris.Constraint:
    """Generate an area constraint between latitude/longitude limits.

    Operator that takes a set of latitude and longitude limits and returns a
    constraint that selects grid values only inside that area. The limits are
    applied to the ``grid_latitude`` and ``grid_longitude`` coordinates, and
    are exclusive: points lying exactly on a bound are not selected.

    Alternatively, all arguments may be None to indicate the area should not be
    constrained. This is useful to allow making subsetting an optional step in a
    processing pipeline.

    Arguments
    ---------
    lat_start: float | None
        Latitude value for lower bound
    lat_end: float | None
        Latitude value for top bound
    lon_start: float | None
        Longitude value for left bound
    lon_end: float | None
        Longitude value for right bound

    Returns
    -------
    area_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the bounds are not either all real numbers or all None.
    """
    bounds = (lat_start, lat_end, lon_start, lon_end)
    all_real = all(isinstance(bound, numbers.Real) for bound in bounds)
    all_none = all(bound is None for bound in bounds)

    # Reject a mixture of None and numbers, and anything non-numeric.
    if not (all_real or all_none):
        raise TypeError("Bounds must be real numbers, or all None.")

    # Don't constrain the area if all arguments are None.
    if all_none:
        # An empty constraint allows everything.
        return iris.Constraint()

    # Handle bounds crossing the date line.
    if lon_end < lon_start:
        lon_end = lon_end + 360

    def bound_lat(cell: iris.coords.Cell) -> bool:
        return lat_start < cell < lat_end

    def bound_lon(cell: iris.coords.Cell) -> bool:
        # Adjust cell values to handle crossing the date line.
        if cell < lon_start:
            cell = cell + 360
        return lon_start < cell < lon_end

    area_constraint = iris.Constraint(
        coord_values={"grid_latitude": bound_lat, "grid_longitude": bound_lon}
    )
    return area_constraint

295 

296 

def generate_remove_single_ensemble_member_constraint(
    ensemble_member: int = 0, **kwargs
) -> iris.Constraint:
    """Build a constraint that excludes one ensemble member.

    Operator that returns a constraint keeping every realization except the
    given one. By default the member removed is the control member (assumed
    to have a realization of zero). Any other realization can be given
    instead, which allows a control member with a non-zero realization to be
    removed.

    Arguments
    ---------
    ensemble_member: int
        Default is 0. The ensemble member realization to remove.

    Returns
    -------
    iris.Constraint

    Notes
    -----
    This operator is primarily used to remove the control member to allow
    ensemble metrics to be calculated without it. For example, the ensemble
    mean is not normally calculated including the control member. It is
    particularly useful to remove the control member when it is not an
    equally-likely member of the ensemble.
    """

    def is_other_member(cell) -> bool:
        # Keep every realization whose point differs from the removed one.
        return cell.point != ensemble_member

    return iris.Constraint(realization=is_other_member)

327 

328 

def generate_realization_constraint(
    ensemble_members: int | list[int], **kwargs
) -> iris.Constraint:
    """Build a constraint that selects a subset of ensemble members.

    Operator that is given one ensemble member, or a list of them, and returns
    a constraint on the ``realization`` coordinate selecting those members.
    This is particularly useful for subsetting ensembles.

    Arguments
    ---------
    ensemble_members: int | list[int]
        The ensemble members to be subsetted over.

    Returns
    -------
    iris.Constraint
    """
    # iter_maybe ensures the members are iterable before they are passed on.
    return iris.Constraint(realization=iter_maybe(ensemble_members))

351 

352 

def combine_constraints(
    constraint: iris.Constraint = None, **kwargs
) -> iris.Constraint:
    """Combine multiple constraints into a single constraint.

    Arguments
    ---------
    constraint: iris.Constraint
        First constraint to combine.
    additional_constraint_1: iris.Constraint
        Second constraint to combine. This must be a named argument.
    additional_constraint_2: iris.Constraint
        There can be any number of additional constraints, they just need
        unique names.
    ...

    Returns
    -------
    combined_constraint: iris.Constraint

    Raises
    ------
    TypeError
        If the provided arguments are not constraints.
    """
    # The previous step's output is passed in automatically as the first
    # argument. If it is not a constraint it is ignored and an empty
    # constraint is used as the starting point.
    combined = (
        constraint if isinstance(constraint, iris.Constraint) else iris.Constraint()
    )

    # AND each named constraint onto the running result in turn.
    for extra in kwargs.values():
        combined = combined & extra
    return combined