Coverage for src / CSET / operators / misc.py: 100%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 14:49 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Miscellaneous operators.""" 

16 

17import itertools 

18import logging 

19from collections.abc import Iterable 

20 

21import iris 

22import numpy as np 

23from iris.cube import Cube, CubeList 

24 

25from CSET._common import is_increasing, iter_maybe 

26from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname 

27from CSET.operators.regrid import regrid_onto_cube 

28 

29 

30def noop(x, **kwargs): 

31 """Return its input without doing anything to it. 

32 

33 Useful for constructing diagnostic chains. 

34 

35 Arguments 

36 --------- 

37 x: Any 

38 Input to return. 

39 

40 Returns 

41 ------- 

42 x: Any 

43 The input that was given. 

44 """ 

45 return x 

46 

47 

48def remove_attribute( 

49 cubes: Cube | CubeList, attribute: str | Iterable, **kwargs 

50) -> CubeList: 

51 """Remove a cube attribute. 

52 

53 If the attribute is not on the cube, the cube is passed through unchanged. 

54 

55 Arguments 

56 --------- 

57 cubes: Cube | CubeList 

58 One or more cubes to remove the attribute from. 

59 attribute: str | Iterable 

60 Name of attribute (or Iterable of names) to remove. 

61 

62 Returns 

63 ------- 

64 cubes: CubeList 

65 CubeList of cube(s) with the attribute removed. 

66 """ 

67 # Ensure cubes is a CubeList. 

68 if not isinstance(cubes, CubeList): 

69 cubes = CubeList(iter_maybe(cubes)) 

70 

71 for cube in cubes: 

72 for attr in iter_maybe(attribute): 

73 cube.attributes.pop(attr, None) 

74 

75 # Combine things that can be merged due to remove removing the 

76 # attributes. 

77 cubes = cubes.merge() 

78 # combine items that can be merged after removing unwanted attributes 

79 cubes = cubes.concatenate() 

80 return cubes 

81 

82 

83def addition(addend_1, addend_2): 

84 """Addition of two fields. 

85 

86 Parameters 

87 ---------- 

88 addend_1: Cube 

89 Any field to have another field added to it. 

90 addend_2: Cube 

91 Any field to be added to another field. 

92 

93 Returns 

94 ------- 

95 Cube 

96 

97 Raises 

98 ------ 

99 ValueError, iris.exceptions.NotYetImplementedError 

100 When the cubes are not compatible. 

101 

102 Notes 

103 ----- 

104 This is a simple operator designed for combination of diagnostics or 

105 creating new diagnostics by using recipes. 

106 

107 Examples 

108 -------- 

109 >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v) 

110 

111 """ 

112 return addend_1 + addend_2 

113 

114 

115def subtraction(minuend, subtrahend): 

116 """Subtraction of two fields. 

117 

118 Parameters 

119 ---------- 

120 minuend: Cube 

121 Any field to have another field subtracted from it. 

122 subtrahend: Cube 

123 Any field to be subtracted from to another field. 

124 

125 Returns 

126 ------- 

127 Cube 

128 

129 Raises 

130 ------ 

131 ValueError, iris.exceptions.NotYetImplementedError 

132 When the cubes are not compatible. 

133 

134 Notes 

135 ----- 

136 This is a simple operator designed for combination of diagnostics or 

137 creating new diagnostics by using recipes. It can be used for model 

138 differences to allow for comparisons between the same field in different 

139 models or model configurations. 

140 

141 Examples 

142 -------- 

143 >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B) 

144 

145 """ 

146 return minuend - subtrahend 

147 

148 

149def division(numerator, denominator): 

150 """Division of two fields. 

151 

152 Parameters 

153 ---------- 

154 numerator: Cube 

155 Any field to have the ratio taken with respect to another field. 

156 denominator: Cube 

157 Any field used to divide another field or provide the reference 

158 value in a ratio. 

159 

160 Returns 

161 ------- 

162 Cube 

163 

164 Raises 

165 ------ 

166 ValueError 

167 When the cubes are not compatible. 

168 

169 Notes 

170 ----- 

171 This is a simple operator designed for combination of diagnostics or 

172 creating new diagnostics by using recipes. 

173 

174 Examples 

175 -------- 

176 >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux) 

177 

178 """ 

179 return numerator / denominator 

180 

181 

182def multiplication( 

183 multiplicand: Cube | CubeList, multiplier: Cube | CubeList 

184) -> Cube | CubeList: 

185 """Multiplication of two fields. 

186 

187 Parameters 

188 ---------- 

189 multiplicand: Cube | CubeList 

190 Any field to be multiplied by another field. 

191 multiplier: Cube | CubeList 

192 Any field to be multiplied to another field. 

193 

194 Returns 

195 ------- 

196 Cube | CubeList 

197 The result of multiplicand x multiplier. 

198 

199 Raises 

200 ------ 

201 ValueError 

202 When the cubes are not compatible. 

203 

204 Notes 

205 ----- 

