Coverage for src / CSET / operators / aggregate.py: 98%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 15:48 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to aggregate across either 1 or 2 dimensions.""" 

16 

17import logging 

18 

19import iris 

20import iris.analysis 

21import iris.coord_categorisation 

22import iris.cube 

23import iris.exceptions 

24import iris.util 

25import isodate 

26import numpy as np 

27 

28from CSET._common import iter_maybe 

29from CSET.operators._utils import is_time_aggregatable 

30 

31 

def _add_nref(cube: iris.cube.Cube):
    """Retain information on number of forecast_reference_time inputs.

    This preserves information on number of aggregated cases that can
    otherwise be lost on subsequent calls to collapse functions.
    """
    # Count the forecast reference times and stash the count as an attribute
    # on the time coordinate so later collapses don't discard it.
    n_reference_times = cube.coord("forecast_reference_time").points.size
    cube.coord("time").attributes["number_reference_times"] = n_reference_times
    return cube

41 

42 

def time_aggregate(
    cube: iris.cube.Cube,
    method: str,
    interval_iso: str,
    **kwargs,
) -> iris.cube.Cube:
    """Aggregate cube by its time coordinate.

    Aggregates similar (stash) fields in a cube over its time coordinate
    using the method supplied. The aggregated cube will keep the time
    coordinate with the aggregated end time points.

    Examples are: 1. Generating hourly or 6-hourly precipitation accumulations
    given an interval for the new time coordinate.

    We use the isodate class to convert ISO 8601 durations into time intervals
    for creating a new time coordinate for aggregation.

    We use the lambda function to pass coord and interval into the callable
    category function in add_categorised to allow users to define their own
    sub-daily intervals for the new time coordinate.

    Arguments
    ---------
    cube: iris.cube.Cube
        Cube to aggregate and iterate over one dimension
    method: str
        Type of aggregate i.e. method: 'SUM', getattr creates
        iris.analysis.SUM, etc.
    interval_iso: isodate timedelta ISO 8601 object i.e PT6H (6 hours), PT30M (30 mins)
        Interval to aggregate over.
    **kwargs:
        Additional keyword arguments are accepted and ignored.

    Returns
    -------
    cube: iris.cube.Cube
        Single variable but several methods of aggregation
    """
    # Duration of ISO timedelta.
    timedelta = isodate.parse_duration(interval_iso)

    # Convert interval format to whole hours.
    interval = int(timedelta.total_seconds() / 3600)

    # Add time categorisation overwriting hourly increment via lambda coord.
    # https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html
    iris.coord_categorisation.add_categorised_coord(
        cube, "interval", "time", lambda coord, cell: cell // interval * interval
    )

    # Aggregate cube using supplied method.
    aggregated_cube = cube.aggregated_by("interval", getattr(iris.analysis, method))
    aggregated_cube.remove_coord("interval")
    return aggregated_cube

104 

105 

def ensure_aggregatable_across_cases(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.CubeList:
    """Ensure a Cube or CubeList can be aggregated across multiple cases.

    The cubes are grouped into buckets of compatible cubes, then each bucket is
    converted into a single aggregatable cube with ``forecast_period`` and
    ``forecast_reference_time`` dimension coordinates.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Each cube is checked to determine if it has the necessary
        dimensional coordinates to be aggregatable, being processed if needed.

    Returns
    -------
    cubes: iris.cube.CubeList
        A CubeList of time aggregatable cubes.

    Raises
    ------
    ValueError
        If any of the provided cubes cannot be made aggregatable.

    Notes
    -----
    This is a simple operator designed to ensure that a Cube is aggregatable
    across cases. If a CubeList is presented it will create an aggregatable Cube
    from that list. Its functionality is for case study (or trial) aggregation
    to ensure that the full dataset can be loaded as a single cube. This
    functionality is particularly useful for percentiles, Q-Q plots, and
    histograms.

    The necessary dimension coordinates for a cube to be aggregatable are
    ``forecast_period`` and ``forecast_reference_time``.
    """
    # Group the inputs into buckets of mutually compatible cubes. A cube joins
    # the first bucket whose representative cube it is compatible with,
    # otherwise it starts a new bucket of its own.
    buckets: list[iris.cube.CubeList] = []
    for cube in iter_maybe(cubes):
        for bucket in buckets:
            if bucket[0].is_compatible(cube):
                bucket.append(cube)
                break
        else:
            buckets.append(iris.cube.CubeList([cube]))

    logging.debug("Buckets:\n%s", "\n---\n".join(str(b) for b in buckets))

    # Collapse each bucket down to a single aggregatable cube.
    aggregatable_cubes = iris.cube.CubeList()
    for bucket in buckets:
        # A lone cube that is already aggregatable needs no merging.
        if len(bucket) == 1 and is_time_aggregatable(bucket[0]):
            aggregatable_cubes.append(_add_nref(bucket[0]))
            continue

        # Slice every cube over the case dimensions, then merge all the slices
        # back together into one cube spanning every case.
        to_merge = iris.cube.CubeList()
        for cube in bucket:
            try:
                to_merge.extend(
                    cube.slices_over(["forecast_period", "forecast_reference_time"])
                )
            except iris.exceptions.CoordinateNotFoundError as err:
                raise ValueError(
                    "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
                    cube,
                ) from err
        merged_cube = to_merge.merge_cube()

        # Record the number of forecast reference times on the time coordinate.
        merged_cube = _add_nref(merged_cube)

        # Confirm the merge actually produced an aggregatable cube.
        if not is_time_aggregatable(merged_cube):
            raise ValueError(
                "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
                merged_cube,
            )
        aggregatable_cubes.append(merged_cube)

    return aggregatable_cubes

207 

208 

def add_hour_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of hour of day to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.
        Note input Cube or CubeList items should only have 1 time dimension.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of hour.

    Notes
    -----
    This is a simple operator designed to be used prior to case aggregation for
    histograms, Q-Q plots, and percentiles when aggregated by hour of day.
    """
    processed = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # Ensure time is a dimension coordinate, then derive the hour-of-day
        # category coordinate from it.
        iris.util.promote_aux_coord_to_dim_coord(cube, "time")
        iris.coord_categorisation.add_hour(cube, "time", name="hour")
        cube.coord("hour").units = "hours"
        processed.append(cube)

    # Unwrap a single cube; otherwise hand back the whole list.
    return processed[0] if len(processed) == 1 else processed

242 

243 

def rolling_window_time_aggregation(
    cubes: iris.cube.Cube | iris.cube.CubeList, method: str, window: int
) -> iris.cube.Cube | iris.cube.CubeList:
    """Aggregate a cube along the time dimension using a rolling window.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube or Cubelist of any variable to be aggregated over a rolling window
        in time.
    method: str
        Type of aggregate i.e. method: 'MAX', getattr creates
        iris.analysis.MAX, etc.
    window: int
        The rolling window size.

    Returns
    -------
    cube: iris.cube.Cube | iris.cube.CubeList
        A Cube or Cubelist of the rolling window aggregate. The Cubes will have
        a time dimension that is reduced in size to the original cube by the
        window size.

    Notes
    -----
    This operator is designed to be used to help create daily maxima and minima
    for any variable.
    """
    # Resolve the aggregation method once; it is the same for every cube.
    aggregator = getattr(iris.analysis, method)

    # Apply the rolling window along time to each input cube.
    windowed = iris.cube.CubeList(
        cube.rolling_window("time", aggregator, window) for cube in iter_maybe(cubes)
    )

    # Unwrap a single cube; otherwise hand back the whole list.
    return windowed[0] if len(windowed) == 1 else windowed