Coverage for src/CSET/operators/misc.py: 99%

156 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-27 13:48 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Miscellaneous operators.""" 

16 

17import itertools 

18import logging 

19from collections.abc import Iterable 

20 

21import iris 

22import iris.analysis.calculus 

23import numpy as np 

24from iris.cube import Cube, CubeList 

25 

26from CSET._common import is_increasing, iter_maybe 

27from CSET.operators._utils import fully_equalise_attributes, get_cube_yxcoordname 

28from CSET.operators.regrid import regrid_onto_cube 

29 

30 

31def noop(x, **kwargs): 

32 """Return its input without doing anything to it. 

33 

34 Useful for constructing diagnostic chains. 

35 

36 Arguments 

37 --------- 

38 x: Any 

39 Input to return. 

40 

41 Returns 

42 ------- 

43 x: Any 

44 The input that was given. 

45 """ 

46 return x 

47 

48 

49def remove_attribute( 

50 cubes: Cube | CubeList, attribute: str | Iterable, **kwargs 

51) -> CubeList: 

52 """Remove a cube attribute. 

53 

54 If the attribute is not on the cube, the cube is passed through unchanged. 

55 

56 Arguments 

57 --------- 

58 cubes: Cube | CubeList 

59 One or more cubes to remove the attribute from. 

60 attribute: str | Iterable 

61 Name of attribute (or Iterable of names) to remove. 

62 

63 Returns 

64 ------- 

65 cubes: CubeList 

66 CubeList of cube(s) with the attribute removed. 

67 """ 

68 # Ensure cubes is a CubeList. 

69 if not isinstance(cubes, CubeList): 

70 cubes = CubeList(iter_maybe(cubes)) 

71 

72 for cube in cubes: 

73 for attr in iter_maybe(attribute): 

74 cube.attributes.pop(attr, None) 

75 

76 # Combine things that can be merged due to remove removing the 

77 # attributes. 

78 cubes = cubes.merge() 

79 # combine items that can be merged after removing unwanted attributes 

80 cubes = cubes.concatenate() 

81 return cubes 

82 

83 

84def remove_scalar_coords(cubes, coords): 

85 """Remove scalar coordinates. 

86 

87 examples would be: realization, forecast_reference_time from model cubes. 

88 """ 

89 if not isinstance(cubes, CubeList): 89 ↛ 92line 89 didn't jump to line 92 because the condition on line 89 was always true

90 cubes = CubeList([cubes]) 

91 

92 for cube in cubes: 

93 for coord_name in coords: 

94 if cube.coords(coord_name): 94 ↛ 93line 94 didn't jump to line 93 because the condition on line 94 was always true

95 coord = cube.coord(coord_name) 

96 # only remove if scalar 

97 if cube.coord_dims(coord) == (): 

98 cube.remove_coord(coord) 

99 

100 return cubes 

101 

102 

103def addition(addend_1, addend_2): 

104 """Addition of two fields. 

105 

106 Parameters 

107 ---------- 

108 addend_1: Cube 

109 Any field to have another field added to it. 

110 addend_2: Cube 

111 Any field to be added to another field. 

112 

113 Returns 

114 ------- 

115 Cube 

116 

117 Raises 

118 ------ 

119 ValueError, iris.exceptions.NotYetImplementedError 

120 When the cubes are not compatible. 

121 

122 Notes 

123 ----- 

124 This is a simple operator designed for combination of diagnostics or 

125 creating new diagnostics by using recipes. 

126 

127 Examples 

128 -------- 

129 >>> field_addition = misc.addition(kinetic_energy_u, kinetic_energy_v) 

130 

131 """ 

132 return addend_1 + addend_2 

133 

134 

135def subtraction(minuend, subtrahend): 

136 """Subtraction of two fields. 

137 

138 Parameters 

139 ---------- 

140 minuend: Cube 

141 Any field to have another field subtracted from it. 

142 subtrahend: Cube 

143 Any field to be subtracted from to another field. 

144 

145 Returns 

146 ------- 

147 Cube 

148 

149 Raises 

150 ------ 

151 ValueError, iris.exceptions.NotYetImplementedError 

152 When the cubes are not compatible. 

153 

154 Notes 

155 ----- 

156 This is a simple operator designed for combination of diagnostics or 

157 creating new diagnostics by using recipes. It can be used for model 

