Coverage for src/CSET/operators/power_spectrum.py: 95%

97 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-18 11:14 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for calculating power spectra.""" 

16 

17import logging 

18 

19import iris 

20import iris.coords 

21import iris.cube 

22import iris.exceptions 

23import numpy as np 

24import scipy.fft as fft 

25 

26from CSET._common import iter_maybe 

27 

28 

29def calculate_power_spectrum(cubes: iris.cube.Cube | iris.cube.CubeList): 

30 """Wrap power spectrum code. 

31 

32 This function is a wrapper that handles power spectrum 

33 calculations for both single cubes and cube lists and includes ensembles. 

34 

35 The input cube is split up into a cube for each model, 

36 time and realization and a power spectrum calculated for each before 

37 combining into one cube ahead of plotting. This is done to retain the 

38 model_name attribute correctly for different models. 

39 

40 In case of a CubeList (Multiple models and ensembles): It iterates through 

41 each cube and calculates an individual power spectrum. In case of a 

42 single cube (one model) it directly calculates the power spectrum. 

43 

44 Parameters 

45 ---------- 

46 cubes: Cube | CubeList 

47 Field over which to calculate a power spectrum. 

48 

49 Returns 

50 ------- 

51 Cube | CubeList: 

52 CubeList of power spectra. 

53 """ 

54 out = iris.cube.CubeList() 

55 for cube in iter_maybe(cubes): 

56 model = cube.attributes.get("model_name") 

57 

58 # Check whether data has a realization coord. 

59 if cube.coords("realization"): 

60 members_and_realizations = [ 

61 (member, int(member.coord("realization").points[0])) 

62 for member in cube.slices_over("realization") 

63 ] 

64 else: 

65 members_and_realizations = [(cube, None)] 

66 

67 # Loop over each realization. 

68 member_power_spectra = iris.cube.CubeList() 

69 for member, realiz in members_and_realizations: 

70 # Calculate power spectrum. 

71 ps = _power_spectrum(member) 

72 # Attach model name if available. 

73 if model: 

74 ps.attributes["model_name"] = model 

75 # Add the correct realization from the parent cube. 

76 if realiz is not None: 

77 ps.add_aux_coord( 

78 iris.coords.AuxCoord(realiz, long_name="realization", units="1") 

79 ) 

80 # Promote to dimension coordinate. 

81 ps = iris.util.new_axis(ps, "realization") 

82 member_power_spectra.append(ps) 

83 

84 # Merge the individual realization cubes into a single cube, then 

85 # squeeze off length 1 realization coordinates. 

86 combined_cube = member_power_spectra.concatenate_cube() 

87 combined_cube = iris.util.squeeze(combined_cube) 

88 out.append(combined_cube) 

89 

90 # Directly return cube if we only have one. 

91 if len(out) == 1: 

92 return out[0] 

93 else: 

94 return out 

95 

96 

97def _power_spectrum(cube: iris.cube.Cube) -> iris.cube.Cube: 

98 """Calculate power spectrum for a single cube for 1 vertical level at 1 time. 

99 

100 Parameters 

101 ---------- 

102 cube: Cube 

103 Data to plot as power spectrum. 

104 The cubes should cover the same phenomenon i.e. all cubes contain temperature data. 

105 We do not support different data such as temperature and humidity in the same CubeList 

106 for plotting. 

107 

108 Returns 

109 ------- 

110 iris.cube.Cube 

111 The power spectrum of the data. 

112 To be plotted and aggregation performed after. 

113 

114 Raises 

115 ------ 

116 ValueError 

117 If the cube doesn't have the right dimensions. 

118 TypeError 

119 If the cube isn't a Cube. 

