Coverage for src/CSET/operators/power_spectrum.py: 95%
97 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-18 11:14 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-18 11:14 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for calculating power spectra."""
17import logging
19import iris
20import iris.coords
21import iris.cube
22import iris.exceptions
23import numpy as np
24import scipy.fft as fft
26from CSET._common import iter_maybe
29def calculate_power_spectrum(cubes: iris.cube.Cube | iris.cube.CubeList):
30 """Wrap power spectrum code.
32 This function is a wrapper that handles power spectrum
33 calculations for both single cubes and cube lists and includes ensembles.
35 The input cube is split up into a cube for each model,
36 time and realization and a power spectrum calculated for each before
37 combining into one cube ahead of plotting. This is done to retain the
38 model_name attribute correctly for different models.
40 In case of a CubeList (Multiple models and ensembles): It iterates through
41 each cube and calculates an individual power spectrum. In case of a
42 single cube (one model) it directly calculates the power spectrum.
44 Parameters
45 ----------
46 cubes: Cube | CubeList
47 Field over which to calculate a power spectrum.
49 Returns
50 -------
51 Cube | CubeList:
52 CubeList of power spectra.
53 """
54 out = iris.cube.CubeList()
55 for cube in iter_maybe(cubes):
56 model = cube.attributes.get("model_name")
58 # Check whether data has a realization coord.
59 if cube.coords("realization"):
60 members_and_realizations = [
61 (member, int(member.coord("realization").points[0]))
62 for member in cube.slices_over("realization")
63 ]
64 else:
65 members_and_realizations = [(cube, None)]
67 # Loop over each realization.
68 member_power_spectra = iris.cube.CubeList()
69 for member, realiz in members_and_realizations:
70 # Calculate power spectrum.
71 ps = _power_spectrum(member)
72 # Attach model name if available.
73 if model:
74 ps.attributes["model_name"] = model
75 # Add the correct realization from the parent cube.
76 if realiz is not None:
77 ps.add_aux_coord(
78 iris.coords.AuxCoord(realiz, long_name="realization", units="1")
79 )
80 # Promote to dimension coordinate.
81 ps = iris.util.new_axis(ps, "realization")
82 member_power_spectra.append(ps)
84 # Merge the individual realization cubes into a single cube, then
85 # squeeze off length 1 realization coordinates.
86 combined_cube = member_power_spectra.concatenate_cube()
87 combined_cube = iris.util.squeeze(combined_cube)
88 out.append(combined_cube)
90 # Directly return cube if we only have one.
91 if len(out) == 1:
92 return out[0]
93 else:
94 return out
97def _power_spectrum(cube: iris.cube.Cube) -> iris.cube.Cube:
98 """Calculate power spectrum for a single cube for 1 vertical level at 1 time.
100 Parameters
101 ----------
102 cube: Cube
103 Data to plot as power spectrum.
104 The cubes should cover the same phenomenon i.e. all cubes contain temperature data.
105 We do not support different data such as temperature and humidity in the same CubeList
106 for plotting.
108 Returns
109 -------
110 iris.cube.Cube
111 The power spectrum of the data.
112 To be plotted and aggregation performed after.
114 Raises
115 ------
116 ValueError
117 If the cube doesn't have the right dimensions.
118 TypeError
119 If the cube isn't a Cube.
120 """
121 # Extract time coordinate and convert to datetime
122 time_coord = cube.coord("time")
123 time_points = time_coord.units.num2date(time_coord.points)
125 if cube.ndim == 2:
126 cube_3d = cube.data[np.newaxis, :, :]
127 logging.debug("Adding in new axis for a 2 dimensional cube.")
128 elif cube.ndim == 3:
129 cube_3d = cube.data
130 else:
131 raise ValueError(
132 f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional."
133 )
135 # Calculate spectrum
136 ps_array = _DCT_ps(cube_3d)
138 # Make wavenumber comparable between models with different domain sizes and
139 # resolutions. Try to find appropriate spatial coordinates.
140 coord_pairs = (
141 ("projection_x_coordinate", "projection_y_coordinate"),
142 ("grid_latitude", "grid_longitude"),
143 ("latitude", "longitude"),
144 )
145 for x_coord_name, y_coord_name in coord_pairs:
146 try:
147 # Try projection coordinates first (most common for limited area models)
148 x_coord = cube.coord(x_coord_name)
149 y_coord = cube.coord(y_coord_name)
150 except iris.exceptions.CoordinateNotFoundError:
151 continue
152 logging.debug(
153 "Using %s and %s coordinates for grid spacing calculation.",
154 x_coord_name,
155 y_coord_name,
156 )
157 break # Break out of loop if we found usable coords.
158 else:
159 # Raise error if no usable coords found.
160 raise ValueError(
161 "Could not find appropriate spatial coordinates. "
162 "Expected one of: 'projection_x_coordinate'/'projection_y_coordinate', "
163 "'grid_latitude'/'grid_longitude', or 'latitude'/'longitude'."
164 )
166 # Calculate grid spacing.
167 dx = np.abs(np.diff(x_coord.points).mean())
168 dy = np.abs(np.diff(y_coord.points).mean())
169 if "latitude" in x_coord.name():
170 # Convert from degrees to meters. x is lat, y is lon.
171 R_earth = 6371000 # meters
172 lat_mid = np.mean(x_coord.points)
173 dx = dx * np.pi / 180 * R_earth * np.cos(lat_mid * np.pi / 180)
174 dy = dy * np.pi / 180 * R_earth
175 domain_size_km = ((dx * cube_3d.shape[2]) + (dy * cube_3d.shape[1])) / 2 / 1000
177 # Convert wavenumber into physically meaningful wavenumber coordinate in
178 # cycles per km rather than wavenumber per index k.
179 ps_len = ps_array.shape[1]
180 k_indices = np.arange(1, ps_len + 1)
181 physical_wavenumbers = k_indices / domain_size_km # cycles/km
183 # Create a new DimCoord with physical wavenumber
184 physical_wavenumbers_coord = iris.coords.DimCoord(
185 physical_wavenumbers, long_name="physical_wavenumber", units="km-1"
186 )
188 # Calculate wavelength and add as auxiliary coordinate
189 wavelengths = domain_size_km / k_indices # km
190 wavelength_coord = iris.coords.AuxCoord(
191 wavelengths, long_name="wavelength", units="km"
192 )
194 # Ensure power spectrum output is 2D: (time, frequency)
195 if ps_array.ndim == 1: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true
196 ps_array = ps_array[np.newaxis, :]
198 # Prepare time coordinate
199 numeric_time = time_coord.units.date2num(time_points)
200 numeric_time = np.atleast_1d(numeric_time)
202 # Make time coord length match the number of spectra
203 if len(numeric_time) != ps_array.shape[0]: 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true
204 numeric_time = np.repeat(numeric_time[0], ps_array.shape[0])
206 new_time_coord = iris.coords.DimCoord(
207 numeric_time,
208 standard_name="time",
209 units=time_coord.units,
210 )
212 # Create output cube with physical coordinates
213 ps_cube = iris.cube.Cube(
214 ps_array,
215 dim_coords_and_dims=[
216 (new_time_coord, 0),
217 (physical_wavenumbers_coord, 1),
218 ],
219 long_name="power_spectral_density",
220 )
222 # Add wavelength as auxiliary coordinate
223 # Realization coordinate is added in _calculate_power_spectrum
224 ps_cube.add_aux_coord(wavelength_coord, data_dims=1)
226 return ps_cube
229def _DCT_ps(y_3d):
230 """Calculate power spectra for regional domains.
232 Parameters
233 ----------
234 y_3d: 3D array
235 3 dimensional array to calculate spectrum for.
236 (2D field data with 3rd dimension of time)
238 Returns
239 -------
240 ps_array:
241 Array of power spectra values calculated for input field (for each time)
243 Method for regional domains:
244 Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT)
245 as described in Denis et al 2002 [Denis_etal_2002]_.
247 References
248 ----------
249 .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002)
250 "Spectral Decomposition of Two-Dimensional Atmospheric Fields on
251 Limited-Area Domains Using the Discrete Cosine Transform (DCT)"
252 Monthly Weather Review, Vol. 130, 1812-1828
253 doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2
254 """
255 Nt, Ny, Nx = y_3d.shape
257 # Max coefficient
258 Nmin = min(Nx - 1, Ny - 1)
260 # Create alpha matrix (of wavenumbers)
261 alpha_matrix = _create_alpha_matrix(Ny, Nx)
263 # Prepare output array
264 ps_array = np.zeros((Nt, Nmin))
266 # Loop over time to get spectrum for each time.
267 for t in range(Nt):
268 y_2d = y_3d[t]
270 # Apply 2D DCT to transform y_3d[t] from physical space to spectral space.
271 # fkk is a 2D array of DCT coefficients, representing the amplitudes of
272 # cosine basis functions at different spatial frequencies.
274 # DCT transform and normalise spectrum to allow comparison between models.
275 fkk = fft.dctn(y_2d, norm="ortho")
277 # calculate variance (energy) of spectral coefficient at each wavenumber pair (k_x, k_y)
278 # as the square of the DCT coefficient, normalised by the total number of grid points (Nx * Ny).
279 sigma_2 = fkk**2 / Nx / Ny
281 # Group ellipses of alphas into the same wavenumber k/Nmin
282 for k in range(1, Nmin + 1):
283 # Define the bounds of the current normalised wavenumber magnitude of bin k
284 alpha = k / Nmin
285 alpha_p1 = (k + 1) / Nmin
287 # Sum up elements matching in bin k and divide by bin size
288 mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1))
289 n_coeffs = len(mask_k[0]) # number of coefficients in bin k
290 if n_coeffs > 0: 290 ↛ 295line 290 didn't jump to line 295 because the condition on line 290 was always true
291 ps_array[t, k - 1] = (
292 np.sum(sigma_2[mask_k]) / n_coeffs
293 ) # average power in bin k
294 else:
295 ps_array[t, k - 1] = 0.0
297 return ps_array
300def _create_alpha_matrix(Ny, Nx):
301 """Construct an array of 2D wavenumbers from 2D wavenumber pair.
303 Parameters
304 ----------
305 Ny, Nx:
306 Dimensions of the 2D field for which the power spectra is calculated. Used to
307 create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a
308 single-scale parameter.
310 Returns
311 -------
312 alpha_matrix:
313 normalisation of 2D wavenumber axes, transforming the spectral domain into
314 an elliptic coordinate system.
316 """
317 # Create x_indices: each row is [1, 2, ..., Nx]
318 x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1))
320 # Create y_indices: each column is [1, 2, ..., Ny]
321 y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx))
323 # Compute alpha_matrix
324 alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2)
326 return alpha_matrix