Coverage for src / CSET / cset_workflow / app / fetch_fcst / bin / fetch_data.py: 90%

114 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-28 12:47 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Retrieve the files from the filesystem for the current cycle point.""" 

16 

17import abc 

18import ast 

19import glob 

20import itertools 

21import logging 

22import os 

23import ssl 

24import urllib.parse 

25import urllib.request 

26from concurrent.futures import ThreadPoolExecutor 

27from datetime import datetime, timedelta 

28from pathlib import Path 

29from typing import Literal 

30 

31import isodate 

32 

# Configure the root logger at import time; verbosity is overridable via the
# LOGLEVEL environment variable (defaults to INFO).
logging.basicConfig(
    level=os.getenv("LOGLEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s"
)

36 

37 

class FileRetrieverABC(abc.ABC):
    """Abstract base class for retrieving files from a data source.

    Subclasses must implement `get_file`. The `__enter__` and `__exit__`
    hooks may additionally be overridden when a retriever needs setup or
    teardown work.

    Instances are intended for use as context managers so any held
    resources are released once retrieval finishes. All files of a model
    are fetched within one context-manager block, inside which `get_file`
    is invoked once per file path.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Set up the retriever and return it for use in the with-block."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release any resources held by the retriever."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Not every requested path will exist, so implementations should log
        FileNotFoundErrors rather than raise them.

        Implementations must be thread safe; this method is invoked from
        multiple threads concurrently.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain
            patterns such as globs, expanded in a system specific manner.
        output_dir: str
            Filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError

83 

84 

class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from the filesystem to the output directory.

        Matching files are symlinked into place rather than physically
        copied.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain
            patterns such as globs, expanded in a system specific manner.
        output_dir: str
            Filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        matches = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(matches))
        if not matches:
            logging.warning("file_path does not match any files: %s", file_path)
        linked_any = False
        for match in matches:
            source = Path(match)
            try:
                # glob already confirmed the source exists.
                os.symlink(source.absolute(), f"{output_dir}/{source.name}")
                linked_any = True
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", source, err)
        return linked_any

118 

119 

class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from a HTTP address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to fetch. It may contain patterns such as
            globs, expanded in a system specific manner.
        output_dir: str
            Filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        ssl_context = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        ssl_context.verify_flags &= ~ssl.VERIFY_X509_STRICT
        remote_name = urllib.parse.urlparse(file_path).path.split("/")[-1]
        save_path = f"{output_dir.removesuffix('/')}/" + remote_name
        transferred = False
        try:
            with urllib.request.urlopen(
                file_path, timeout=30, context=ssl_context
            ) as response:
                with open(save_path, "wb") as sink:
                    # Stream in 1 MiB chunks so data needn't fit in memory.
                    chunk = response.read(1024 * 1024)
                    while chunk:
                        sink.write(chunk)
                        chunk = response.read(1024 * 1024)
                    transferred = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
        return transferred

157 

158 