120 """ 

121 # Extract time coordinate and convert to datetime 

122 time_coord = cube.coord("time") 

123 time_points = time_coord.units.num2date(time_coord.points) 

124 

125 if cube.ndim == 2: 

126 cube_3d = cube.data[np.newaxis, :, :] 

127 logging.debug("Adding in new axis for a 2 dimensional cube.") 

128 elif cube.ndim == 3: 

129 cube_3d = cube.data 

130 else: 

131 raise ValueError( 

132 f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional." 

133 ) 

134 

135 # Calculate spectrum 

136 ps_array = _DCT_ps(cube_3d) 

137 

138 # Make wavenumber comparable between models with different domain sizes and 

139 # resolutions. Try to find appropriate spatial coordinates. 

140 coord_pairs = ( 

141 ("projection_x_coordinate", "projection_y_coordinate"), 

142 ("grid_latitude", "grid_longitude"), 

143 ("latitude", "longitude"), 

144 ) 

145 for x_coord_name, y_coord_name in coord_pairs: 

146 try: 

147 # Try projection coordinates first (most common for limited area models) 

148 x_coord = cube.coord(x_coord_name) 

149 y_coord = cube.coord(y_coord_name) 

150 except iris.exceptions.CoordinateNotFoundError: 

151 continue 

152 logging.debug( 

153 "Using %s and %s coordinates for grid spacing calculation.", 

154 x_coord_name, 

155 y_coord_name, 

156 ) 

157 break # Break out of loop if we found usable coords. 

158 else: 

159 # Raise error if no usable coords found. 

160 raise ValueError( 

161 "Could not find appropriate spatial coordinates. " 

162 "Expected one of: 'projection_x_coordinate'/'projection_y_coordinate', " 

163 "'grid_latitude'/'grid_longitude', or 'latitude'/'longitude'." 

164 ) 

165 

166 # Calculate grid spacing. 

167 dx = np.abs(np.diff(x_coord.points).mean()) 

168 dy = np.abs(np.diff(y_coord.points).mean()) 

169 if "latitude" in x_coord.name(): 

170 # Convert from degrees to meters. x is lat, y is lon. 

171 R_earth = 6371000 # meters 

172 lat_mid = np.mean(x_coord.points) 

173 dx = dx * np.pi / 180 * R_earth * np.cos(lat_mid * np.pi / 180) 

174 dy = dy * np.pi / 180 * R_earth 

175 domain_size_km = ((dx * cube_3d.shape[2]) + (dy * cube_3d.shape[1])) / 2 / 1000 

176 

177 # Convert wavenumber into physically meaningful wavenumber coordinate in 

178 # cycles per km rather than wavenumber per index k. 

179 ps_len = ps_array.shape[1] 

180 k_indices = np.arange(1, ps_len + 1) 

181 physical_wavenumbers = k_indices / domain_size_km # cycles/km 

182 

183 # Create a new DimCoord with physical wavenumber 

184 physical_wavenumbers_coord = iris.coords.DimCoord( 

185 physical_wavenumbers, long_name="physical_wavenumber", units="km-1" 

186 ) 

187 

188 # Calculate wavelength and add as auxiliary coordinate 

189 wavelengths = domain_size_km / k_indices # km 

190 wavelength_coord = iris.coords.AuxCoord( 

191 wavelengths, long_name="wavelength", units="km" 

192 ) 

193 

194 # Ensure power spectrum output is 2D: (time, frequency) 

195 if ps_array.ndim == 1: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 ps_array = ps_array[np.newaxis, :] 

197 

198 # Prepare time coordinate 

199 numeric_time = time_coord.units.date2num(time_points) 

200 numeric_time = np.atleast_1d(numeric_time) 

201 

202 # Make time coord length match the number of spectra 

203 if len(numeric_time) != ps_array.shape[0]: 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true

204 numeric_time = np.repeat(numeric_time[0], ps_array.shape[0]) 

205 

206 new_time_coord = iris.coords.DimCoord( 

207 numeric_time, 

208 standard_name="time", 

209 units=time_coord.units, 

210 ) 

211 

212 # Create output cube with physical coordinates 

213 ps_cube = iris.cube.Cube( 

214 ps_array, 

215 dim_coords_and_dims=[ 

216 (new_time_coord, 0), 

217 (physical_wavenumbers_coord, 1), 

218 ], 

219 long_name="power_spectral_density", 

220 ) 

221 

222 # Add wavelength as auxiliary coordinate 

223 # Realization coordinate is added in _calculate_power_spectrum 

224 ps_cube.add_aux_coord(wavelength_coord, data_dims=1) 

225 

226 return ps_cube 

227 

228 

229def _DCT_ps(y_3d): 

230 """Calculate power spectra for regional domains. 

