Coverage for src/CSET/operators/aggregate.py: 100%

56 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 21:08 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Operators to aggregate across either 1 or 2 dimensions.""" 

16 

17import logging 

18 

19import iris 

20import iris.analysis 

21import iris.coord_categorisation 

22import iris.cube 

23import isodate 

24 

25from CSET._common import iter_maybe 

26from CSET.operators._utils import is_time_aggregatable 

27 

28 

29def time_aggregate( 

30 cube: iris.cube.Cube, 

31 method: str, 

32 interval_iso: str, 

33 **kwargs, 

34) -> iris.cube.Cube: 

35 """Aggregate cube by its time coordinate. 

36 

37 Aggregates similar (stash) fields in a cube for the specified coordinate and 

38 using the method supplied. The aggregated cube will keep the coordinate and 

39 add a further coordinate with the aggregated end time points. 

40 

41 Examples are: 1. Generating hourly or 6-hourly precipitation accumulations 

42 given an interval for the new time coordinate. 

43 

44 We use the isodate class to convert ISO 8601 durations into time intervals 

45 for creating a new time coordinate for aggregation. 

46 

47 We use the lambda function to pass coord and interval into the callable 

48 category function in add_categorised to allow users to define their own 

49 sub-daily intervals for the new time coordinate. 

50 

51 Arguments 

52 --------- 

53 cube: iris.cube.Cube 

54 Cube to aggregate and iterate over one dimension 

55 coordinate: str 

56 Coordinate to aggregate over i.e. 'time', 'longitude', 

57 'latitude','model_level_number'. 

58 method: str 

59 Type of aggregate i.e. method: 'SUM', getattr creates 

60 iris.analysis.SUM, etc. 

61 interval_iso: isodate timedelta ISO 8601 object i.e PT6H (6 hours), PT30M (30 mins) 

62 Interval to aggregate over. 

63 

64 Returns 

65 ------- 

66 cube: iris.cube.Cube 

67 Single variable but several methods of aggregation 

68 

69 Raises 

70 ------ 

71 ValueError 

72 If the constraint doesn't produce a single cube containing a field. 

73 """ 

74 # Duration of ISO timedelta. 

75 timedelta = isodate.parse_duration(interval_iso) 

76 

77 # Convert interval format to whole hours. 

78 interval = int(timedelta.total_seconds() / 3600) 

79 

80 # Add time categorisation overwriting hourly increment via lambda coord. 

81 # https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html 

82 iris.coord_categorisation.add_categorised_coord( 

83 cube, "interval", "time", lambda coord, cell: cell // interval * interval 

84 ) 

85 

86 # Aggregate cube using supplied method. 

87 aggregated_cube = cube.aggregated_by("interval", getattr(iris.analysis, method)) 

88 aggregated_cube.remove_coord("interval") 

89 return aggregated_cube 

90 

91 

92def ensure_aggregatable_across_cases( 

93 cubes: iris.cube.Cube | iris.cube.CubeList, 

94) -> iris.cube.CubeList: 

95 """Ensure a Cube or CubeList can be aggregated across multiple cases. 

96 

97 The cubes are grouped into buckets of compatible cubes, then each bucket is 

98 converted into a single aggregatable cube with ``forecast_period`` and 

99 ``forecast_reference_time`` dimension coordinates. 

100 

101 Arguments 

102 --------- 

103 cubes: iris.cube.Cube | iris.cube.CubeList 

104 Each cube is checked to determine if it has the the necessary 

105 dimensional coordinates to be aggregatable, being processed if needed. 

106 

107 Returns 

108 ------- 

109 cubes: iris.cube.CubeList 

110 A CubeList of time aggregatable cubes. 

111 

112 Raises 

113 ------ 

114 ValueError 

115 If any of the provided cubes cannot be made aggregatable. 

116 

117 Notes 

118 ----- 

