Coverage for src / CSET / operators / misc.py: 100%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-26 14:31 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Miscellaneous operators.""" 

16 

17import itertools 

18import logging 

19from collections.abc import Iterable 

20 

21import iris 

22import numpy as np 

23from iris.cube import Cube, CubeList 

24 

25from CSET._common import is_increasing, iter_maybe 

26from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname 

27from CSET.operators.regrid import regrid_onto_cube 

28 

29 

30def noop(x, **kwargs): 

31 """Return its input without doing anything to it. 

32 

33 Useful for constructing diagnostic chains. 

34 

35 Arguments 

36 --------- 

37 x: Any 

38 Input to return. 

39 

40 Returns 

41 ------- 

42 x: Any 

43 The input that was given. 

44 """ 

45 return x 

46 

47 

48def remove_attribute( 

49 cubes: Cube | CubeList, attribute: str | Iterable, **kwargs 

50) -> CubeList: 

51 """Remove a cube attribute. 

52 

53 If the attribute is not on the cube, the cube is passed through unchanged. 

54 

55 Arguments 

56 --------- 

57 cubes: Cube | CubeList 

58 One or more cubes to remove the attribute from. 

59 attribute: str | Iterable 

60 Name of attribute (or Iterable of names) to remove. 

61 

62 Returns 

63 ------- 

64 cubes: CubeList 

65 CubeList of cube(s) with the attribute removed. 

66 """ 

67 # Ensure cubes is a CubeList. 

68 if not isinstance(cubes, CubeList): 

69 cubes = CubeList(iter_maybe(cubes)) 

70 

71 for cube in cubes: 

72 for attr in iter_maybe(attribute): 

73 cube.attributes.pop(attr, None) 

74 return cubes 

75 

76 

77def addition(addend_1, addend_2): 

78 """Addition of two fields. 

79 

80 Parameters 

81 ---------- 

82 addend_1: Cube 

83 Any field to have another field added to it. 

84 addend_2: Cube 

85 Any field to be added to another field. 

86 

87 Returns 

88 ------- 

89 Cube 

90 

91 Raises 

92 ------ 

93 ValueError, iris.exceptions.NotYetImplementedError 

94 When the cubes are not compatible. 

95 

96 Notes 

97 ----- 

98 This is a simple operator designed for combination of diagnostics or 

99 creating new diagnostics by using recipes. 

100 

101 Examples 

102 -------- 

103 >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v) 

104 

105 """ 

106 return addend_1 + addend_2 

107 

108 

109def subtraction(minuend, subtrahend): 

110 """Subtraction of two fields. 

111 

112 Parameters 

113 ---------- 

114 minuend: Cube 

115 Any field to have another field subtracted from it. 

116 subtrahend: Cube 

117 Any field to be subtracted from to another field. 

118 

119 Returns 

120 ------- 

121 Cube 

122 

123 Raises 

124 ------ 

125 ValueError, iris.exceptions.NotYetImplementedError 

126 When the cubes are not compatible. 

127 

128 Notes 

129 ----- 

130 This is a simple operator designed for combination of diagnostics or 

131 creating new diagnostics by using recipes. It can be used for model 

132 differences to allow for comparisons between the same field in different 

133 models or model configurations. 

134 

135 Examples 

136 -------- 

137 >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B) 

138 

139 """ 

140 return minuend - subtrahend 

141 

142 

143def division(numerator, denominator): 

144 """Division of two fields. 

145 

146 Parameters 

147 ---------- 

148 numerator: Cube 

149 Any field to have the ratio taken with respect to another field. 

150 denominator: Cube 

151 Any field used to divide another field or provide the reference 

152 value in a ratio. 

153 

154 Returns 

155 ------- 

156 Cube 

157 

158 Raises 

159 ------ 

160 ValueError 

161 When the cubes are not compatible. 

162 

163 Notes 

164 ----- 

165 This is a simple operator designed for combination of diagnostics or 

166 creating new diagnostics by using recipes. 

167 

168 Examples 

169 -------- 

170 >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux) 

171 

172 """ 

173 return numerator / denominator 

174 

175 

176def multiplication( 

177 multiplicand: Cube | CubeList, multiplier: Cube | CubeList 

178) -> Cube | CubeList: 

179 """Multiplication of two fields. 

180 

181 Parameters 

182 ---------- 

183 multiplicand: Cube | CubeList 

184 Any field to be multiplied by another field. 

185 multiplier: Cube | CubeList 

186 Any field to be multiplied to another field. 

187 

188 Returns 

189 ------- 

190 Cube | CubeList 

191 The result of multiplicand x multiplier. 

192 

193 Raises 

194 ------ 

195 ValueError 

196 When the cubes are not compatible. 

197 

198 Notes 

199 ----- 

200 This is a simple operator designed for combination of diagnostics or 

201 creating new diagnostics by using recipes. CubeLists are multiplied 

