Coverage for src / CSET / cset_workflow / app / fetch_fcst / bin / fetch_data.py: 79%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 15:48 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Retrieve the files from the filesystem for the current cycle point.""" 

16 

17import abc 

18import ast 

19import glob 

20import itertools 

21import logging 

22import os 

23import ssl 

24import sys 

25import urllib.parse 

26import urllib.request 

27from concurrent.futures import ThreadPoolExecutor 

28from datetime import datetime, timedelta 

29from pathlib import Path 

30from typing import Literal 

31 

32import isodate 

33 

# Configure root logging once at import time; the LOGLEVEL environment
# variable controls verbosity (defaults to INFO) and output goes to
# stdout so cylc captures it in the task's job log.
logging.basicConfig(
    level=os.getenv("LOGLEVEL", "INFO"),
    format="%(asctime)s %(levelname)s %(message)s",
    stream=sys.stdout,
)

39 

40 

class FileRetrieverABC(abc.ABC):
    """Base class defining the interface of a file-retrieval backend.

    Concrete subclasses must implement `get_file`. The `__enter__` and
    `__exit__` hooks may additionally be overridden to perform setup or
    teardown work.

    Instances are used as context managers: all files belonging to one
    model are fetched inside a single with-block, and `get_file` is
    invoked once per file path within that block, so any resources a
    backend acquires can be released when retrieval finishes.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Prepare the retriever and return it for use in a with-block."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release any resources the retriever holds."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Not all of the given paths will exist, so FileNotFoundErrors
        should be logged, but not raised.

        Implementations should be thread safe, as the method is called
        from multiple threads.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError

86 

87 

class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Link a file from the filesystem into the output directory.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        matches = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(matches))
        if not matches:
            logging.warning("file_path does not match any files: %s", file_path)
        transferred = False
        for match in matches:
            source = Path(match).absolute()
            # Derive the destination name from the full path so that
            # similarly named files from different directories remain
            # distinct; ')' substitutes for '/' because it cannot occur
            # within a single path component.
            destination = f"{output_dir}/{')'.join(source.parts)}"
            try:
                os.symlink(source, destination)
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", source, err)
            else:
                transferred = True
        return transferred

124 

125 

