Coverage for src/CSET/operators/_utils.py: 94%

128 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 15:17 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Common operator functionality used across CSET. 

17 

18Functions below should only be added if it is not suitable as a standalone 

19operator, and will be used across multiple operators. 

20""" 

21 

22import logging 

23import re 

24from datetime import timedelta 

25 

26import iris 

27import iris.cube 

28import iris.exceptions 

29import iris.util 

30from iris.time import PartialDateTime 

31 

32 

33def pdt_fromisoformat( 

34 datestring, 

35) -> tuple[iris.time.PartialDateTime, timedelta | None]: 

36 """Generate PartialDateTime object. 

37 

38 Function that takes an ISO 8601 date string and returns a PartialDateTime object. 

39 

40 Arguments 

41 --------- 

42 datestring: str 

43 ISO 8601 date. 

44 

45 Returns 

46 ------- 

47 time_object: iris.time.PartialDateTime 

48 """ 

49 

50 def make_offset(sign, value) -> timedelta: 

51 if len(value) not in [2, 4, 5]: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 raise ValueError(f'expected "hh", "hhmm", or "hh:mm", got {value}') 

53 

54 hours = int(value[:2]) 

55 minutes = 0 

56 if len(value) in [4, 5]: 

57 minutes = int(value[-2:]) 

58 return timedelta(hours=sign * hours, minutes=sign * minutes) 

59 

60 # Remove the microseconds coord due to no support in PartialDateTime 

61 datestring = re.sub(r"\.\d+", "", datestring) 

62 

63 datetime_split = datestring.split("T") 

64 date = datetime_split[0] 

65 if len(datetime_split) == 1: 

66 time = "" 

67 elif len(datetime_split) == 2: 67 ↛ 70line 67 didn't jump to line 70 because the condition on line 67 was always true

68 time = datetime_split[1] 

69 else: 

70 raise ValueError("datesting in an unexpected format") 

71 

72 offset = None 

73 time_split = time.split("+") 

74 if len(time_split) == 2: 

75 time = time_split[0] 

76 offset = make_offset(1, time_split[1]) 

77 else: 

78 time_split = time.split("-") 

79 if len(time_split) == 2: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true

80 time = time_split[0] 

81 offset = make_offset(-1, time_split[1]) 

82 else: 

83 offset = None 

84 

85 if re.fullmatch(r"\d{8}", date): 

86 date = f"{date[0:4]}-{date[4:6]}-{date[6:8]}" 

87 elif re.fullmatch(r"\d{6}", date): 

88 date = f"{date[0:4]}-{date[4:6]}" 

89 

90 if len(date) < 7: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 raise ValueError(f"Invalid datestring: {datestring}, must be at least YYYY-MM") 

92 

93 # Returning a PartialDateTime for the special case of string form "YYYY-MM" 

94 if re.fullmatch(r"\d{4}-\d{2}", date): 

95 pdt = PartialDateTime( 

96 year=int(date[0:4]), 

97 month=int(date[5:7]), 

98 day=None, 

99 hour=None, 

100 minute=None, 

101 second=None, 

102 ) 

103 return pdt, offset 

104 

105 year = int(date[0:4]) 

106 month = int(date[5:7]) 

107 day = int(date[8:10]) 

108 

109 kwargs = dict(year=year, month=month, day=day, hour=0, minute=0, second=0) 

110 

111 # Normalise the time parts into standard format 

112 if re.fullmatch(r"\d{4}", time): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 time = f"{time[0:2]}:{time[2:4]}" 

114 if re.fullmatch(r"\d{6}", time): 

115 time = f"{time[0:2]}:{time[2:4]}:{time[4:6]}" 

116 

117 if len(time) >= 2: 

118 kwargs["hour"] = int(time[0:2]) 

119 if len(time) >= 5: 

120 kwargs["minute"] = int(time[3:5]) 

121 if len(time) >= 8: 

122 kwargs["second"] = int(time[6:8]) 

123 

124 pdt = PartialDateTime(**kwargs) 

125 

126 return pdt, offset 

127 

128 

129def get_cube_yxcoordname(cube: iris.cube.Cube) -> tuple[str, str]: 

130 """ 