202 on a strict ordering (e.g. first cube with first cube). 

203 

204 Examples 

205 -------- 

206 >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties) 

207 

208 """ 

209 new_cubelist = iris.cube.CubeList([]) 

210 for cube_a, cube_b in zip( 

211 iter_maybe(multiplicand), iter_maybe(multiplier), strict=True 

212 ): 

213 multiplied_cube = cube_a * cube_b 

214 multiplied_cube.rename(f"{cube_a.name()}_x_{cube_b.name()}") 

215 new_cubelist.append(multiplied_cube) 

216 if len(new_cubelist) == 1: 

217 return new_cubelist[0] 

218 else: 

219 return new_cubelist 

220 

221 

222def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList: 

223 """Operator that combines multiple cubes or CubeLists into one. 

224 

225 Arguments 

226 --------- 

227 first: Cube | CubeList 

228 First cube or CubeList to merge into CubeList. 

229 second: Cube | CubeList 

230 Second cube or CubeList to merge into CubeList. This must be a named 

231 argument. 

232 third: Cube | CubeList 

233 There can be any number of additional arguments, they just need unique 

234 names. 

235 ... 

236 

237 Returns 

238 ------- 

239 combined_cubelist: CubeList 

240 Combined CubeList containing all cubes/CubeLists. 

241 

242 Raises 

243 ------ 

244 TypeError: 

245 If the provided arguments are not either a Cube or CubeList. 

246 """ 

247 # Create empty CubeList to store cubes/CubeList. 

248 all_cubes = CubeList() 

249 # Combine all CubeLists into a single flat iterable. 

250 for item in itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values())): 

251 # Check each item is a Cube, erroring if not. 

252 if isinstance(item, Cube): 

253 # Add cube to CubeList. 

254 all_cubes.append(item) 

255 else: 

256 raise TypeError("Not a Cube or CubeList!", item) 

257 return all_cubes 

258 

259 

260def difference(cubes: CubeList): 

261 """Difference of two fields. 

262 

263 Parameters 

264 ---------- 

265 cubes: CubeList 

266 A list of exactly two cubes. One must have the cset_comparison_base 

267 attribute set to 1, and will be used as the base of the comparison. 

268 

269 Returns 

270 ------- 

271 Cube 

272 

273 Raises 

274 ------ 

275 ValueError 

276 When the cubes are not compatible. 

277 

278 Notes 

279 ----- 

280 This is a simple operator designed for combination of diagnostics or 

281 creating new diagnostics by using recipes. It can be used for model 

282 differences to allow for comparisons between the same field in different 

283 models or model configurations. 

284 

285 Examples 

286 -------- 

287 >>> model_diff = misc.difference(temperature_model_A, temperature_model_B) 

288 

