Coverage for src / CSET / operators / power_spectrum.py: 84%
129 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 14:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 14:49 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for calculating power spectra."""
17import logging
19import iris
20import iris.coords
21import iris.cube
22import numpy as np
23import scipy.fft as fft
24from iris.util import new_axis
27def calculate_power_spectrum(cubes):
28 """Wrap power spectrum code.
30 This function is a wrapper that handles power spectrum
31 calculations for both single cubes and cube lists and includes ensembles.
33 The input cube is split up into a cube for each model,
34 time and realization and a power spectrum calculated for each before
35 combining into one cube ahead of plotting. This is done to retain the
36 model_name attribute correctly for different models.
38 In case of a CubeList (Multiple models and ensembles): It iterates through
39 each cube and calculates an individual power spectrum. In case of a
40 single cube (one model) it directly calculates the power spectrum.
42 Input: Cube OR CubeList
43 Output: CubeList of power spectra.
44 """
45 # ------------------------------------------------------------
46 # Cube list
47 # ------------------------------------------------------------
49 if isinstance(cubes, iris.cube.CubeList):
50 out = iris.cube.CubeList()
52 for cube in cubes:
53 model = cube.attributes.get("model_name")
55 # Ensembles:
56 # Check whether data has more than 1 realization coord
58 real_coord = cube.coords("realization")
59 is_ensemble = real_coord and len(real_coord[0].points) > 1
61 if is_ensemble:
62 # Ensemble: Loop over each realization
63 for subcube in cube.slices_over("realization"):
64 realiz = int(subcube.coord("realization").points[0])
65 # calculate power spectrum of subcube
66 ps = _power_spectrum(subcube)
68 # Attach model name if available
69 if model:
70 ps.attributes["model_name"] = model
72 # ADD the correct realization from the parent cube
73 ps.add_aux_coord(
74 iris.coords.AuxCoord(realiz, long_name="realization", units="1")
75 )
77 # PROMOTE realization coordinate to dimension
78 ps = new_axis(ps, "realization")
80 out.append(ps)
82 else:
83 # Not ensemble (or only 1 ensemble member)
84 ps = _power_spectrum(cube)
85 if model: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true
86 ps.attributes["model_name"] = model
88 # ADD the correct realization from the parent cube
89 # Promotion to dimension not required.
90 ps.add_aux_coord(
91 iris.coords.AuxCoord(0, long_name="realization", units="1")
92 )
94 out.append(ps)
96 if is_ensemble:
97 # Merge the individual realization cubes into a single cube
98 # for ensemble data
99 merged = out.concatenate_cube()
101 return merged
103 else:
104 return out
106 # ------------------------------------------------------------
107 # Single cube with realization coord
108 cube = cubes
109 out = iris.cube.CubeList()
110 model = cube.attributes.get("model_name")
112 # Ensembles:
113 # Check whether data has more than 1 realization coord
115 real_coord = cube.coords("realization")
116 is_ensemble = real_coord and len(real_coord[0].points) > 1
118 if is_ensemble: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was never true
119 # Ensemble: Loop over each realization
120 for subcube in cube.slices_over("realization"):
121 realiz = int(subcube.coord("realization").points[0])
122 ps = _power_spectrum(subcube)
124 # Attach model name if available
125 if model:
126 ps.attributes["model_name"] = model
128 # ADD the correct realization from the parent cube
129 ps.add_aux_coord(
130 iris.coords.AuxCoord(realiz, long_name="realization", units="1")
131 )
133 # PROMOTE to dimension
134 ps = new_axis(ps, "realization")
136 out.append(ps)
138 # Merge the individual realization cubes into a single cube
139 # for ensemble data
140 merged = out.concatenate_cube()
142 return merged
144 # ------------------------------------------------------------
145 # Single cube, no realizations
146 # ------------------------------------------------------------
147 ps = _power_spectrum(cube)
148 if model: 148 ↛ 149line 148 didn't jump to line 149 because the condition on line 148 was never true
149 ps.attributes["model_name"] = model
151 # ADD the correct realization from the parent cube
152 # Promotion to dimension not required
153 ps.add_aux_coord(iris.coords.AuxCoord(0, long_name="realization", units="1"))
155 return iris.cube.CubeList([ps])
158def _power_spectrum(
159 cube: iris.cube.Cube | iris.cube.CubeList,
160) -> iris.cube.Cube | iris.cube.CubeList:
161 """Calculate power spectrum for a single cube for 1 vertical level at 1 time.
163 Parameters
164 ----------
165 cube: Cube
166 Data to plot as power spectrum.
167 The cubes should cover the same phenomenon i.e. all cubes contain temperature data.
168 We do not support different data such as temperature and humidity in the same CubeList
169 for plotting.
171 Returns
172 -------
173 iris.cube.Cube
174 The power spectrum of the data.
175 To be plotted and aggregation performed after.
177 Raises
178 ------
179 ValueError
180 If the cube doesn't have the right dimensions.
181 TypeError
182 If the cube isn't a Cube.
183 """
184 # Extract time coordinate and convert to datetime
185 time_coord = cube.coord("time")
186 time_points = time_coord.units.num2date(time_coord.points)
188 if cube.ndim == 2:
189 cube_3d = cube.data[np.newaxis, :, :]
190 logging.debug("Adding in new axis for a 2 dimensional cube.")
191 elif cube.ndim == 3:
192 cube_3d = cube.data
193 else:
194 raise ValueError(
195 f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional."
196 )
198 # Calculate spectrum
199 ps_array = _DCT_ps(cube_3d)
201 # Make wavenumber comparable between models with different domain sizes and resolutions.
202 # Try to find appropriate spatial coordinates
203 try:
204 # Try projection coordinates first (most common for limited area models)
205 x_coord = cube.coord("projection_x_coordinate")
206 y_coord = cube.coord("projection_y_coordinate")
208 # Grid spacing already in meters for projection coordinates
209 dx = np.abs(np.diff(x_coord.points).mean())
210 dy = np.abs(np.diff(y_coord.points).mean())
212 logging.debug("Using projection coordinates for grid spacing calculation.")
214 except iris.exceptions.CoordinateNotFoundError:
215 try:
216 # Try rotated pole coordinates
217 lat_coord = cube.coord("grid_latitude")
218 lon_coord = cube.coord("grid_longitude")
220 R_earth = 6371000 # meters
221 lat_mid = np.mean(lat_coord.points)
223 dx = (
224 np.abs(np.diff(lon_coord.points).mean())
225 * np.pi
226 / 180
227 * R_earth
228 * np.cos(lat_mid * np.pi / 180)
229 )
230 dy = np.abs(np.diff(lat_coord.points).mean()) * np.pi / 180 * R_earth
232 logging.debug(
233 "Using rotated pole coordinates for grid spacing calculation."
234 )
236 except iris.exceptions.CoordinateNotFoundError:
237 try:
238 # Fall back to regular lat/lon coordinates
239 lat_coord = cube.coord("latitude")
240 lon_coord = cube.coord("longitude")
242 R_earth = 6371000 # meters
243 lat_mid = np.mean(lat_coord.points)
245 dx = (
246 np.abs(np.diff(lon_coord.points).mean())
247 * np.pi
248 / 180
249 * R_earth
250 * np.cos(lat_mid * np.pi / 180)
251 )
252 dy = np.abs(np.diff(lat_coord.points).mean()) * np.pi / 180 * R_earth
254 logging.debug(
255 "Using latitude/longitude coordinates for grid spacing calculation."
256 )
258 except iris.exceptions.CoordinateNotFoundError:
259 raise ValueError(
260 "Could not find appropriate spatial coordinates. "
261 "Expected one of: 'projection_x_coordinate'/'projection_y_coordinate', "
262 "'grid_latitude'/'grid_longitude', or 'latitude'/'longitude'."
263 ) from None # intentionally replacing CoordinateNotFoundError with a more informative ValueError
265 domain_size_km = ((dx * cube_3d.shape[2]) + (dy * cube_3d.shape[1])) / 2 / 1000
267 # Convert wavenumber into physically meaningful wavenumber coordinate in
268 # cycles per km rather than wavenumber per index k.
269 ps_len = ps_array.shape[1]
270 k_indices = np.arange(1, ps_len + 1)
271 physical_wavenumbers = k_indices / domain_size_km # cycles/km
273 # Create a new DimCoord with physical wavenumber
274 physical_wavenumbers_coord = iris.coords.DimCoord(
275 physical_wavenumbers, long_name="physical_wavenumber", units="km-1"
276 )
278 # Calculate wavelength and add as auxiliary coordinate
279 wavelengths = domain_size_km / k_indices # km
280 wavelength_coord = iris.coords.AuxCoord(
281 wavelengths, long_name="wavelength", units="km"
282 )
284 # Ensure power spectrum output is 2D: (time, frequency)
285 if ps_array.ndim == 1: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 ps_array = ps_array[np.newaxis, :]
288 # Prepare time coordinate
289 numeric_time = time_coord.units.date2num(time_points)
290 numeric_time = np.atleast_1d(numeric_time)
292 # Make time coord length match the number of spectra
293 if len(numeric_time) != ps_array.shape[0]: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true
294 numeric_time = np.repeat(numeric_time[0], ps_array.shape[0])
296 new_time_coord = iris.coords.DimCoord(
297 numeric_time,
298 standard_name="time",
299 units=time_coord.units,
300 )
302 # Create output cube with physical coordinates
303 ps_cube = iris.cube.Cube(
304 ps_array,
305 dim_coords_and_dims=[
306 (new_time_coord, 0),
307 (physical_wavenumbers_coord, 1),
308 ],
309 long_name="power_spectral_density",
310 )
312 # Add wavelength as auxiliary coordinate
313 # Realization coordinate is added in _calculate_power_spectrum
314 ps_cube.add_aux_coord(wavelength_coord, data_dims=1)
316 return ps_cube
319def _DCT_ps(y_3d):
320 """Calculate power spectra for regional domains.
322 Parameters
323 ----------
324 y_3d: 3D array
325 3 dimensional array to calculate spectrum for.
326 (2D field data with 3rd dimension of time)
328 Returns: ps_array
329 Array of power spectra values calculated for input field (for each time)
331 Method for regional domains:
332 Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT)
333 as described in Denis et al 2002 [Denis_etal_2002]_.
335 References
336 ----------
337 .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002)
338 "Spectral Decomposition of Two-Dimensional Atmospheric Fields on
339 Limited-Area Domains Using the Discrete Cosine Transform (DCT)"
340 Monthly Weather Review, Vol. 130, 1812-1828
341 doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2
342 """
343 Nt, Ny, Nx = y_3d.shape
345 # Max coefficient
346 Nmin = min(Nx - 1, Ny - 1)
348 # Create alpha matrix (of wavenumbers)
349 alpha_matrix = _create_alpha_matrix(Ny, Nx)
351 # Prepare output array
352 ps_array = np.zeros((Nt, Nmin))
354 # Loop over time to get spectrum for each time.
355 for t in range(Nt):
356 y_2d = y_3d[t]
358 # Apply 2D DCT to transform y_3d[t] from physical space to spectral space.
359 # fkk is a 2D array of DCT coefficients, representing the amplitudes of
360 # cosine basis functions at different spatial frequencies.
362 # DCT transform and normalise spectrum to allow comparison between models.
363 fkk = fft.dctn(y_2d, norm="ortho")
365 # calculate variance (energy) of spectral coefficient at each wavenumber pair (k_x, k_y)
366 # as the square of the DCT coefficient, normalised by the total number of grid points (Nx * Ny).
367 sigma_2 = fkk**2 / Nx / Ny
369 # Group ellipses of alphas into the same wavenumber k/Nmin
370 for k in range(1, Nmin + 1):
371 # Define the bounds of the current normalised wavenumber magnitude of bin k
372 alpha = k / Nmin
373 alpha_p1 = (k + 1) / Nmin
375 # Sum up elements matching in bin k and divide by bin size
376 mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1))
377 n_coeffs = len(mask_k[0]) # number of coefficients in bin k
378 if n_coeffs > 0: 378 ↛ 383line 378 didn't jump to line 383 because the condition on line 378 was always true
379 ps_array[t, k - 1] = (
380 np.sum(sigma_2[mask_k]) / n_coeffs
381 ) # average power in bin k
382 else:
383 ps_array[t, k - 1] = 0.0
385 return ps_array
388def _create_alpha_matrix(Ny, Nx):
389 """Construct an array of 2D wavenumbers from 2D wavenumber pair.
391 Parameters
392 ----------
393 Ny, Nx:
394 Dimensions of the 2D field for which the power spectra is calculated. Used to
395 create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a
396 single-scale parameter.
398 Returns: alpha_matrix
399 normalisation of 2D wavenumber axes, transforming the spectral domain into
400 an elliptic coordinate system.
402 """
403 # Create x_indices: each row is [1, 2, ..., Nx]
404 x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1))
406 # Create y_indices: each column is [1, 2, ..., Ny]
407 y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx))
409 # Compute alpha_matrix
410 alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2)
412 return alpha_matrix