231 

232 Parameters 

233 ---------- 

234 y_3d: 3D array 

235 3 dimensional array to calculate spectrum for. 

236 (2D field data with 3rd dimension of time) 

237 

238 Returns 

239 ------- 

240 ps_array: 

241 Array of power spectra values calculated for input field (for each time) 

242 

243 Method for regional domains: 

244 Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT) 

245 as described in Denis et al 2002 [Denis_etal_2002]_. 

246 

247 References 

248 ---------- 

249 .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002) 

250 "Spectral Decomposition of Two-Dimensional Atmospheric Fields on 

251 Limited-Area Domains Using the Discrete Cosine Transform (DCT)" 

252 Monthly Weather Review, Vol. 130, 1812-1828 

253 doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2 

254 """ 

255 Nt, Ny, Nx = y_3d.shape 

256 

257 # Max coefficient 

258 Nmin = min(Nx - 1, Ny - 1) 

259 

260 # Create alpha matrix (of wavenumbers) 

261 alpha_matrix = _create_alpha_matrix(Ny, Nx) 

262 

263 # Prepare output array 

264 ps_array = np.zeros((Nt, Nmin)) 

265 

266 # Loop over time to get spectrum for each time. 

267 for t in range(Nt): 

268 y_2d = y_3d[t] 

269 

270 # Apply 2D DCT to transform y_3d[t] from physical space to spectral space. 

271 # fkk is a 2D array of DCT coefficients, representing the amplitudes of 

272 # cosine basis functions at different spatial frequencies. 

273 

274 # DCT transform and normalise spectrum to allow comparison between models. 

275 fkk = fft.dctn(y_2d, norm="ortho") 

276 

277 # calculate variance (energy) of spectral coefficient at each wavenumber pair (k_x, k_y) 

278 # as the square of the DCT coefficient, normalised by the total number of grid points (Nx * Ny). 

279 sigma_2 = fkk**2 / Nx / Ny 

280 

281 # Group ellipses of alphas into the same wavenumber k/Nmin 

282 for k in range(1, Nmin + 1): 

283 # Define the bounds of the current normalised wavenumber magnitude of bin k 

284 alpha = k / Nmin 

285 alpha_p1 = (k + 1) / Nmin 

286 

287 # Sum up elements matching in bin k and divide by bin size 

288 mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1)) 

289 n_coeffs = len(mask_k[0]) # number of coefficients in bin k 

290 if n_coeffs > 0: 290 ↛ 295line 290 didn't jump to line 295 because the condition on line 290 was always true

291 ps_array[t, k - 1] = ( 

292 np.sum(sigma_2[mask_k]) / n_coeffs 

293 ) # average power in bin k 

294 else: 

295 ps_array[t, k - 1] = 0.0 

296 

297 return ps_array 

298 

299 

300def _create_alpha_matrix(Ny, Nx): 

301 """Construct an array of 2D wavenumbers from 2D wavenumber pair. 

302 

303 Parameters 

304 ---------- 

305 Ny, Nx: 

306 Dimensions of the 2D field for which the power spectra is calculated. Used to 

307 create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a 

308 single-scale parameter. 

309 

310 Returns 

311 ------- 

312 alpha_matrix: 

313 normalisation of 2D wavenumber axes, transforming the spectral domain into 

314 an elliptic coordinate system. 

315 

316 """ 

317 # Create x_indices: each row is [1, 2, ..., Nx] 

318 x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1)) 

319 

320 # Create y_indices: each column is [1, 2, ..., Ny] 

321 y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx)) 

322 

323 # Compute alpha_matrix 

324 alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2) 

325 

326 return alpha_matrix