131 Return horizontal dimension coordinate name(s) from a given cube. 

132 

133 Arguments 

134 --------- 

135 

136 cube: iris.cube.Cube 

137 An iris cube which will be checked to see if it contains coordinate 

138 names that match a pre-defined list of acceptable horizontal 

139 dimension coordinate names. 

140 

141 Returns 

142 ------- 

143 (y_coord, x_coord) 

144 A tuple containing the horizontal coordinate name for latitude and longitude respectively 

145 found within the cube. 

146 

147 Raises 

148 ------ 

149 ValueError 

150 If a unique y/x horizontal coordinate cannot be found. 

151 """ 

152 # Acceptable horizontal coordinate names. 

153 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

154 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

155 

156 # Get a list of dimension coordinate names for the cube 

157 dim_coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

158 coord_names = [coord.name() for coord in cube.coords()] 

159 

160 # Check which x-coordinate we have, if any 

161 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

162 if len(x_coords) != 1: 

163 x_coords = [coord for coord in dim_coord_names if coord in X_COORD_NAMES] 

164 if len(x_coords) != 1: 

165 raise ValueError("Could not identify a unique x-coordinate in cube") 

166 

167 # Check which y-coordinate we have, if any 

168 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

169 if len(y_coords) != 1: 

170 y_coords = [coord for coord in dim_coord_names if coord in Y_COORD_NAMES] 

171 if len(y_coords) != 1: 

172 raise ValueError("Could not identify a unique y-coordinate in cube") 

173 

174 return (y_coords[0], x_coords[0]) 

175 

176 

177def get_cube_coordindex(cube: iris.cube.Cube, coord_name) -> int: 

178 """ 

179 Return coordinate dimension for a named coordinate from a given cube. 

180 

181 Arguments 

182 --------- 

183 

184 cube: iris.cube.Cube 

185 An iris cube which will be checked to see if it contains coordinate 

186 names that match a pre-defined list of acceptable horizontal 

187 coordinate names. 

188 

189 coord_name: str 

190 A cube dimension name 

191 

192 Returns 

193 ------- 

194 coord_index 

195 An integer specifying where in the cube dimension list a specified coordinate name is found. 

196 

197 Raises 

198 ------ 

199 ValueError 

200 If a specified dimension coordinate cannot be found. 

201 """ 

202 # Get a list of dimension coordinate names for the cube 

203 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

204 

205 # Check if requested dimension is found in cube and get index 

206 if coord_name in coord_names: 

207 coord_index = cube.coord_dims(coord_name)[0] 

208 else: 

209 raise ValueError("Could not find requested dimension %s", coord_name) 

210 

211 return coord_index 

212 

213 

214def is_spatialdim(cube: iris.cube.Cube) -> bool: 

215 """Determine whether a cube is has two spatial dimension coordinates. 

216 

217 If cube has both spatial dims, it will contain two unique coordinates 

218 that explain space (latitude and longitude). The coordinates have to 

219 be iterable/contain usable dimension data, as cubes may contain these 

220 coordinates as scalar dimensions after being collapsed. 

221 

222 Arguments 

223 --------- 

224 cube: iris.cube.Cube 

225 An iris cube which will be checked to see if it contains coordinate 

226 names that match a pre-defined list of acceptable coordinate names. 

227 

228 Returns 

229 ------- 

230 bool 

231 If true, then the cube has a spatial projection and thus can be plotted 

232 as a map. 

233 """ 

234 # Acceptable horizontal coordinate names. 

235 X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] 

236 Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] 

237 

238 # Get a list of coordinate names for the cube 

239 coord_names = [coord.name() for coord in cube.dim_coords] 

240 x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] 

241 y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] 

242 

243 # If there is one coordinate for both x and y direction return True. 

244 if len(x_coords) == 1 and len(y_coords) == 1: 

245 return True 

246 else: 

247 return False 

248 

249 

250def is_transect(cube: iris.cube.Cube) -> bool: 

251 """Determine whether a cube is a transect. 

252 

253 If cube is a transect, it will contain only one spatial (map) coordinate, 

254 and one vertical coordinate (either pressure or model level). 

255 

256 Arguments 

257 --------- 

258 cube: iris.cube.Cube 

