Coverage for src / CSET / cset_workflow / app / fetch_fcst / bin / fetch_data.py: 90%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-17 11:22 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Retrieve the files from the filesystem for the current cycle point.""" 

16 

17import abc 

18import ast 

19import glob 

20import itertools 

21import logging 

22import os 

23import ssl 

24import sys 

25import urllib.parse 

26import urllib.request 

27from concurrent.futures import ThreadPoolExecutor 

28from datetime import datetime, timedelta 

29from pathlib import Path 

30from typing import Literal 

31 

32import isodate 

33 

# Configure root logging for the whole script: level taken from the
# LOGLEVEL environment variable (defaulting to INFO), timestamped
# messages, written to stdout so cylc captures them in the job log.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s %(levelname)s %(message)s",
    level=os.getenv("LOGLEVEL", "INFO"),
)

39 

40 

class FileRetrieverABC(abc.ABC):
    """Abstract base class for retrieving files from a data source.

    Subclasses must define the `get_file` method. The `__enter__` and
    `__exit__` methods may optionally be overridden to add setup or
    cleanup code.

    The class is designed to be used as a context manager, so that
    resources can be cleaned up after the retrieval is complete. All the
    files of a model are retrieved within a single context manager
    block, within which the `get_file` method is called once per file
    path.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Initialise the file retriever."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Clean up the file retriever."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Not all of the given paths will exist, so FileNotFoundErrors
        should be logged, but not raised.

        Implementations should be thread safe, as the method is called
        from multiple threads.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError

86 

87 

class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from the filesystem to the output directory.

        The "copy" is actually a symlink into the output directory, so
        no data is duplicated on disk.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        matches = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(matches))
        if not matches:
            logging.warning("file_path does not match any files: %s", file_path)
        copied_any = False
        for match in matches:
            source = Path(match)
            try:
                # The source is known to exist from the glob expansion;
                # OSError here means the link itself could not be made
                # (e.g. the target name already exists).
                os.symlink(source.absolute(), f"{output_dir}/{source.name}")
                copied_any = True
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", source, err)
        return copied_any

121 

122 

class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from a HTTP address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to fetch. It may contain patterns like
            globs, which will be expanded in a system specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        tls_context = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        tls_context.verify_flags &= ~ssl.VERIFY_X509_STRICT
        # Save under the last component of the URL path.
        file_name = urllib.parse.urlparse(file_path).path.split("/")[-1]
        save_path = f"{output_dir.removesuffix('/')}/{file_name}"
        transferred = False
        try:
            with (
                urllib.request.urlopen(
                    file_path, timeout=30, context=tls_context
                ) as response,
                open(save_path, "wb") as fp,
            ):
                # Read in 1 MiB chunks so data needn't fit in memory.
                while chunk := response.read(1024 * 1024):
                    fp.write(chunk)
                transferred = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
        return transferred

160 

161 

