Coverage for src / CSET / operators / power_spectrum.py: 51%
67 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-02 17:30 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-02 17:30 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Operators for calculating power spectra."""
17import logging
19import iris
20import iris.coords
21import iris.cube
22import numpy as np
23import scipy.fft as fft
26def calculate_power_spectrum(cubes):
27 """Wrap power spectrum code.
29 This function is a wrapper that handles power spectrum
30 calculations for both single cubes and cube lists.
32 The input cube is split up into a cube
33 for each model and time and a power spectrum calculated for each before
34 combining into one cube ahead of plotting. This is done to retain the
35 model_name attribute correctly for different models.
37 In case of a CubeList (Mutiple models): It iterates through
38 each cube and calculates an individual power spectrum. In case of a
39 single cube (one model) it directly calculates the power spectrum.
41 Input: Cube OR CubeList
42 Output: CubeList of power spectra.
43 """
44 # Multi-model input (CubeList)
45 if isinstance(cubes, iris.cube.CubeList): 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true
46 out = iris.cube.CubeList()
47 for cube in cubes:
48 model = cube.attributes.get("model_name")
49 # Calculate power spectrum
50 ps = _power_spectrum(cube)
51 if model is not None:
52 ps.attributes["model_name"] = model
53 out.append(ps)
54 return out
56 # Single cube (one model)
57 model = cubes.attributes.get("model_name")
58 # Calculate power spectrum
59 ps = _power_spectrum(cubes)
60 if model is not None:
61 ps.attributes["model_name"] = model
62 return iris.cube.CubeList([ps])
65def _power_spectrum(
66 cube: iris.cube.Cube | iris.cube.CubeList,
67) -> iris.cube.Cube | iris.cube.CubeList:
68 """Calculate power spectrum for a single cube for 1 vertical level at 1 time.
70 Parameters
71 ----------
72 cube: Cube
73 Data to plot as power spectrum.
74 The cubes should cover the same phenomenon i.e. all cubes contain temperature data.
75 We do not support different data such as temperature and humidity in the same CubeList
76 for plotting.
78 Returns
79 -------
80 iris.cube.Cube
81 The power spectrum of the data.
82 To be plotted and aggregation performed after.
84 Raises
85 ------
86 ValueError
87 If the cube doesn't have the right dimensions.
88 TypeError
89 If the cube isn't a Cube.
90 """
91 # Extract time coordinate and convert to datetime
92 time_coord = cube.coord("time")
93 time_points = time_coord.units.num2date(time_coord.points)
95 if cube.ndim == 2: 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true
96 cube_3d = cube.data[np.newaxis, :, :]
97 logging.debug("Adding in new axis for a 2 dimensional cube.")
98 elif cube.ndim == 3: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true
99 cube_3d = cube.data
100 else:
101 raise ValueError("Cube dimensions unsuitable for power spectra code")
102 raise ValueError(
103 f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional."
104 )
106 # Calculate spectrum
107 ps_array = _DCT_ps(cube_3d)
109 # Ensure power spectrum output is 2D: (time, frequency)
110 if ps_array.ndim == 1:
111 ps_array = ps_array[np.newaxis, :]
113 ps_cube = iris.cube.Cube(
114 ps_array,
115 long_name="power_spectra",
116 )
118 # Create a frequency/wavelength array for new coordinate
119 ps_len = ps_cube.data.shape[1]
120 freqs = np.arange(1, ps_len + 1)
122 # Create a new DimCoord with frequency
123 freq_coord = iris.coords.DimCoord(freqs, long_name="frequency", units="1")
125 numeric_time = time_coord.units.date2num(time_points)
126 numeric_time = np.atleast_1d(numeric_time)
128 # Make time coord length match the number of spectra
129 if len(numeric_time) != ps_array.shape[0]:
130 numeric_time = np.repeat(numeric_time[0], ps_array.shape[0])
132 new_time_coord = iris.coords.DimCoord(
133 numeric_time,
134 standard_name="time",
135 units=time_coord.units,
136 )
138 ps_cube = iris.cube.Cube(
139 ps_array,
140 dim_coords_and_dims=[
141 (new_time_coord, 0),
142 (freq_coord, 1),
143 ],
144 long_name="power_spectra",
145 )
147 # Ensure cube has a realisation coordinate by creating and adding to cube
148 realization_coord = iris.coords.AuxCoord(0, standard_name="realization", units="1")
149 ps_cube.add_aux_coord(realization_coord)
151 return ps_cube
154def _DCT_ps(y_3d):
155 """Calculate power spectra for regional domains.
157 Parameters
158 ----------
159 y_3d: 3D array
160 3 dimensional array to calculate spectrum for.
161 (2D field data with 3rd dimension of time)
163 Returns: ps_array
164 Array of power spectra values calculated for input field (for each time)
166 Method for regional domains:
167 Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT)
168 as described in Denis et al 2002 [Denis_etal_2002]_.
170 References
171 ----------
172 .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002)
173 "Spectral Decomposition of Two-Dimensional Atmospheric Fields on
174 Limited-Area Domains Using the Discrete Cosine Transform (DCT)"
175 Monthly Weather Review, Vol. 130, 1812-1828
176 doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2
177 """
178 Nt, Ny, Nx = y_3d.shape
180 # Max coefficient
181 Nmin = min(Nx - 1, Ny - 1)
183 # Create alpha matrix (of wavenumbers)
184 alpha_matrix = _create_alpha_matrix(Ny, Nx)
186 # Prepare output array
187 ps_array = np.zeros((Nt, Nmin))
189 # Loop over time to get spectrum for each time.
190 for t in range(Nt):
191 y_2d = y_3d[t]
193 # Apply 2D DCT to transform y_3d[t] from physical space to spectral space.
194 # fkk is a 2D array of DCT coefficients, representing the amplitudes of
195 # cosine basis functions at different spatial frequencies.
197 # normalise spectrum to allow comparison between models.
198 fkk = fft.dctn(y_2d, norm="ortho")
200 # Normalise fkk
201 fkk = fkk / np.sqrt(Ny * Nx)
203 # calculate variance of spectral coefficient
204 sigma_2 = fkk**2 / Nx / Ny
206 # Group ellipses of alphas into the same wavenumber k/Nmin
207 for k in range(1, Nmin + 1):
208 alpha = k / Nmin
209 alpha_p1 = (k + 1) / Nmin
211 # Sum up elements matching k
212 mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1))
213 ps_array[t, k - 1] = np.sum(sigma_2[mask_k])
215 return ps_array
218def _create_alpha_matrix(Ny, Nx):
219 """Construct an array of 2D wavenumbers from 2D wavenumber pair.
221 Parameters
222 ----------
223 Ny, Nx:
224 Dimensions of the 2D field for which the power spectra is calculated. Used to
225 create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a
226 single-scale parameter.
228 Returns: alpha_matrix
229 normalisation of 2D wavenumber axes, transforming the spectral domain into
230 an elliptic coordinate system.
232 """
233 # Create x_indices: each row is [1, 2, ..., Nx]
234 x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1))
236 # Create y_indices: each column is [1, 2, ..., Ny]
237 y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx))
239 # Compute alpha_matrix
240 alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2)
242 return alpha_matrix