259 An iris cube which will be checked to see if it contains coordinate 

260 names that match a pre-defined list of acceptable coordinate names. 

261 

262 Returns 

263 ------- 

264 bool 

265 If true, then the cube is a transect that contains one spatial (map) 

266 coordinate and one vertical coordinate. 

267 """ 

268 # Acceptable spatial (map) coordinate names. 

269 SPATIAL_MAP_COORD_NAMES = [ 

270 "longitude", 

271 "grid_longitude", 

272 "projection_x_coordinate", 

273 "x", 

274 "latitude", 

275 "grid_latitude", 

276 "projection_y_coordinate", 

277 "y", 

278 "distance", 

279 ] 

280 

281 # Acceptable vertical coordinate names 

282 VERTICAL_COORD_NAMES = ["pressure", "model_level_number", "level_height"] 

283 

284 # Get a list of coordinate names for the cube 

285 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

286 

287 # Check which spatial coordinates we have. 

288 spatial_coords = [ 

289 coord for coord in coord_names if coord in SPATIAL_MAP_COORD_NAMES 

290 ] 

291 if len(spatial_coords) != 1: 

292 return False 

293 

294 # Check which vertical coordinates we have. 

295 vertical_coords = [coord for coord in coord_names if coord in VERTICAL_COORD_NAMES] 

296 if len(vertical_coords) != 1: 

297 return False 

298 

299 # Passed criteria so return True 

300 return True 

301 

302 

303def fully_equalise_attributes(cubes: iris.cube.CubeList): 

304 """Remove any unique attributes between cubes or coordinates in place.""" 

305 # Equalise cube attributes. 

306 removed = iris.util.equalise_attributes(cubes) 

307 logging.debug("Removed attributes from cube: %s", removed) 

308 

309 # Equalise coordinate attributes. 

310 coord_sets = [{coord.name() for coord in cube.coords()} for cube in cubes] 

311 

312 all_coords = set.union(*coord_sets) 

313 coords_to_equalise = set.intersection(*coord_sets) 

314 coords_to_remove = set.difference(all_coords, coords_to_equalise) 

315 

316 logging.debug("All coordinates: %s", all_coords) 

317 logging.debug("Coordinates to remove: %s", coords_to_remove) 

318 logging.debug("Coordinates to equalise: %s", coords_to_equalise) 

319 

320 for coord in coords_to_remove: 

321 for cube in cubes: 

322 try: 

323 cube.remove_coord(coord) 

324 logging.debug("Removed coordinate %s from %s cube.", coord, cube.name()) 

325 except iris.exceptions.CoordinateNotFoundError: 

326 pass 

327 

328 for coord in coords_to_equalise: 

329 removed = iris.util.equalise_attributes([cube.coord(coord) for cube in cubes]) 

330 logging.debug("Removed attributes from coordinate %s: %s", coord, removed) 

331 

332 return cubes 

333 

334 

335def is_time_aggregatable(cube: iris.cube.Cube) -> bool: 

336 """Determine whether a cube can be aggregated in time. 

337 

338 If a cube is aggregatable it will contain both a 'forecast_reference_time' 

339 and 'forecast_period' coordinate as dimensional coordinates. 

340 

341 Arguments 

342 --------- 

343 cube: iris.cube.Cube 

344 An iris cube which will be checked to see if it is aggregatable based 

345 on a set of pre-defined dimensional time coordinates: 

346 'forecast_period' and 'forecast_reference_time'. 

347 

348 Returns 

349 ------- 

350 bool 

351 If true, then the cube is aggregatable and contains dimensional 

352 coordinates including both 'forecast_reference_time' and 

353 'forecast_period'. 

354 """ 

355 # Acceptable time coordinate names for aggregatable cube. 

356 TEMPORAL_COORD_NAMES = ["forecast_period", "forecast_reference_time"] 

357 

358 # Coordinate names for the cube. 

359 coord_names = [coord.name() for coord in cube.coords(dim_coords=True)] 

360 

361 # Check which temporal coordinates we have. 

362 temporal_coords = [coord for coord in coord_names if coord in TEMPORAL_COORD_NAMES] 

363 # Return whether both coordinates are in the temporal coordinates. 

364 return len(temporal_coords) == 2