Coverage for src/CSET/cset_workflow/app/fetch_fcst/bin/fetch_data.py: 100%

96 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 21:08 +0000

1# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Retrieve the files from the filesystem for the current cycle point.""" 

16 

17import abc 

18import glob 

19import itertools 

20import logging 

21import os 

22import ssl 

23import urllib.parse 

24import urllib.request 

25from concurrent.futures import ThreadPoolExecutor 

26from datetime import datetime, timedelta 

27from pathlib import Path 

28from typing import Literal 

29 

30import isodate 

31 

# Configure root logging once at import time, as this module runs as a
# standalone Cylc task script. The LOGLEVEL environment variable overrides
# the default INFO threshold.
logging.basicConfig(
    level=os.getenv("LOGLEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s"
)

35 

36 

class FileRetrieverABC(abc.ABC):
    """Abstract base class for retrieving files from a data source.

    Subclasses must define `get_file`. The __enter__ and __exit__ methods may
    optionally be overridden to add setup or cleanup code.

    The class is designed for use as a context manager so resources can be
    released once retrieval is complete. All of a model's files are retrieved
    within a single context manager block, inside which `get_file` is called
    once per file path.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Set up the file retriever and return it."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release any resources held by the file retriever."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Not every given path will exist, so FileNotFoundErrors should be
        logged rather than raised.

        Implementations must be thread safe, as this method is called from
        multiple threads at once.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain
            patterns such as globs, which are expanded in a system specific
            manner.
        output_dir: str
            Path to the filesystem directory the file should be copied into.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError

82 

83 

class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from the filesystem to the output directory.

        The file is symlinked into place rather than physically copied.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain
            patterns such as globs, which are expanded in a system specific
            manner.
        output_dir: str
            Path to the filesystem directory the file should be copied into.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        matches = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(matches))
        if not matches:
            logging.warning("file_path does not match any files: %s", file_path)
        copied_any = False
        for match in matches:
            source = Path(match)
            try:
                # The glob guarantees the source exists, but the link itself
                # can still fail, e.g. if the target name is already taken.
                os.symlink(source.absolute(), f"{output_dir}/{source.name}")
                copied_any = True
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", source, err)
        return copied_any

117 

118 

class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from an HTTP address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to download. Unlike the filesystem retriever,
            glob patterns are not expanded.
        output_dir: str
            Path to filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        ctx = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        ctx.verify_flags &= ~ssl.VERIFY_X509_STRICT
        # Name the local file after the last component of the URL path.
        save_path = (
            f"{output_dir.removesuffix('/')}/"
            + urllib.parse.urlparse(file_path).path.split("/")[-1]
        )
        any_files_copied = False
        try:
            with urllib.request.urlopen(file_path, timeout=30, context=ctx) as response:
                with open(save_path, "wb") as fp:
                    # Read in 1 MiB chunks so data needn't fit in memory.
                    while data := response.read(1024 * 1024):
                        fp.write(data)
            # Only count the file once it has been fully written and closed.
            any_files_copied = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
            # Don't leave a truncated download behind; downstream steps would
            # otherwise read it as valid data. Best-effort cleanup only, so
            # any error removing it is ignored.
            try:
                Path(save_path).unlink(missing_ok=True)
            except OSError:
                pass
        return any_files_copied

156 

157 

def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment."""
    env = os.environ
    variables = {
        "raw_path": env["DATA_PATH"],
        "date_type": env["DATE_TYPE"],
        "data_time": datetime.fromisoformat(env["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(env["ANALYSIS_LENGTH"]),
        "forecast_offset": isodate.parse_duration(env["ANALYSIS_OFFSET"]),
        "model_identifier": env["MODEL_IDENTIFIER"],
        "rose_datac": env["ROSE_DATAC"],
    }
    try:
        data_period = isodate.parse_duration(env["DATA_PERIOD"])
    except KeyError:
        # Only initiation time runs may omit the data period.
        if variables["date_type"] != "initiation":
            raise
        data_period = None
    variables["data_period"] = data_period
    logging.debug("Environment variables loaded: %s", variables)
    return variables

178 

179 

180def _template_file_path( 

181 raw_path: str, 

182 date_type: Literal["validity", "initiation"], 

183 data_time: datetime, 

184 forecast_length: timedelta, 

185 forecast_offset: timedelta, 

186 data_period: timedelta, 

187) -> list[str]: 

188 """Fill time placeholders to generate a file path to fetch.""" 

189 placeholder_times: list[datetime] = [] 

190 lead_times: list[timedelta] = [] 

191 match date_type: 

192 case "validity": 

193 date = data_time 

194 while date < data_time + forecast_length: 

195 placeholder_times.append(date) 

196 date += data_period 

197 case "initiation": 

198 placeholder_times.append(data_time) 

199 lead_time = forecast_offset 

200 while lead_time < forecast_length: 

201 lead_times.append(lead_time) 

202 lead_time += data_period 

203 case _: 

204 raise ValueError(f"Invalid date type: {date_type}") 

205 

206 paths: set[str] = set() 

207 for placeholder_time in placeholder_times: 

208 # Expand out all other format strings. 

209 path = placeholder_time.strftime(os.path.expandvars(raw_path)) 

210 if lead_times: 

211 # Expand out lead time format strings, %N. 

212 for lead_time in lead_times: 

213 # BUG: Will not respect escaped % signs, e.g: %%N. 

214 paths.add( 

215 path.replace("%N", f"{int(lead_time.total_seconds()) // 3600:03d}") 

216 ) 

217 else: 

218 paths.add(path) 

219 return sorted(paths) 

220 

221 

def fetch_data(file_retriever: type[FileRetrieverABC]):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: type[FileRetrieverABC]
        Class of the FileRetriever implementation to use. It is instantiated
        here and used as a context manager around the whole transfer.

    Raises
    ------
    FileNotFoundError:
        If no files are found for the model, across all tried paths.
    """
    v = _get_needed_environment_variables()

    # Prepare output directory.
    cycle_data_dir = f"{v['rose_datac']}/data/{v['model_identifier']}"
    os.makedirs(cycle_data_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_data_dir)

    # Get file paths.
    paths = _template_file_path(
        v["raw_path"],
        v["date_type"],
        v["data_time"],
        v["forecast_length"],
        v["forecast_offset"],
        v["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        files_found = any(
            executor.map(retriever.get_file, paths, itertools.repeat(cycle_data_dir))
        )
        # We don't need to exhaust the iterator, as all futures are submitted
        # before map yields anything. Therefore they will all be resolved upon
        # exiting the with block.
    if not files_found:
        raise FileNotFoundError("No files found for model!")