class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from an HTTP(S) address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to download.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if the file was fully transferred, otherwise False.
        """
        ctx = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        ctx.verify_flags &= ~ssl.VERIFY_X509_STRICT
        # Save under the final component of the URL path.
        save_path = (
            f"{output_dir.removesuffix('/')}/"
            + urllib.parse.urlparse(file_path).path.split("/")[-1]
        )
        any_files_copied = False
        try:
            with urllib.request.urlopen(file_path, timeout=30, context=ctx) as response:
                with open(save_path, "wb") as fp:
                    # Read in 1 MiB chunks so data needn't fit in memory.
                    while data := response.read(1024 * 1024):
                        fp.write(data)
            any_files_copied = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
            # A failed or interrupted download must not leave a truncated
            # file behind, as later processing would pick it up as valid
            # data. missing_ok covers failures before the file was created.
            Path(save_path).unlink(missing_ok=True)
        return any_files_copied

163 

164 

def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment.

    Mandatory variables raise KeyError when absent; DATA_PERIOD is only
    optional for initiation-time runs.
    """
    variables = {
        "raw_path": os.environ["DATA_PATH"],
        "date_type": os.environ["DATE_TYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        "forecast_offset": isodate.parse_duration(os.environ["ANALYSIS_OFFSET"]),
        "model_identifier": os.environ["MODEL_IDENTIFIER"],
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    raw_period = os.environ.get("DATA_PERIOD")
    if raw_period is not None:
        variables["data_period"] = isodate.parse_duration(raw_period)
    elif variables["date_type"] == "initiation":
        # Data period is not needed for initiation time.
        variables["data_period"] = None
    else:
        # Mirror the KeyError that a direct os.environ lookup would give.
        raise KeyError("DATA_PERIOD")
    logging.debug("Environment variables loaded: %s", variables)
    return variables

185 

186 

def _get_needed_environment_variables_obs() -> dict:
    """Load the needed variables from the environment.

    Mandatory variables raise KeyError when absent. WMO_BLOCK_STTN_NMBRS
    and SUBAREA_EXTENT are optional: unset or empty values map to None.
    """
    # Default to "" so an entirely unset optional variable behaves the
    # same as an empty one, rather than raising TypeError on len(None).
    wmo_nmbrs_raw = os.environ.get("WMO_BLOCK_STTN_NMBRS", "")
    subarea_raw = os.environ.get("SUBAREA_EXTENT", "")
    variables = {
        "subtype": os.environ["OBS_SUBTYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        # literal_eval safely parses the python-literal list of fields.
        "obs_fields": ast.literal_eval(os.environ["SURFACE_SYNOP_FIELDS"]),
        "model_identifier": "OBS",
        "wmo_nmbrs": ast.literal_eval(wmo_nmbrs_raw) if wmo_nmbrs_raw else None,
        "subarea_extent": ast.literal_eval(subarea_raw) if subarea_raw else None,
        "obs_interval": isodate.parse_duration(os.environ["SURFACE_SYNOP_INTERVAL"]),
        "obs_offset": isodate.parse_duration(os.environ["SURFACE_SYNOP_OFFSET"]),
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    logging.debug("Environment variables loaded: %s", variables)
    return variables

207 

208 

209def _template_file_path( 

210 raw_path: str, 

211 date_type: Literal["validity", "initiation"], 

212 data_time: datetime, 

213 forecast_length: timedelta, 

214 forecast_offset: timedelta, 

215 data_period: timedelta, 

216) -> list[str]: 

217 """Fill time placeholders to generate a file path to fetch.""" 

218 placeholder_times: list[datetime] = [] 

219 lead_times: list[timedelta] = [] 

220 match date_type: 

221 case "validity": 

222 date = data_time 

223 while date < data_time + forecast_length: 

224 placeholder_times.append(date) 

225 date += data_period 

226 case "initiation": 

227 placeholder_times.append(data_time) 

228 lead_time = forecast_offset 

229 while lead_time < forecast_length: 

230 lead_times.append(lead_time) 

231 lead_time += data_period 

232 case _: 

233 raise ValueError(f"Invalid date type: {date_type}") 

234 

235 paths: set[str] = set() 

236 for placeholder_time in placeholder_times: 

237 # Expand out all other format strings. 

238 path = placeholder_time.strftime(os.path.expandvars(raw_path)) 

239 if lead_times: 

240 # Expand out lead time format strings, %N. 

241 for lead_time in lead_times: 

242 # BUG: Will not respect escaped % signs, e.g: %%N. 

243 paths.add( 

244 path.replace("%N", f"{int(lead_time.total_seconds()) // 3600:03d}") 

245 ) 

246 else: 

247 paths.add(path) 

248 return sorted(paths) 

249 

250 

def fetch_data(file_retriever: type[FileRetrieverABC]):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: type[FileRetrieverABC]
        FileRetriever class to use. It is instantiated here and used as
        a context manager so backend resources are released once every
        path has been attempted.

    Raises
    ------
    FileNotFoundError:
        If no files are found for the model, across all tried paths.
    """
    v = _get_needed_environment_variables()

    # Prepare output directory.
    cycle_data_dir = f"{v['rose_datac']}/data/{v['model_identifier']}"
    os.makedirs(cycle_data_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_data_dir)

    # Get file paths.
    paths = _template_file_path(
        v["raw_path"],
        v["date_type"],
        v["data_time"],
        v["forecast_length"],
        v["forecast_offset"],
        v["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        files_found = executor.map(
            retriever.get_file, paths, itertools.repeat(cycle_data_dir)
        )
        # Exhaust the iterator with list so all futures get resolved before we
        # exit the with block, ensuring all files are retrieved.
        any_files_found = any(list(files_found))
    if not any_files_found:
        raise FileNotFoundError("No files found for model!")

302 

303 

def fetch_obs(obs_retriever: FileRetrieverABC):
    """Fetch the observations corresponding to a model run.

    The following environment variables need to be set:
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * OBS_SUBTYPE
    * ROSE_DATAC
    * SUBAREA_EXTENT
    * SURFACE_SYNOP_FIELDS
    * SURFACE_SYNOP_INTERVAL
    * SURFACE_SYNOP_OFFSET
    * WMO_BLOCK_STTN_NMBRS

    Parameters
    ----------
    obs_retriever: ObsRetriever
        ObsRetriever instance to use. Note this is an already-constructed
        instance whose get_file takes observation-specific arguments.
        Defaults to FilesystemFileRetriever.

    Raises
    ------
    ValueError:
        If no observations are available.
    """
    v = _get_needed_environment_variables_obs()

    # Prepare output directory.
    cycle_obs_dir = f"{v['rose_datac']}/data/OBS"
    os.makedirs(cycle_obs_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_obs_dir)

    # We will get just one file for now, but follow the templating
    # syntax for the model for consistency. The name encodes the
    # subtype, the cycle time, and the forecast length in hours.
    obs_base_path = (
        v["subtype"]
        + "_"
        + "%Y%m%dT%H%MZ_dt_"
        + str(int(v["forecast_length"].total_seconds() // 3600)).zfill(3)
        + ".nc"
    )
    paths = _template_file_path(
        obs_base_path,
        "initiation",
        v["data_time"],
        v["forecast_length"],
        timedelta(seconds=0),
        v["obs_interval"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use obs retriever to transfer data with multiple threads.
    # We shouldn't need to iterate as we do for the forecast data
    # because these files will be smaller.
    try:
        obs_retriever.get_file(
            paths[0],
            v["subtype"],
            v["obs_fields"],
            v["data_time"],
            v["obs_offset"],
            v["forecast_length"],
            v["obs_interval"],
            cycle_obs_dir,
            wmo_nmbrs=v["wmo_nmbrs"],
            subarea_extent=v["subarea_extent"],
        )
    except Exception as exc:
        # Any retrieval failure is surfaced as "no observations".
        raise ValueError("No observations available.") from exc