Coverage for src / CSET / operators / power_spectrum.py: 51%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-02 17:30 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators for calculating power spectra.""" 

16 

17import logging 

18 

19import iris 

20import iris.coords 

21import iris.cube 

22import numpy as np 

23import scipy.fft as fft 

24 

25 

26def calculate_power_spectrum(cubes): 

27 """Wrap power spectrum code. 

28 

29 This function is a wrapper that handles power spectrum  

30 calculations for both single cubes and cube lists. 

31  

32 The input cube is split up into a cube 

33 for each model and time and a power spectrum calculated for each before 

34 combining into one cube ahead of plotting. This is done to retain the 

35 model_name attribute correctly for different models. 

36 

37 In case of a CubeList (Mutiple models): It iterates through  

38 each cube and calculates an individual power spectrum. In case of a  

39 single cube (one model) it directly calculates the power spectrum. 

40  

41 Input: Cube OR CubeList 

42 Output: CubeList of power spectra. 

43 """ 

44 # Multi-model input (CubeList) 

45 if isinstance(cubes, iris.cube.CubeList): 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true

46 out = iris.cube.CubeList() 

47 for cube in cubes: 

48 model = cube.attributes.get("model_name") 

49 # Calculate power spectrum 

50 ps = _power_spectrum(cube) 

51 if model is not None: 

52 ps.attributes["model_name"] = model 

53 out.append(ps) 

54 return out 

55 

56 # Single cube (one model) 

57 model = cubes.attributes.get("model_name") 

58 # Calculate power spectrum 

59 ps = _power_spectrum(cubes) 

60 if model is not None: 

61 ps.attributes["model_name"] = model 

62 return iris.cube.CubeList([ps]) 

63 

64 

65def _power_spectrum( 

66 cube: iris.cube.Cube | iris.cube.CubeList, 

67) -> iris.cube.Cube | iris.cube.CubeList: 

68 """Calculate power spectrum for a single cube for 1 vertical level at 1 time. 

69 

70 Parameters 

71 ---------- 

72 cube: Cube 

73 Data to plot as power spectrum. 

74 The cubes should cover the same phenomenon i.e. all cubes contain temperature data. 

75 We do not support different data such as temperature and humidity in the same CubeList 

76 for plotting. 

77 

78 Returns 

79 ------- 

80 iris.cube.Cube 

81 The power spectrum of the data. 

82 To be plotted and aggregation performed after. 

83 

84 Raises 

85 ------ 

86 ValueError 

87 If the cube doesn't have the right dimensions. 

88 TypeError 

89 If the cube isn't a Cube. 

90 """ 

91 # Extract time coordinate and convert to datetime 

92 time_coord = cube.coord("time") 

93 time_points = time_coord.units.num2date(time_coord.points) 

94 

95 if cube.ndim == 2: 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true

96 cube_3d = cube.data[np.newaxis, :, :] 

97 logging.debug("Adding in new axis for a 2 dimensional cube.") 

98 elif cube.ndim == 3: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true

99 cube_3d = cube.data 

100 else: 

101 raise ValueError("Cube dimensions unsuitable for power spectra code") 

102 raise ValueError( 

103 f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional." 

104 ) 

105 

106 # Calculate spectrum 

107 ps_array = _DCT_ps(cube_3d) 

108 

109 # Ensure power spectrum output is 2D: (time, frequency) 

110 if ps_array.ndim == 1: 

111 ps_array = ps_array[np.newaxis, :] 

112 

113 ps_cube = iris.cube.Cube( 

114 ps_array, 

115 long_name="power_spectra", 

116 ) 

117 

118 # Create a frequency/wavelength array for new coordinate 

119 ps_len = ps_cube.data.shape[1] 

120 freqs = np.arange(1, ps_len + 1) 

121 

122 # Create a new DimCoord with frequency 

123 freq_coord = iris.coords.DimCoord(freqs, long_name="frequency", units="1") 

124 

125 numeric_time = time_coord.units.date2num(time_points) 

126 numeric_time = np.atleast_1d(numeric_time) 

127 

128 # Make time coord length match the number of spectra 

129 if len(numeric_time) != ps_array.shape[0]: 

130 numeric_time = np.repeat(numeric_time[0], ps_array.shape[0]) 

131 

132 new_time_coord = iris.coords.DimCoord( 

133 numeric_time, 

134 standard_name="time", 

135 units=time_coord.units, 

136 ) 

137 

138 ps_cube = iris.cube.Cube( 

139 ps_array, 

140 dim_coords_and_dims=[ 

141 (new_time_coord, 0), 

142 (freq_coord, 1), 

143 ], 

144 long_name="power_spectra", 

145 ) 

146 

147 # Ensure cube has a realisation coordinate by creating and adding to cube 

148 realization_coord = iris.coords.AuxCoord(0, standard_name="realization", units="1") 

149 ps_cube.add_aux_coord(realization_coord) 

150 

151 return ps_cube 

152 

153 

154def _DCT_ps(y_3d): 

155 """Calculate power spectra for regional domains. 