158 differences to allow for comparisons between the same field in different 

159 models or model configurations. 

160 

161 Examples 

162 -------- 

163 >>> model_diff = misc.subtraction(temperature_model_A, temperature_model_B) 

164 

165 """ 

166 return minuend - subtrahend 

167 

168 

169def division(numerator, denominator): 

170 """Division of two fields. 

171 

172 Parameters 

173 ---------- 

174 numerator: Cube 

175 Any field to have the ratio taken with respect to another field. 

176 denominator: Cube 

177 Any field used to divide another field or provide the reference 

178 value in a ratio. 

179 

180 Returns 

181 ------- 

182 Cube 

183 

184 Raises 

185 ------ 

186 ValueError 

187 When the cubes are not compatible. 

188 

189 Notes 

190 ----- 

191 This is a simple operator designed for combination of diagnostics or 

192 creating new diagnostics by using recipes. 

193 

194 Examples 

195 -------- 

196 >>> bowen_ratio = misc.division(sensible_heat_flux, latent_heat_flux) 

197 

198 """ 

199 return numerator / denominator 

200 

201 

202def multiplication( 

203 multiplicand: Cube | CubeList, multiplier: Cube | CubeList 

204) -> Cube | CubeList: 

205 """Multiplication of two fields. 

206 

207 Parameters 

208 ---------- 

209 multiplicand: Cube | CubeList 

210 Any field to be multiplied by another field. 

211 multiplier: Cube | CubeList 

212 Any field to be multiplied to another field. 

213 

214 Returns 

215 ------- 

216 Cube | CubeList 

217 The result of multiplicand x multiplier. 

218 

219 Raises 

220 ------ 

221 ValueError 

222 When the cubes are not compatible. 

223 

224 Notes 

225 ----- 

226 This is a simple operator designed for combination of diagnostics or 

227 creating new diagnostics by using recipes. CubeLists are multiplied 

228 on a strict ordering (e.g. first cube with first cube). 

229 

230 Examples 

231 -------- 

232 >>> filtered_CAPE_ratio = misc.multiplication(CAPE_ratio, inflow_layer_properties) 

233 

234 """ 

235 new_cubelist = iris.cube.CubeList([]) 

236 for cube_a, cube_b in zip( 

237 iter_maybe(multiplicand), iter_maybe(multiplier), strict=True 

238 ): 

239 multiplied_cube = cube_a * cube_b 

240 multiplied_cube.rename(f"{cube_a.name()}_x_{cube_b.name()}") 

241 new_cubelist.append(multiplied_cube) 

242 if len(new_cubelist) == 1: 

243 return new_cubelist[0] 

244 else: 

245 return new_cubelist 

246 

247 

248def combine_cubes_into_cubelist(first: Cube | CubeList, **kwargs) -> CubeList: 

249 """Operator that combines multiple cubes or CubeLists into one. 

250 

251 Arguments 

252 --------- 

253 first: Cube | CubeList 

254 First cube or CubeList to merge into CubeList. 

255 second: Cube | CubeList 

256 Second cube or CubeList to merge into CubeList. This must be a named 

257 argument. 

258 third: Cube | CubeList 

259 There can be any number of additional arguments, they just need unique 

260 names. 

261 ... 

262 

263 Returns 

264 ------- 

265 combined_cubelist: CubeList 

266 Combined CubeList containing all cubes/CubeLists. 

267 

268 Raises 

269 ------ 

270 TypeError: 

271 If the provided arguments are not either a Cube or CubeList. 

272 """ 

273 # Create empty CubeList to store cubes/CubeList. 

274 all_cubes = CubeList() 

275 # Combine all CubeLists into a single flat iterable. 

276 for item in itertools.chain(iter_maybe(first), *map(iter_maybe, kwargs.values())): 

277 # Check each item is a Cube, erroring if not. 

278 if isinstance(item, Cube): 

279 # Add cube to CubeList. 

280 all_cubes.append(item) 

281 else: 

282 raise TypeError("Not a Cube or CubeList!", item) 

283 return all_cubes 

284 

285 

286def difference(cubes: CubeList): 

287 """Difference of two fields. 

288 

289 Parameters 

290 ---------- 

291 cubes: CubeList 

292 A list of exactly two cubes. One must have the cset_comparison_base 

293 attribute set to 1, and will be used as the base of the comparison. 

294 

295 Returns 

296 ------- 