def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment.

    Returns a dict of parsed settings. ``data_period`` is None when
    DATA_PERIOD is unset and the date type is "initiation"; otherwise a
    missing DATA_PERIOD raises KeyError.
    """
    env = os.environ
    variables = {
        "raw_path": env["DATA_PATH"],
        "date_type": env["DATE_TYPE"],
        "data_time": datetime.fromisoformat(env["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(env["ANALYSIS_LENGTH"]),
        "forecast_offset": isodate.parse_duration(env["ANALYSIS_OFFSET"]),
        "model_identifier": env["MODEL_IDENTIFIER"],
        "rose_datac": env["ROSE_DATAC"],
    }
    try:
        variables["data_period"] = isodate.parse_duration(env["DATA_PERIOD"])
    except KeyError:
        # Only initiation-time runs can do without a data period; for
        # any other date type the KeyError is a genuine config error.
        if variables["date_type"] != "initiation":
            raise
        variables["data_period"] = None
    logging.debug("Environment variables loaded: %s", variables)
    return variables

182 

183 

def _get_needed_environment_variables_obs() -> dict:
    """Load the needed observation variables from the environment.

    Returns
    -------
    dict
        Mapping of setting names to parsed values. The optional list
        settings (``wmo_nmbrs`` from WMO_BLOCK_STTN_NMBRS and
        ``subarea_extent`` from SUBAREA_EXTENT) are None when the
        corresponding environment variable is unset or empty.
    """

    def _optional_literal(name: str):
        # An unset variable and an empty string both mean "not
        # provided". Previously os.environ.get(name) could return None,
        # making len(None) raise TypeError when the variable was unset.
        raw = os.environ.get(name, "")
        return ast.literal_eval(raw) if raw else None

    variables = {
        "subtype": os.environ["OBS_SUBTYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        "obs_fields": ast.literal_eval(os.environ["SURFACE_SYNOP_FIELDS"]),
        "model_identifier": "OBS",
        "wmo_nmbrs": _optional_literal("WMO_BLOCK_STTN_NMBRS"),
        "subarea_extent": _optional_literal("SUBAREA_EXTENT"),
        "obs_interval": isodate.parse_duration(os.environ["SURFACE_SYNOP_INTERVAL"]),
        "obs_offset": isodate.parse_duration(os.environ["SURFACE_SYNOP_OFFSET"]),
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    logging.debug("Environment variables loaded: %s", variables)
    return variables

204 

205 

206def _template_file_path( 

207 raw_path: str, 

208 date_type: Literal["validity", "initiation"], 

209 data_time: datetime, 

210 forecast_length: timedelta, 

211 forecast_offset: timedelta, 

212 data_period: timedelta, 

213) -> list[str]: 

214 """Fill time placeholders to generate a file path to fetch.""" 

215 placeholder_times: list[datetime] = [] 

216 lead_times: list[timedelta] = [] 

217 match date_type: 

218 case "validity": 

219 date = data_time 

220 while date <= data_time + forecast_length: 

221 placeholder_times.append(date) 

222 date += data_period 

223 case "initiation": 

224 placeholder_times.append(data_time) 

225 lead_time = forecast_offset 

226 while lead_time <= forecast_length: 

227 lead_times.append(lead_time) 

228 lead_time += data_period 

229 case _: 

230 raise ValueError(f"Invalid date type: {date_type}") 

231 

232 paths: set[str] = set() 

233 for placeholder_time in placeholder_times: 

234 # Expand out all other format strings. 

235 path = placeholder_time.strftime(os.path.expandvars(raw_path)) 

236 if lead_times: 

237 # Expand out lead time format strings, %N. 

238 for lead_time in lead_times: 

239 # BUG: Will not respect escaped % signs, e.g: %%N. 

240 paths.add( 

241 path.replace("%N", f"{int(lead_time.total_seconds()) // 3600:03d}") 

242 ) 

243 else: 

244 paths.add(path) 

245 return sorted(paths) 

246 

247 

def fetch_data(file_retriever: type[FileRetrieverABC]):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: type[FileRetrieverABC]
        FileRetriever class to use. Note this is the class itself, not
        an instance: it is instantiated here and used as a context
        manager for the duration of the transfer.

    Raises
    ------
    FileNotFoundError:
        If no files are found for the model, across all tried paths.
    """
    v = _get_needed_environment_variables()

    # Prepare output directory.
    cycle_data_dir = f"{v['rose_datac']}/data/{v['model_identifier']}"
    os.makedirs(cycle_data_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_data_dir)

    # Get file paths.
    paths = _template_file_path(
        v["raw_path"],
        v["date_type"],
        v["data_time"],
        v["forecast_length"],
        v["forecast_offset"],
        v["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        files_found = executor.map(
            retriever.get_file, paths, itertools.repeat(cycle_data_dir)
        )
        # Exhaust the iterator with list so all futures get resolved before we
        # exit the with block, ensuring all files are retrieved.
        any_files_found = any(list(files_found))
    if not any_files_found:
        raise FileNotFoundError("No files found for model!")

299 

300 

def fetch_obs(obs_retriever: FileRetrieverABC):
    """Fetch the observations corresponding to a model run.

    The following environment variables need to be set:
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * OBS_SUBTYPE
    * ROSE_DATAC
    * SUBAREA_EXTENT (may be empty)
    * SURFACE_SYNOP_FIELDS
    * SURFACE_SYNOP_INTERVAL
    * SURFACE_SYNOP_OFFSET
    * WMO_BLOCK_STTN_NMBRS (may be empty)

    Parameters
    ----------
    obs_retriever: ObsRetriever
        ObsRetriever instance to use. Note that its `get_file` method
        takes extended observation arguments, a different signature to
        `FileRetrieverABC.get_file`.

    Raises
    ------
    ValueError:
        If no observations are available. The underlying retrieval
        failure is chained as the cause.
    """
    v = _get_needed_environment_variables_obs()

    # Prepare output directory.
    cycle_obs_dir = f"{v['rose_datac']}/data/OBS"
    os.makedirs(cycle_obs_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_obs_dir)

    # We will get just one file for now, but follow the templating
    # syntax for the model for consistency.
    obs_base_path = (
        v["subtype"]
        + "_"
        + "%Y%m%dT%H%MZ_dt_"
        + str(int(v["forecast_length"].total_seconds() // 3600)).zfill(3)
        + ".nc"
    )
    paths = _template_file_path(
        obs_base_path,
        "initiation",
        v["data_time"],
        v["forecast_length"],
        timedelta(seconds=0),
        v["obs_interval"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use obs retriever to transfer data with multiple threads.
    # We shouldn't need to iterate as we do for the forecast data
    # because these files will be smaller.
    try:
        obs_retriever.get_file(
            paths[0],
            v["subtype"],
            v["obs_fields"],
            v["data_time"],
            v["obs_offset"],
            v["forecast_length"],
            v["obs_interval"],
            cycle_obs_dir,
            wmo_nmbrs=v["wmo_nmbrs"],
            subarea_extent=v["subarea_extent"],
        )
    except Exception as exc:
        raise ValueError("No observations available.") from exc