156 

157 Parameters 

158 ---------- 

159 y_3d: 3D array 

160 3 dimensional array to calculate spectrum for. 

161 (2D field data with 3rd dimension of time) 

162 

163 Returns: ps_array 

164 Array of power spectra values calculated for input field (for each time) 

165 

166 Method for regional domains: 

167 Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT) 

168 as described in Denis et al 2002 [Denis_etal_2002]_. 

169 

170 References 

171 ---------- 

172 .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002) 

173 "Spectral Decomposition of Two-Dimensional Atmospheric Fields on 

174 Limited-Area Domains Using the Discrete Cosine Transform (DCT)" 

175 Monthly Weather Review, Vol. 130, 1812-1828 

176 doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2 

177 """ 

178 Nt, Ny, Nx = y_3d.shape 

179 

180 # Max coefficient 

181 Nmin = min(Nx - 1, Ny - 1) 

182 

183 # Create alpha matrix (of wavenumbers) 

184 alpha_matrix = _create_alpha_matrix(Ny, Nx) 

185 

186 # Prepare output array 

187 ps_array = np.zeros((Nt, Nmin)) 

188 

189 # Loop over time to get spectrum for each time. 

190 for t in range(Nt): 

191 y_2d = y_3d[t] 

192 

193 # Apply 2D DCT to transform y_3d[t] from physical space to spectral space. 

194 # fkk is a 2D array of DCT coefficients, representing the amplitudes of 

195 # cosine basis functions at different spatial frequencies. 

196 

197 # normalise spectrum to allow comparison between models. 

198 fkk = fft.dctn(y_2d, norm="ortho") 

199 

200 # Normalise fkk 

201 fkk = fkk / np.sqrt(Ny * Nx) 

202 

203 # calculate variance of spectral coefficient 

204 sigma_2 = fkk**2 / Nx / Ny 

205 

206 # Group ellipses of alphas into the same wavenumber k/Nmin 

207 for k in range(1, Nmin + 1): 

208 alpha = k / Nmin 

209 alpha_p1 = (k + 1) / Nmin 

210 

211 # Sum up elements matching k 

212 mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1)) 

213 ps_array[t, k - 1] = np.sum(sigma_2[mask_k]) 

214 

215 return ps_array 

216 

217 

218def _create_alpha_matrix(Ny, Nx): 

219 """Construct an array of 2D wavenumbers from 2D wavenumber pair. 

220 

221 Parameters 

222 ---------- 

223 Ny, Nx: 

224 Dimensions of the 2D field for which the power spectra is calculated. Used to 

225 create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a 

226 single-scale parameter. 

227 

228 Returns: alpha_matrix 

229 normalisation of 2D wavenumber axes, transforming the spectral domain into 

230 an elliptic coordinate system. 

231 

232 """ 

233 # Create x_indices: each row is [1, 2, ..., Nx] 

234 x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1)) 

235 

236 # Create y_indices: each column is [1, 2, ..., Ny] 

237 y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx)) 

238 

239 # Compute alpha_matrix 

240 alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2) 

241 

242 return alpha_matrix