297 Cube 

298 

299 Raises 

300 ------ 

301 ValueError 

302 When the cubes are not compatible. 

303 

304 Notes 

305 ----- 

306 This is a simple operator designed for combination of diagnostics or 

307 creating new diagnostics by using recipes. It can be used for model 

308 differences to allow for comparisons between the same field in different 

309 models or model configurations. 

310 

311 Examples 

312 -------- 

313 >>> model_diff = misc.difference(temperature_model_A, temperature_model_B) 

314 

315 """ 

316 if len(cubes) != 2: 

317 raise ValueError("cubes should contain exactly 2 cubes.") 

318 base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1)) 

319 other: Cube = cubes.extract_cube( 

320 iris.Constraint( 

321 cube_func=lambda cube: "cset_comparison_base" not in cube.attributes 

322 ) 

323 ) 

324 

325 # If cubes contain a pressure coordinate, ensure it is increasing. 

326 for cube in cubes: 

327 try: 

328 if len(cube.coord("pressure").points) > 2: 328 ↛ 326line 328 didn't jump to line 326 because the condition on line 328 was always true

329 if not is_increasing(cube.coord("pressure").points): 

330 cube.data = np.flip(cube.data, axis=cube.coord_dims("pressure")[0]) 

331 

332 except iris.exceptions.CoordinateNotFoundError: 

333 pass 

334 

335 # Get spatial coord names. 

336 base_lat_name, base_lon_name = get_cube_yxcoordname(base) 

337 other_lat_name, other_lon_name = get_cube_yxcoordname(other) 

338 

339 # Ensure cubes to compare are on common differencing grid. 

340 # This is triggered if either 

341 # i) latitude and longitude shapes are not the same. Note grid points 

342 # are not compared directly as these can differ through rounding 

343 # errors. 

344 # ii) or variables are known to often sit on different grid staggering 

345 # in different models (e.g. cell center vs cell edge), as is the case 

346 # for UM and LFRic comparisons. 

347 # In future greater choice of regridding method might be applied depending 

348 # on variable type. Linear regridding can in general be appropriate for smooth 

349 # variables. Care should be taken with interpretation of differences 

350 # given this dependency on regridding. 

351 if ( 

352 base.coord(base_lat_name).shape != other.coord(other_lat_name).shape 

353 or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape 

354 ) or ( 

355 base.long_name 

356 in [ 

357 "eastward_wind_at_10m", 

358 "northward_wind_at_10m", 

359 "northward_wind_at_cell_centres", 

360 "eastward_wind_at_cell_centres", 

361 "zonal_wind_at_pressure_levels", 

362 "meridional_wind_at_pressure_levels", 

363 "potential_vorticity_at_pressure_levels", 

364 "vapour_specific_humidity_at_pressure_levels_for_climate_averaging", 

365 ] 

366 ): 

367 logging.debug( 

368 "Linear regridding base cube to other grid to compute differences" 

369 ) 

370 base = regrid_onto_cube(base, other, method="Linear") 

371 

372 # Figure out if we are comparing between UM and LFRic; flip array if so. 

373 base_lat_direction = is_increasing(base.coord(base_lat_name).points) 

374 other_lat_direction = is_increasing(other.coord(other_lat_name).points) 

375 if base_lat_direction != other_lat_direction: 

376 other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other)) 

377 

378 # Extract just common time points. 

379 base, other = _extract_common_time_points(base, other) 

380 

381 # Equalise attributes so we can merge. 

382 fully_equalise_attributes([base, other]) 

383 logging.debug("Base: %s\nOther: %s", base, other) 

384 

385 # This currently relies on the cubes having the same underlying data layout. 

386 difference = base.copy() 

387 

388 # Differences don't have a standard name; long name gets a suffix. We are 

389 # assuming we can rely on cubes having a long name, so we don't check for 

390 # its presents. 

391 difference.standard_name = None 

392 difference.long_name = ( 

393 base.long_name if base.long_name else base.name() 

394 ) + "_difference" 

395 if base.var_name: 

396 difference.var_name = base.var_name + "_difference" 

397 elif base.standard_name: 

398 difference.var_name = base.standard_name + "_difference" 

399 

400 difference.data = other.data - base.data 

401 return difference 

402 

403 

404def _extract_common_time_points(base: Cube, other: Cube) -> tuple[Cube, Cube]: 

405 """Extract common time points from cubes to allow comparison.""" 

406 # Get the name of the first non-scalar time coordinate. 

407 time_coord = next( 

408 map( 

409 lambda coord: coord.name(), 

410 filter( 

411 lambda coord: coord.shape > (1,) and coord.name() in ["time", "hour"], 

412 base.coords(), 

413 ), 

414 ), 

415 None, 

416 ) 

417 if not time_coord: 

418 logging.debug("No time coord, skipping equalisation.") 

419 return (base, other) 

420 base_time_coord = base.coord(time_coord) 

421 other_time_coord = other.coord(time_coord) 

422 logging.debug("Base: %s\nOther: %s", base_time_coord, other_time_coord) 

423 if time_coord == "hour": 

424 # We directly compare points when comparing coordinates with 

425 # non-absolute units, such as hour. We can't just check the units are 

426 # equal as iris automatically converts to datetime objects in the 

427 # comparison for certain coordinate names. 

428 base_times = base_time_coord.points 

429 other_times = other_time_coord.points 

430 shared_times = set.intersection(set(base_times), set(other_times)) 

431 else: 

432 # Units don't match, so converting to datetimes for comparison. 

433 base_times = base_time_coord.units.num2date(base_time_coord.points) 

434 other_times = other_time_coord.units.num2date(other_time_coord.points) 

435 shared_times = set.intersection(set(base_times), set(other_times)) 

436 logging.debug("Shared times: %s", shared_times) 

437 time_constraint = iris.Constraint( 

438 coord_values={time_coord: lambda cell: cell.point in shared_times} 

439 ) 

440 # Extract points matching the shared times. 

441 base = base.extract(time_constraint) 

442 other = other.extract(time_constraint) 

443 if base is None or other is None: 

444 raise ValueError("No common time points found!") 

445 return (base, other) 

446 

447 

448def convert_units(cubes: iris.cube.Cube | iris.cube.CubeList, units: str): 

449 """Convert the units of a cube. 