119 This is a simple operator designed to ensure that a Cube is aggregatable 

120 across cases. If a CubeList is presented it will create an aggregatable Cube 

121 from that list. Its functionality is for case study (or trial) aggregation 

122 to ensure that the full dataset can be loaded as a single cube. This 

123 functionality is particularly useful for percentiles, Q-Q plots, and 

124 histograms. 

125 

126 The necessary dimension coordinates for a cube to be aggregatable are 

127 ``forecast_period`` and ``forecast_reference_time``. 

128 """ 

129 

130 # Group compatible cubes. 

131 class Buckets: 

132 def __init__(self): 

133 self.buckets = [] 

134 

135 def add(self, cube: iris.cube.Cube): 

136 """Add a cube into a bucket. 

137 

138 If the cube is compatible with an existing bucket it is added there. 

139 Otherwise it gets its own bucket. 

140 """ 

141 for bucket in self.buckets: 

142 if bucket[0].is_compatible(cube): 

143 bucket.append(cube) 

144 return 

145 self.buckets.append(iris.cube.CubeList([cube])) 

146 

147 def get_buckets(self) -> list[iris.cube.CubeList]: 

148 return self.buckets 

149 

150 b = Buckets() 

151 for cube in iter_maybe(cubes): 

152 b.add(cube) 

153 buckets = b.get_buckets() 

154 

155 logging.debug("Buckets:\n%s", "\n---\n".join(str(b) for b in buckets)) 

156 

157 # Ensure each bucket is a single aggregatable cube. 

158 aggregatable_cubes = iris.cube.CubeList() 

159 for bucket in buckets: 

160 # Single cubes that are already aggregatable won't need processing. 

161 if len(bucket) == 1 and is_time_aggregatable(bucket[0]): 

162 aggregatable_cube = bucket[0] 

163 aggregatable_cubes.append(aggregatable_cube) 

164 continue 

165 

166 # Create an aggregatable cube from the provided CubeList. 

167 to_merge = iris.cube.CubeList() 

168 for cube in bucket: 

169 to_merge.extend( 

170 cube.slices_over(["forecast_period", "forecast_reference_time"]) 

171 ) 

172 aggregatable_cube = to_merge.merge_cube() 

173 

174 # Verify cube is now aggregatable. 

175 if not is_time_aggregatable(aggregatable_cube): 

176 raise ValueError( 

177 "Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.", 

178 aggregatable_cube, 

179 ) 

180 aggregatable_cubes.append(aggregatable_cube) 

181 

182 return aggregatable_cubes 

183 

184 

185def add_hour_coordinate( 

186 cubes: iris.cube.Cube | iris.cube.CubeList, 

187) -> iris.cube.Cube | iris.cube.CubeList: 

188 """Add a category coordinate of hour of day to a Cube or CubeList. 

189 

190 Arguments 

191 --------- 

192 cubes: iris.cube.Cube | iris.cube.CubeList 

193 Cube of any variable that has a time coordinate. 

194 Note input Cube or CubeList items should only have 1 time dimension. 

195 

196 Returns 

197 ------- 

198 cube: iris.cube.Cube 

199 A Cube with an additional auxiliary coordinate of hour. 

200 

201 Notes 

202 ----- 

203 This is a simple operator designed to be used prior to case aggregation for 

204 histograms, Q-Q plots, and percentiles when aggregated by hour of day. 

205 """ 

206 new_cubelist = iris.cube.CubeList() 

207 for cube in iter_maybe(cubes): 

208 # Add a category coordinate of hour into each cube. 

209 iris.util.promote_aux_coord_to_dim_coord(cube, "time") 

210 iris.coord_categorisation.add_hour(cube, "time", name="hour") 

211 cube.coord("hour").units = "hours" 

212 new_cubelist.append(cube) 

213 

214 if len(new_cubelist) == 1: 

215 return new_cubelist[0] 

216 else: 

217 return new_cubelist