206 This is a simple operator designed for combination of diagnostics or 

207 creating new diagnostics by using recipes. CubeLists are multiplied 

208 on a strict ordering (e.g. first cube with first cube). 

209 

210 Examples 

211 -------- 

212 >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties) 

213 

214 """ 

215 new_cubelist = iris.cube.CubeList([]) 

216 for cube_a, cube_b in zip( 

217 iter_maybe(multiplicand), iter_maybe(multiplier), strict=True 

218 ): 

219 multiplied_cube = cube_a * cube_b 

220 multiplied_cube.rename(f"{cube_a.name()}_x_{cube_b.name()}") 

221 new_cubelist.append(multiplied_cube) 

222 if len(new_cubelist) == 1: 

223 return new_cubelist[0] 

224 else: 

225 return new_cubelist 

226 

227 

228def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList: 

229 """Operator that combines multiple cubes or CubeLists into one. 

230 

231 Arguments 

232 --------- 

233 first: Cube | CubeList 

234 First cube or CubeList to merge into CubeList. 

235 second: Cube | CubeList 

236 Second cube or CubeList to merge into CubeList. This must be a named 

237 argument. 

238 third: Cube | CubeList 

239 There can be any number of additional arguments, they just need unique 

240 names. 

241 ... 

242 

243 Returns 

244 ------- 

245 combined_cubelist: CubeList 

246 Combined CubeList containing all cubes/CubeLists. 

247 

248 Raises 

249 ------ 

250 TypeError: 

251 If the provided arguments are not either a Cube or CubeList. 

252 """ 

253 # Create empty CubeList to store cubes/CubeList. 

254 all_cubes = CubeList() 

255 # Combine all CubeLists into a single flat iterable. 

256 for item in itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values())): 

257 # Check each item is a Cube, erroring if not. 

258 if isinstance(item, Cube): 

259 # Add cube to CubeList. 

260 all_cubes.append(item) 

261 else: 

262 raise TypeError("Not a Cube or CubeList!", item) 

263 return all_cubes 

264 

265 

266def difference(cubes: CubeList): 

267 """Difference of two fields. 

268 

269 Parameters 

270 ---------- 

271 cubes: CubeList 

272 A list of exactly two cubes. One must have the cset_comparison_base 

273 attribute set to 1, and will be used as the base of the comparison. 

274 

275 Returns 

276 ------- 

277 Cube 

278 

279 Raises 

280 ------ 

281 ValueError 

282 When the cubes are not compatible. 

283 

284 Notes 

285 ----- 

286 This is a simple operator designed for combination of diagnostics or 

287 creating new diagnostics by using recipes. It can be used for model 

288 differences to allow for comparisons between the same field in different 

289 models or model configurations. 

290 

291 Examples 

292 -------- 

293 >>> model_diff = misc.difference(temperature_model_A, temperature_model_B) 

294 