450 

451 Arguments 

452 --------- 

453 cubes: iris.cube.Cube | iris.cube.CubeList 

454 A Cube or CubeList of a field for its units to be converted. 

455 

456 units: str 

457 The unit that the original field is to be converted to. It takes 

458 CF compliant units. 

459 

460 Returns 

461 ------- 

462 iris.cube.Cube | iris.cube.CubeList 

463 The field converted into the specified units. 

464 

465 Examples 

466 -------- 

467 >>> T_in_F = misc.convert_units(temperature_in_K, "Fahrenheit") 

468 

469 """ 

470 new_cubelist = iris.cube.CubeList([]) 

471 for cube in iter_maybe(cubes): 

472 # Copy cube to keep original data. 

473 cube_a = cube.copy() 

474 # Convert cube units. 

475 cube_a.convert_units(units) 

476 new_cubelist.append(cube_a) 

477 if len(new_cubelist) == 1: 

478 return new_cubelist[0] 

479 else: 

480 return new_cubelist 

481 

482 

483def rename_cube(cubes: iris.cube.Cube | iris.cube.CubeList, name: str): 

484 """Rename a cube. 

485 

486 Arguments 

487 --------- 

488 cubes: iris.cube.Cube | iris.cube.CubeList 

489 A Cube or CubeList of a field to be renamed. 

490 

491 name: str 

492 The new name of the cube. It should be CF compliant. 

493 

494 Returns 

495 ------- 

496 iris.cube.Cube | iris.cube.CubeList 

497 The renamed field. 

498 

499 Notes 

500 ----- 

501 This operator is designed to be used when the output field name does not 

502 match expectations or needs to be different to defaults in standard_name, var_name or 

503 long_name. For example, if combining masks 

504 to create light rain you would like the field to be named "mask_for_light_rain" 

505 rather than "mask_for_microphysical_precip_gt_0.0_x_mask_for_microphysical_precip_lt_2.0". 

506 

507 Examples 

508 -------- 

