Coverage for src / CSET / operators / power_spectrum.py: 84%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 14:49 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for calculating power spectra.""" 

16 

17import logging 

18 

19import iris 

20import iris.coords 

21import iris.cube 

22import numpy as np 

23import scipy.fft as fft 

24from iris.util import new_axis 

25 

26 

def calculate_power_spectrum(cubes):
    """Wrap power spectrum code.

    This function is a wrapper that handles power spectrum
    calculations for both single cubes and cube lists and includes ensembles.

    The input cube is split up into a cube for each model,
    time and realization and a power spectrum calculated for each before
    combining into one cube ahead of plotting. This is done to retain the
    model_name attribute correctly for different models.

    In case of a CubeList (Multiple models and ensembles): It iterates through
    each cube and calculates an individual power spectrum. In case of a
    single cube (one model) it directly calculates the power spectrum.

    Input: Cube OR CubeList
    Output: CubeList of power spectra, or a single concatenated Cube when
    the input holds more than one ensemble realization.
    """
    if isinstance(cubes, iris.cube.CubeList):
        out = iris.cube.CubeList()
        # Default for an empty CubeList; previously this name was only bound
        # inside the loop, so an empty input raised UnboundLocalError below.
        is_ensemble = False

        for cube in cubes:
            spectra, is_ensemble = _spectra_for_cube(cube)
            out.extend(spectra)

        if is_ensemble:
            # Merge the individual realization cubes into a single cube
            # for ensemble data.
            # NOTE(review): whether to concatenate is decided by the *last*
            # cube's ensemble status, mirroring the pre-existing behaviour
            # for mixed ensemble/deterministic lists.
            return out.concatenate_cube()
        return out

    # ------------------------------------------------------------
    # Single cube
    # ------------------------------------------------------------
    spectra, is_ensemble = _spectra_for_cube(cubes)
    if is_ensemble:
        # Merge the per-realization spectra into a single cube along the
        # promoted "realization" dimension.
        return spectra.concatenate_cube()
    return spectra


def _spectra_for_cube(cube):
    """Calculate the power spectra for a single input cube.

    Returns a tuple ``(spectra, is_ensemble)``. ``spectra`` is a CubeList
    with one power spectrum per realization: for ensembles the parent
    realization number is attached and promoted to a dimension coordinate
    (so the spectra can later be concatenated); otherwise a scalar
    realization coordinate of 0 is attached. ``is_ensemble`` is True when
    the cube has a realization coordinate with more than one point.
    """
    model = cube.attributes.get("model_name")

    # Ensembles:
    # Check whether data has more than 1 realization point.
    real_coord = cube.coords("realization")
    is_ensemble = bool(real_coord) and len(real_coord[0].points) > 1

    spectra = iris.cube.CubeList()
    if is_ensemble:
        # Ensemble: Loop over each realization.
        for subcube in cube.slices_over("realization"):
            realiz = int(subcube.coord("realization").points[0])
            # Calculate power spectrum of subcube.
            ps = _power_spectrum(subcube)

            # Attach model name if available.
            if model:
                ps.attributes["model_name"] = model

            # ADD the correct realization from the parent cube.
            ps.add_aux_coord(
                iris.coords.AuxCoord(realiz, long_name="realization", units="1")
            )

            # PROMOTE realization coordinate to dimension.
            spectra.append(new_axis(ps, "realization"))
    else:
        # Not ensemble (or only 1 ensemble member).
        ps = _power_spectrum(cube)
        if model:
            ps.attributes["model_name"] = model

        # ADD a scalar realization coordinate.
        # Promotion to dimension not required.
        ps.add_aux_coord(
            iris.coords.AuxCoord(0, long_name="realization", units="1")
        )
        spectra.append(ps)

    return spectra, is_ensemble

156 

157 

def _grid_spacing(cube: iris.cube.Cube) -> tuple[float, float]:
    """Return the mean ``(dx, dy)`` grid spacing of the cube in metres.

    Tries, in order: projection coordinates (points already in metres),
    rotated pole coordinates, then regular latitude/longitude coordinates
    (both converted from degrees to metres at the domain's mid-latitude).

    Raises
    ------
    ValueError
        If none of the recognised coordinate pairs is present.
    """
    # Try projection coordinates first (most common for limited area models).
    try:
        x_coord = cube.coord("projection_x_coordinate")
        y_coord = cube.coord("projection_y_coordinate")
        # Grid spacing already in meters for projection coordinates.
        dx = np.abs(np.diff(x_coord.points).mean())
        dy = np.abs(np.diff(y_coord.points).mean())
        logging.debug("Using projection coordinates for grid spacing calculation.")
        return dx, dy
    except iris.exceptions.CoordinateNotFoundError:
        pass

    R_earth = 6371000  # meters
    # Rotated pole and regular lat/lon grids need the same degree-to-metre
    # conversion; only the coordinate names and debug message differ.
    candidates = (
        (
            "grid_latitude",
            "grid_longitude",
            "Using rotated pole coordinates for grid spacing calculation.",
        ),
        (
            "latitude",
            "longitude",
            "Using latitude/longitude coordinates for grid spacing calculation.",
        ),
    )
    for lat_name, lon_name, message in candidates:
        try:
            lat_coord = cube.coord(lat_name)
            lon_coord = cube.coord(lon_name)
        except iris.exceptions.CoordinateNotFoundError:
            continue
        lat_mid = np.mean(lat_coord.points)
        # Zonal spacing shrinks with the cosine of latitude.
        dx = (
            np.abs(np.diff(lon_coord.points).mean())
            * np.pi
            / 180
            * R_earth
            * np.cos(lat_mid * np.pi / 180)
        )
        dy = np.abs(np.diff(lat_coord.points).mean()) * np.pi / 180 * R_earth
        logging.debug(message)
        return dx, dy

    raise ValueError(
        "Could not find appropriate spatial coordinates. "
        "Expected one of: 'projection_x_coordinate'/'projection_y_coordinate', "
        "'grid_latitude'/'grid_longitude', or 'latitude'/'longitude'."
    ) from None  # intentionally replacing CoordinateNotFoundError with a more informative ValueError


def _power_spectrum(cube: iris.cube.Cube) -> iris.cube.Cube:
    """Calculate power spectrum for a single cube for 1 vertical level at 1 time.

    Parameters
    ----------
    cube: Cube
        Data to plot as power spectrum.
        The cubes should cover the same phenomenon i.e. all cubes contain temperature data.
        We do not support different data such as temperature and humidity in the same CubeList
        for plotting.

    Returns
    -------
    iris.cube.Cube
        The power spectrum of the data.
        To be plotted and aggregation performed after.

    Raises
    ------
    ValueError
        If the cube is not 2 or 3 dimensional, or has no recognised
        spatial coordinates.
    """
    # Extract time coordinate and convert to datetime.
    time_coord = cube.coord("time")
    time_points = time_coord.units.num2date(time_coord.points)

    # _DCT_ps expects a (time, y, x) array; add a length-1 leading axis
    # for 2 dimensional data.
    if cube.ndim == 2:
        cube_3d = cube.data[np.newaxis, :, :]
        logging.debug("Adding in new axis for a 2 dimensional cube.")
    elif cube.ndim == 3:
        cube_3d = cube.data
    else:
        raise ValueError(
            f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional."
        )

    # Calculate spectrum.
    ps_array = _DCT_ps(cube_3d)

    # Make wavenumber comparable between models with different domain sizes
    # and resolutions: find the grid spacing in metres.
    dx, dy = _grid_spacing(cube)

    # Mean of the x and y domain extents, in km.
    domain_size_km = ((dx * cube_3d.shape[2]) + (dy * cube_3d.shape[1])) / 2 / 1000

    # Convert wavenumber into physically meaningful wavenumber coordinate in
    # cycles per km rather than wavenumber per index k.
    ps_len = ps_array.shape[1]
    k_indices = np.arange(1, ps_len + 1)
    physical_wavenumbers = k_indices / domain_size_km  # cycles/km

    # Create a new DimCoord with physical wavenumber.
    physical_wavenumbers_coord = iris.coords.DimCoord(
        physical_wavenumbers, long_name="physical_wavenumber", units="km-1"
    )

    # Calculate wavelength and add as auxiliary coordinate.
    wavelengths = domain_size_km / k_indices  # km
    wavelength_coord = iris.coords.AuxCoord(
        wavelengths, long_name="wavelength", units="km"
    )

    # Ensure power spectrum output is 2D: (time, frequency).
    if ps_array.ndim == 1:
        ps_array = ps_array[np.newaxis, :]

    # Prepare time coordinate.
    numeric_time = time_coord.units.date2num(time_points)
    numeric_time = np.atleast_1d(numeric_time)

    # Make time coord length match the number of spectra.
    if len(numeric_time) != ps_array.shape[0]:
        numeric_time = np.repeat(numeric_time[0], ps_array.shape[0])

    new_time_coord = iris.coords.DimCoord(
        numeric_time,
        standard_name="time",
        units=time_coord.units,
    )

    # Create output cube with physical coordinates.
    ps_cube = iris.cube.Cube(
        ps_array,
        dim_coords_and_dims=[
            (new_time_coord, 0),
            (physical_wavenumbers_coord, 1),
        ],
        long_name="power_spectral_density",
    )

    # Add wavelength as auxiliary coordinate.
    # Realization coordinate is added in calculate_power_spectrum.
    ps_cube.add_aux_coord(wavelength_coord, data_dims=1)

    return ps_cube

317 

318 

def _DCT_ps(y_3d):
    """Calculate power spectra for regional domains.

    Parameters
    ----------
    y_3d: 3D array
        3 dimensional array to calculate spectrum for.
        (2D field data with 3rd dimension of time)

    Returns: ps_array
        Array of power spectra values calculated for input field (for each time)

    Method for regional domains:
    Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT)
    as described in Denis et al 2002 [Denis_etal_2002]_.

    References
    ----------
    .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002)
        "Spectral Decomposition of Two-Dimensional Atmospheric Fields on
        Limited-Area Domains Using the Discrete Cosine Transform (DCT)"
        Monthly Weather Review, Vol. 130, 1812-1828
        doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2
    """
    Nt, Ny, Nx = y_3d.shape

    # Max coefficient.
    Nmin = min(Nx - 1, Ny - 1)

    # Create alpha matrix (of wavenumbers).
    alpha_matrix = _create_alpha_matrix(Ny, Nx)

    # The binning of alpha ellipses into wavenumber bins depends only on the
    # grid shape, not on the data, so precompute each bin's indices and size
    # once instead of rebuilding the boolean mask for every time step.
    bins = []
    for k in range(1, Nmin + 1):
        # Bounds of the normalised wavenumber magnitude of bin k.
        alpha = k / Nmin
        alpha_p1 = (k + 1) / Nmin
        mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1))
        bins.append((mask_k, len(mask_k[0])))

    # Prepare output array.
    ps_array = np.zeros((Nt, Nmin))

    # Loop over time to get spectrum for each time.
    for t in range(Nt):
        y_2d = y_3d[t]

        # Apply 2D DCT to transform y_3d[t] from physical space to spectral
        # space. fkk is a 2D array of DCT coefficients, representing the
        # amplitudes of cosine basis functions at different spatial
        # frequencies. The "ortho" normalisation allows comparison between
        # models.
        fkk = fft.dctn(y_2d, norm="ortho")

        # Variance (energy) of the spectral coefficient at each wavenumber
        # pair (k_x, k_y): the square of the DCT coefficient, normalised by
        # the total number of grid points (Nx * Ny).
        sigma_2 = fkk**2 / Nx / Ny

        # Average the power within each precomputed wavenumber bin.
        for k, (mask_k, n_coeffs) in enumerate(bins, start=1):
            if n_coeffs > 0:
                ps_array[t, k - 1] = np.sum(sigma_2[mask_k]) / n_coeffs
            # else: empty bin keeps its initial 0.0

    return ps_array

386 

387 

388def _create_alpha_matrix(Ny, Nx): 

389 """Construct an array of 2D wavenumbers from 2D wavenumber pair. 

390 

391 Parameters 

392 ---------- 

393 Ny, Nx: 

394 Dimensions of the 2D field for which the power spectra is calculated. Used to 

395 create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a 

396 single-scale parameter. 

397 

398 Returns: alpha_matrix 

399 normalisation of 2D wavenumber axes, transforming the spectral domain into 

400 an elliptic coordinate system. 

401 

402 """ 

403 # Create x_indices: each row is [1, 2, ..., Nx] 

404 x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1)) 

405 

406 # Create y_indices: each column is [1, 2, ..., Ny] 

407 y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx)) 

408 

409 # Compute alpha_matrix 

410 alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2) 

411 

412 return alpha_matrix