def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment.

    Raises KeyError if a required variable is unset, except DATA_PERIOD,
    which is optional when DATE_TYPE is "initiation".
    """
    variables = dict(
        raw_path=os.environ["DATA_PATH"],
        date_type=os.environ["DATE_TYPE"],
        data_time=datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        forecast_length=isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        forecast_offset=isodate.parse_duration(os.environ["ANALYSIS_OFFSET"]),
        model_identifier=os.environ["MODEL_IDENTIFIER"],
        rose_datac=os.environ["ROSE_DATAC"],
    )
    try:
        variables["data_period"] = isodate.parse_duration(os.environ["DATA_PERIOD"])
    except KeyError:
        # Data period is not needed for initiation time.
        if variables["date_type"] != "initiation":
            raise
        variables["data_period"] = None
    logging.debug("Environment variables loaded: %s", variables)
    return variables

179 

180 

def _get_needed_environment_variables_obs() -> dict:
    """Load the needed observation variables from the environment.

    Raises KeyError if a required variable is unset. WMO_BLOCK_STTN_NMBRS
    and SUBAREA_EXTENT are optional; unset or empty means no filtering.
    """
    # Optional list-valued settings arrive as Python-literal strings. Default
    # to "" so an unset variable behaves like an empty one; previously
    # os.environ.get(...) returned None and len(None) raised TypeError.
    wmo_raw = os.environ.get("WMO_BLOCK_STTN_NMBRS", "")
    subarea_raw = os.environ.get("SUBAREA_EXTENT", "")
    variables = {
        "subtype": os.environ["OBS_SUBTYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        "obs_fields": ast.literal_eval(os.environ["SURFACE_SYNOP_FIELDS"]),
        "model_identifier": "OBS",
        "wmo_nmbrs": ast.literal_eval(wmo_raw) if wmo_raw else None,
        "subarea_extent": ast.literal_eval(subarea_raw) if subarea_raw else None,
        "obs_interval": isodate.parse_duration(os.environ["SURFACE_SYNOP_INTERVAL"]),
        "obs_offset": isodate.parse_duration(os.environ["SURFACE_SYNOP_OFFSET"]),
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    logging.debug("Environment variables loaded: %s", variables)
    return variables

201 

202 

203def _template_file_path( 

204 raw_path: str, 

205 date_type: Literal["validity", "initiation"], 

206 data_time: datetime, 

207 forecast_length: timedelta, 

208 forecast_offset: timedelta, 

209 data_period: timedelta, 

210) -> list[str]: 

211 """Fill time placeholders to generate a file path to fetch.""" 

212 placeholder_times: list[datetime] = [] 

213 lead_times: list[timedelta] = [] 

214 match date_type: 

215 case "validity": 

216 date = data_time 

217 while date < data_time + forecast_length: 

218 placeholder_times.append(date) 

219 date += data_period 

220 case "initiation": 

221 placeholder_times.append(data_time) 

222 lead_time = forecast_offset 

223 while lead_time < forecast_length: 

224 lead_times.append(lead_time) 

225 lead_time += data_period 

226 case _: 

227 raise ValueError(f"Invalid date type: {date_type}") 

228 

229 paths: set[str] = set() 

230 for placeholder_time in placeholder_times: 

231 # Expand out all other format strings. 

232 path = placeholder_time.strftime(os.path.expandvars(raw_path)) 

233 if lead_times: 

234 # Expand out lead time format strings, %N. 

235 for lead_time in lead_times: 

236 # BUG: Will not respect escaped % signs, e.g: %%N. 

237 paths.add( 

238 path.replace("%N", f"{int(lead_time.total_seconds()) // 3600:03d}") 

239 ) 

240 else: 

241 paths.add(path) 

242 return sorted(paths) 

243 

244 

def fetch_data(file_retriever: FileRetrieverABC):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: FileRetriever
        FileRetriever implementation to use.

    Raises
    ------
    FileNotFound:
        If no files are found for the model, across all tried paths.
    """
    env = _get_needed_environment_variables()

    # Prepare output directory.
    cycle_data_dir = f"{env['rose_datac']}/data/{env['model_identifier']}"
    os.makedirs(cycle_data_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_data_dir)

    # Get file paths.
    paths = _template_file_path(
        env["raw_path"],
        env["date_type"],
        env["data_time"],
        env["forecast_length"],
        env["forecast_offset"],
        env["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        # Materialise with list() so every future resolves before leaving
        # the with block, ensuring all files are retrieved.
        results = list(
            executor.map(retriever.get_file, paths, itertools.repeat(cycle_data_dir))
        )
    if not any(results):
        raise FileNotFoundError("No files found for model!")

296 

297 

def fetch_obs(obs_retriever: FileRetrieverABC):
    """Fetch the observations corresponding to a model run.

    The following environment variables need to be set:
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * OBS_SUBTYPE
    * ROSE_DATAC
    * SUBAREA_EXTENT
    * SURFACE_SYNOP_FIELDS
    * SURFACE_SYNOP_INTERVAL
    * SURFACE_SYNOP_OFFSET
    * WMO_BLOCK_STTN_NMBRS

    Parameters
    ----------
    obs_retriever: ObsRetriever
        ObsRetriever implementation to use.

    Raises
    ------
    ValueError:
        If no observations are available.
    """
    v = _get_needed_environment_variables_obs()

    # Prepare output directory.
    cycle_obs_dir = f"{v['rose_datac']}/data/OBS"
    os.makedirs(cycle_obs_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_obs_dir)

    # We will get just one file for now, but follow the templating
    # syntax for the model for consistency.
    obs_base_path = (
        v["subtype"]
        + "_"
        + "%Y%m%dT%H%MZ_dt_"
        + str(int(v["forecast_length"].total_seconds() // 3600)).zfill(3)
        + ".nc"
    )
    paths = _template_file_path(
        obs_base_path,
        "initiation",
        v["data_time"],
        v["forecast_length"],
        timedelta(seconds=0),
        v["obs_interval"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Retrieve a single file without a thread pool; observation files are
    # smaller than the forecast data, so no iteration is needed.
    # NOTE(review): this call passes more arguments than
    # FileRetrieverABC.get_file accepts, so obs retrievers evidently
    # implement an extended signature — confirm against the implementations.
    try:
        obs_retriever.get_file(
            paths[0],
            v["subtype"],
            v["obs_fields"],
            v["data_time"],
            v["obs_offset"],
            v["forecast_length"],
            v["obs_interval"],
            cycle_obs_dir,
            wmo_nmbrs=v["wmo_nmbrs"],
            subarea_extent=v["subarea_extent"],
        )
    except Exception as exc:
        raise ValueError("No observations available.") from exc