509 >>> light_rain_mask = misc.rename_cube(light_rain_mask,"mask_for_light_rainfall" 

510 """ 

511 new_cubelist = iris.cube.CubeList([]) 

512 for cube in iter_maybe(cubes): 

513 cube.rename(name) 

514 new_cubelist.append(cube) 

515 if len(new_cubelist) == 1: 

516 return new_cubelist[0] 

517 else: 

518 return new_cubelist 

519 

520 

521def _slice_cube_on_levels(cube: iris.cube.Cube, coord_name: str, levels: list): 

522 """ 

523 Extract levels from a cube for a given coordinate. 

524 

525 Arguments 

526 --------- 

527 cube: iris.cube.Cube 

528 A Cube to be sliced. 

529 

530 coord_name: str 

531 The coordinate name to be sliced 

532 

533 levels: list 

534 A list containing points to be extracted from the cube. 

535 

536 Returns 

537 ------- 

538 iris.cube.Cube 

539 The sliced cube. 

540 """ 

541 coord = cube.coord(coord_name) 

542 (dim_index,) = cube.coord_dims(coord) 

543 

544 mask = np.isin(coord.points, levels) 

545 

546 slicer = [slice(None)] * cube.ndim 

547 slicer[dim_index] = mask 

548 

549 return cube[tuple(slicer)] 

550 

551 

552def extract_common_points(cubes: iris.cube.CubeList, coordinate: str): 

553 """ 

554 Extract common points for a given coordinate between two cubes. 

555 

556 Parameters 

557 ---------- 

558 cubes: iris.cube.CubeList 

559 CubeList containing exactly two cubes. 

560 

561 coordinate: str 

562 The coordinate name to be checked for common levels. 

563 

564 Returns 

565 ------- 

566 iris.cube.CubeList 

567 CubeList containing the two cubes sliced to common levels 

568 for the given coordinate. 

569 """ 

570 # Check type of input 

571 if type(cubes) is not iris.cube.CubeList: 

572 raise TypeError(f"Not a CubeList, got type {type(cubes)}") 

573 

574 # Check that only two cubes are passed into function. 

575 if len(cubes) != 2: 

576 raise ValueError(f"Maximum of two cubes allowed, received {len(cubes)}") 

577 

578 # Extract coordinate 

579 try: 

580 p1 = cubes[0].coord(coordinate) 

581 p2 = cubes[1].coord(coordinate) 

582 except iris.exceptions.CoordinateNotFoundError as err: 

583 raise ValueError(f"Both cubes must have an {coordinate} coordinate") from err 

584 

585 # Find common points 

586 common_points = np.intersect1d(p1.points, p2.points) 

587 

588 # Check that common points is more than zero. 

589 if common_points.size == 0: 

590 raise ValueError("No common levels found") 

591 

592 # Extract common points 

593 cube0_common = _slice_cube_on_levels(cubes[0], coordinate, common_points) 

594 cube1_common = _slice_cube_on_levels(cubes[1], coordinate, common_points) 

595 

596 return iris.cube.CubeList([cube0_common, cube1_common]) 

597 

598 

599def differentiate( 

600 cubes: iris.cube.Cube | iris.cube.CubeList, coordinate: str, **kwargs 

601) -> iris.cube.Cube | iris.cube.CubeList: 

602 """Differentiate a cube on a specified coordinate. 

603 

604 Arguments 

605 --------- 

606 cubes: iris.cube.Cube | iris.cube.CubeList 

607 A Cube or CubeList of a field that is to be differentiated. 

608 

609 coordinate: str 

610 The coordinate that is to be differentiated over. 

611 

612 Returns 

613 ------- 

614 iris.cube.Cube | iris.cube.CubeList 

615 The differential of the cube along the specified coordinate. 

616 

617 Notes 

618 ----- 

619 The differential is calculated based on a carteisan grid. This calculation 

620 is then suitable for vertical and temporal derivatives. It is not sensible 

621 for horizontal derivatives if they are based on spherical coordinates (e.g. 

622 latitude and longitude). In essence this operator is a CSET wrapper around 

623 `iris.analysis.calculus.differentiate <https://scitools-iris.readthedocs.io/en/stable/generated/api/iris.analysis.calculus.html#iris.analysis.calculus.differentiate>`_. 

624 

625 Examples 

626 -------- 

627 >>> dT_dz = misc.differentiate(temperature, "altitude") 

628 """ 

629 new_cubelist = iris.cube.CubeList([]) 

630 for cube in iter_maybe(cubes): 

631 dcube = iris.analysis.calculus.differentiate(cube, coordinate) 

632 new_cubelist.append(dcube) 

633 if len(new_cubelist) == 1: 

634 return new_cubelist[0] 

635 else: 

636 return new_cubelist