295 """ 

296 if len(cubes) != 2: 

297 raise ValueError("cubes should contain exactly 2 cubes.") 

298 base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1)) 

299 other: Cube = cubes.extract_cube( 

300 iris.Constraint( 

301 cube_func=lambda cube: "cset_comparison_base" not in cube.attributes 

302 ) 

303 ) 

304 

305 # Get spatial coord names. 

306 base_lat_name, base_lon_name = get_cube_yxcoordname(base) 

307 other_lat_name, other_lon_name = get_cube_yxcoordname(other) 

308 

309 # Ensure cubes to compare are on common differencing grid. 

310 # This is triggered if either 

311 # i) latitude and longitude shapes are not the same. Note grid points 

312 # are not compared directly as these can differ through rounding 

313 # errors. 

314 # ii) or variables are known to often sit on different grid staggering 

315 # in different models (e.g. cell center vs cell edge), as is the case 

316 # for UM and LFRic comparisons. 

317 # In future greater choice of regridding method might be applied depending 

318 # on variable type. Linear regridding can in general be appropriate for smooth 

319 # variables. Care should be taken with interpretation of differences 

320 # given this dependency on regridding. 

321 if ( 

322 base.coord(base_lat_name).shape != other.coord(other_lat_name).shape 

323 or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape 

324 ) or ( 

325 base.long_name 

326 in [ 

327 "eastward_wind_at_10m", 

328 "northward_wind_at_10m", 

329 "northward_wind_at_cell_centres", 

330 "eastward_wind_at_cell_centres", 

331 "zonal_wind_at_pressure_levels", 

332 "meridional_wind_at_pressure_levels", 

333 "potential_vorticity_at_pressure_levels", 

334 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging", 

335 ] 

336 ): 

337 logging.debug( 

338 "Linear regridding base cube to other grid to compute differences" 

339 ) 

340 base = regrid_onto_cube(base, other, method="Linear") 

341 

342 # Figure out if we are comparing between UM and LFRic; flip array if so. 

343 base_lat_direction = is_increasing(base.coord(base_lat_name).points) 

344 other_lat_direction = is_increasing(other.coord(other_lat_name).points) 

345 if base_lat_direction != other_lat_direction: 

346 other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other)) 

347 

348 # Extract just common time points. 

349 base, other = _extract_common_time_points(base, other) 

350 

351 # Equalise attributes so we can merge. 

352 fully_equalise_attributes([base, other]) 

353 logging.debug("Base: %s\nOther: %s", base, other) 

354 

355 # This currently relies on the cubes having the same underlying data layout. 

356 difference = base.copy() 

357 

358 # Differences don't have a standard name; long name gets a suffix. We are 

359 # assuming we can rely on cubes having a long name, so we don't check for 

360 # its presents. 

361 difference.standard_name = None 

362 difference.long_name = ( 

363 base.long_name if base.long_name else base.name() 

364 ) + "_difference" 

365 if base.var_name: 

366 difference.var_name = base.var_name + "_difference" 

367 elif base.standard_name: 

368 difference.var_name = base.standard_name + "_difference" 

369 

370 difference.data = base.data - other.data 

371 return difference 

372 

373 

374def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]: 

375 """Extract common time points from cubes to allow comparison.""" 

376 # Get the name of the first non-scalar time coordinate. 

377 time_coord = next( 

378 map( 

379 lambda coord: coord.name(), 

380 filter( 

381 lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"], 

382 base.coords(), 

383 ), 

384 ), 

385 None, 

386 ) 

387 if not time_coord: 

388 logging.debug("No time coord, skipping equalisation.") 

389 return (base, other) 

390 base_time_coord = base.coord(time_coord) 

391 other_time_coord = other.coord(time_coord) 

392 logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord) 

393 if time_coord == "hour": 

394 # We directly compare points when comparing coordinates with 

395 # non-absolute units, such as hour. We can't just check the units are 

396 # equal as iris automatically converts to datetime objects in the 

397 # comparison for certain coordinate names. 

398 base_times = base_time_coord.points 

399 other_times = other_time_coord.points 

400 shared_times = set.intersection(set(base_times), set(other_times)) 

401 else: 

402 # Units don't match, so converting to datetimes for comparison. 

403 base_times = base_time_coord.units.num2date(base_time_coord.points) 

404 other_times = other_time_coord.units.num2date(other_time_coord.points) 

405 shared_times = set.intersection(set(base_times), set(other_times)) 

406 logging.debug("Shared times: %s", shared_times) 

407 time_constraint = iris.Constraint( 

408 coord_values={time_coord: lambda cell: cell.point in shared_times} 

409 ) 

410 # Extract points matching the shared times. 

411 base = base.extract(time_constraint) 

412 other = other.extract(time_constraint) 

413 if base is None or other is None: 

414 raise ValueError("No common time points found!") 

415 return (base, other) 

416 

417 

418def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str): 

419 """Convert the units of a cube. 

420 

421 Arguments 

422 --------- 

423 cubes: iris.cube.Cube | iris.cube.CubeList 

424 A Cube or CubeList of a field for its units to be converted. 

425 

426 units: str 

427 The unit that the original field is to be converted to. It takes 

428 CF compliant units. 

429 

430 Returns 

431 ------- 

432 iris.cube.Cube | iris.cube.CubeList 

433 The field converted into the specified units. 

434 

435 Examples 

436 -------- 

437 >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit") 

438 

439 """ 

440 new_cubelist = iris.cube.CubeList([]) 

441 for cube in iter_maybe(cubes): 

442 # Copy cube to keep original data. 

443 cube_a = cube.copy() 

444 # Convert cube units. 

445 cube_a.convert_units(units) 

446 new_cubelist.append(cube_a) 

447 if len(new_cubelist) == 1: 

448 return new_cubelist[0] 

449 else: 

450 return new_cubelist 

451 

452 

453def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str): 

454 """Rename a cube. 

455 

456 Arguments 

457 --------- 

458 cubes: iris.cube.Cube | iris.cube.CubeList 

459 A Cube or CubeList of a field to be renamed. 

460 

461 name: str 

462 The new name of the cube. It should be CF compliant. 

463 

464 Returns 

465 ------- 

466 iris.cube.Cube | iris.cube.CubeList 

467 The renamed field. 

468 

469 Notes 

470 ----- 

471 This operator is designed to be used when the output field name does not 

472 match expectations or needs to be different to defaults in standard_name, var_name or 

473 long_name. For example, if combining masks 

474 to create light rain you would like the field to be named "mask_for_light_rain" 

475 rather than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0". 

476 

477 Examples 

478 -------- 

479 >>> light_rain_mask = misc.rename_cube(light_rain_mask,"mask_for_light_rainfall" 

480 """ 

481 new_cubelist = iris.cube.CubeList([]) 

482 for cube in iter_maybe(cubes): 

483 cube.rename(name) 

484 new_cubelist.append(cube) 

485 if len(new_cubelist) == 1: 

486 return new_cubelist[0] 

487 else: 

488 return new_cubelist