289 """ 

290 if len(cubes) != 2: 

291 raise ValueError("cubes should contain exactly 2 cubes.") 

292 base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1)) 

293 other: Cube = cubes.extract_cube( 

294 iris.Constraint( 

295 cube_func=lambda cube: "cset_comparison_base" not in cube.attributes 

296 ) 

297 ) 

298 

299 # Get spatial coord names. 

300 base_lat_name, base_lon_name = get_cube_yxcoordname(base) 

301 other_lat_name, other_lon_name = get_cube_yxcoordname(other) 

302 

303 # Ensure cubes to compare are on common differencing grid. 

304 # This is triggered if either 

305 # i) latitude and longitude shapes are not the same. Note grid points 

306 # are not compared directly as these can differ through rounding 

307 # errors. 

308 # ii) or variables are known to often sit on different grid staggering 

309 # in different models (e.g. cell center vs cell edge), as is the case 

310 # for UM and LFRic comparisons. 

311 # In future greater choice of regridding method might be applied depending 

312 # on variable type. Linear regridding can in general be appropriate for smooth 

313 # variables. Care should be taken with interpretation of differences 

314 # given this dependency on regridding. 

315 if ( 

316 base.coord(base_lat_name).shape != other.coord(other_lat_name).shape 

317 or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape 

318 ) or ( 

319 base.long_name 

320 in [ 

321 "eastward_wind_at_10m", 

322 "northward_wind_at_10m", 

323 "northward_wind_at_cell_centres", 

324 "eastward_wind_at_cell_centres", 

325 "zonal_wind_at_pressure_levels", 

326 "meridional_wind_at_pressure_levels", 

327 "potential_vorticity_at_pressure_levels", 

328 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging", 

329 ] 

330 ): 

331 logging.debug( 

332 "Linear regridding base cube to other grid to compute differences" 

333 ) 

334 base = regrid_onto_cube(base, other, method="Linear") 

335 

336 # Figure out if we are comparing between UM and LFRic; flip array if so. 

337 base_lat_direction = is_increasing(base.coord(base_lat_name).points) 

338 other_lat_direction = is_increasing(other.coord(other_lat_name).points) 

339 if base_lat_direction != other_lat_direction: 

340 other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other)) 

341 

342 # Extract just common time points. 

343 base, other = _extract_common_time_points(base, other) 

344 

345 # Equalise attributes so we can merge. 

346 fully_equalise_attributes([base, other]) 

347 logging.debug("Base: %s\nOther: %s", base, other) 

348 

349 # This currently relies on the cubes having the same underlying data layout. 

350 difference = base.copy() 

351 

352 # Differences don't have a standard name; long name gets a suffix. We are 

353 # assuming we can rely on cubes having a long name, so we don't check for 

354 # its presents. 

355 difference.standard_name = None 

356 difference.long_name = ( 

357 base.long_name if base.long_name else base.name() 

358 ) + "_difference" 

359 if base.var_name: 

360 difference.var_name = base.var_name + "_difference" 

361 elif base.standard_name: 

362 difference.var_name = base.standard_name + "_difference" 

363 

364 difference.data = base.data - other.data 

365 return difference 

366 

367 

368def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]: 

369 """Extract common time points from cubes to allow comparison.""" 

370 # Get the name of the first non-scalar time coordinate. 

371 time_coord = next( 

372 map( 

373 lambda coord: coord.name(), 

374 filter( 

375 lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"], 

376 base.coords(), 

377 ), 

378 ), 

379 None, 

380 ) 

381 if not time_coord: 

382 logging.debug("No time coord, skipping equalisation.") 

383 return (base, other) 

384 base_time_coord = base.coord(time_coord) 

385 other_time_coord = other.coord(time_coord) 

386 logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord) 

387 if time_coord == "hour": 

388 # We directly compare points when comparing coordinates with 

389 # non-absolute units, such as hour. We can't just check the units are 

390 # equal as iris automatically converts to datetime objects in the 

391 # comparison for certain coordinate names. 

392 base_times = base_time_coord.points 

393 other_times = other_time_coord.points 

394 shared_times = set.intersection(set(base_times), set(other_times)) 

395 else: 

396 # Units don't match, so converting to datetimes for comparison. 

397 base_times = base_time_coord.units.num2date(base_time_coord.points) 

398 other_times = other_time_coord.units.num2date(other_time_coord.points) 

399 shared_times = set.intersection(set(base_times), set(other_times)) 

400 logging.debug("Shared times: %s", shared_times) 

401 time_constraint = iris.Constraint( 

402 coord_values={time_coord: lambda cell: cell.point in shared_times} 

403 ) 

404 # Extract points matching the shared times. 

405 base = base.extract(time_constraint) 

406 other = other.extract(time_constraint) 

407 if base is None or other is None: 

408 raise ValueError("No common time points found!") 

409 return (base, other) 

410 

411 

412def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str): 

413 """Convert the units of a cube. 

414 

415 Arguments 

416 --------- 

417 cubes: iris.cube.Cube | iris.cube.CubeList 

418 A Cube or CubeList of a field for its units to be converted. 

419 

420 units: str 

421 The unit that the original field is to be converted to. It takes 

422 CF compliant units. 

423 

424 Returns 

425 ------- 

426 iris.cube.Cube | iris.cube.CubeList 

427 The field converted into the specified units. 

428 

429 Examples 

430 -------- 

431 >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit") 

432 

433 """ 

434 new_cubelist = iris.cube.CubeList([]) 

435 for cube in iter_maybe(cubes): 

436 # Copy cube to keep original data. 

437 cube_a = cube.copy() 

438 # Convert cube units. 

439 cube_a.convert_units(units) 

440 new_cubelist.append(cube_a) 

441 if len(new_cubelist) == 1: 

442 return new_cubelist[0] 

443 else: 

444 return new_cubelist 

445 

446 

447def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str): 

448 """Rename a cube. 

449 

450 Arguments 

451 --------- 

452 cubes: iris.cube.Cube | iris.cube.CubeList 

453 A Cube or CubeList of a field to be renamed. 

454 

455 name: str 

456 The new name of the cube. It should be CF compliant. 

457 

458 Returns 

459 ------- 

460 iris.cube.Cube | iris.cube.CubeList 

461 The renamed field. 

462 

463 Notes 

464 ----- 

465 This operator is designed to be used when the output field name does not 

466 match expectations or needs to be different to defaults in standard_name, var_name or 

467 long_name. For example, if combining masks 

468 to create light rain you would like the field to be named "mask_for_light_rain" 

469 rather than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0". 

470 

471 Examples 

472 -------- 

473 >>> light_rain_mask = misc.rename_cube(light_rain_mask,"mask_for_light_rainfall" 

474 """ 

475 new_cubelist = iris.cube.CubeList([]) 

476 for cube in iter_maybe(cubes): 

477 cube.rename(name) 

478 new_cubelist.append(cube) 

479 if len(new_cubelist) == 1: 

480 return new_cubelist[0] 

481 else: 

482 return new_cubelist