Coverage for src / CSET / cset_workflow / app / fetch_fcst / bin / fetch_data.py: 90%
114 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 12:47 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Retrieve the files from the filesystem for the current cycle point."""
17import abc
18import ast
19import glob
20import itertools
21import logging
22import os
23import ssl
24import urllib.parse
25import urllib.request
26from concurrent.futures import ThreadPoolExecutor
27from datetime import datetime, timedelta
28from pathlib import Path
29from typing import Literal
31import isodate
# Configure root logging once at import: level comes from $LOGLEVEL
# (defaulting to INFO), with timestamped, levelled messages.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=os.environ.get("LOGLEVEL", "INFO"),
)
class FileRetrieverABC(abc.ABC):
    """Abstract base class for retrieving files from a data source.

    Subclasses must define `get_file`. The `__enter__` and `__exit__`
    methods may optionally be overridden to add setup or cleanup code.

    Instances are designed to be used as context managers so resources can
    be released once retrieval completes. All the files of a model are
    retrieved within one context manager block, inside which `get_file` is
    called once per file path.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Initialise the file retriever."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Clean up the file retriever."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Not all of the given paths will exist, so FileNotFoundErrors should
        be logged, but not raised.

        Implementations should be thread safe, as the method is called from
        multiple threads.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError
class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from the filesystem to the output directory.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        matches = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(matches))
        if not matches:
            logging.warning("file_path does not match any files: %s", file_path)
        copied_any = False
        for match in matches:
            source = Path(match)
            try:
                # The glob above guarantees the source exists; a symlink
                # into the output directory avoids duplicating the data.
                os.symlink(source.absolute(), f"{output_dir}/{source.name}")
                copied_any = True
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", source, err)
        return copied_any
class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from an HTTP address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to download.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        ssl_context = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        ssl_context.verify_flags &= ~ssl.VERIFY_X509_STRICT
        filename = urllib.parse.urlparse(file_path).path.rsplit("/", 1)[-1]
        save_path = f"{output_dir.removesuffix('/')}/{filename}"
        transferred = False
        try:
            response = urllib.request.urlopen(
                file_path, timeout=30, context=ssl_context
            )
            with response, open(save_path, "wb") as out:
                # Stream in 1 MiB chunks so the download needn't fit in memory.
                while chunk := response.read(1024 * 1024):
                    out.write(chunk)
                transferred = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
        return transferred
def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment."""
    env = os.environ
    variables = {
        "raw_path": env["DATA_PATH"],
        "date_type": env["DATE_TYPE"],
        "data_time": datetime.fromisoformat(env["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(env["ANALYSIS_LENGTH"]),
        "forecast_offset": isodate.parse_duration(env["ANALYSIS_OFFSET"]),
        "model_identifier": env["MODEL_IDENTIFIER"],
        "rose_datac": env["ROSE_DATAC"],
    }
    try:
        variables["data_period"] = isodate.parse_duration(env["DATA_PERIOD"])
    except KeyError:
        # Only initiation-time runs may omit the data period.
        if variables["date_type"] != "initiation":
            raise
        variables["data_period"] = None
    logging.debug("Environment variables loaded: %s", variables)
    return variables
def _get_needed_environment_variables_obs() -> dict:
    """Load the needed variables from the environment.

    Returns
    -------
    dict
        Mapping of setting name to parsed value. The optional settings
        (WMO block/station numbers and subarea extent) are None when their
        environment variables are unset or empty.
    """
    # Optional settings default to "" so an unset variable behaves the same
    # as an empty one. Previously os.environ.get(...) could return None,
    # making len(None) raise TypeError when the variable was unset.
    wmo_raw = os.environ.get("WMO_BLOCK_STTN_NMBRS", "")
    subarea_raw = os.environ.get("SUBAREA_EXTENT", "")
    variables = {
        "subtype": os.environ["OBS_SUBTYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        "obs_fields": ast.literal_eval(os.environ["SURFACE_SYNOP_FIELDS"]),
        "model_identifier": "OBS",
        "wmo_nmbrs": ast.literal_eval(wmo_raw) if wmo_raw else None,
        "subarea_extent": ast.literal_eval(subarea_raw) if subarea_raw else None,
        "obs_interval": isodate.parse_duration(os.environ["SURFACE_SYNOP_INTERVAL"]),
        "obs_offset": isodate.parse_duration(os.environ["SURFACE_SYNOP_OFFSET"]),
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    logging.debug("Environment variables loaded: %s", variables)
    return variables
203def _template_file_path(
204 raw_path: str,
205 date_type: Literal["validity", "initiation"],
206 data_time: datetime,
207 forecast_length: timedelta,
208 forecast_offset: timedelta,
209 data_period: timedelta,
210) -> list[str]:
211 """Fill time placeholders to generate a file path to fetch."""
212 placeholder_times: list[datetime] = []
213 lead_times: list[timedelta] = []
214 match date_type:
215 case "validity":
216 date = data_time
217 while date < data_time + forecast_length:
218 placeholder_times.append(date)
219 date += data_period
220 case "initiation":
221 placeholder_times.append(data_time)
222 lead_time = forecast_offset
223 while lead_time < forecast_length:
224 lead_times.append(lead_time)
225 lead_time += data_period
226 case _:
227 raise ValueError(f"Invalid date type: {date_type}")
229 paths: set[str] = set()
230 for placeholder_time in placeholder_times:
231 # Expand out all other format strings.
232 path = placeholder_time.strftime(os.path.expandvars(raw_path))
233 if lead_times:
234 # Expand out lead time format strings, %N.
235 for lead_time in lead_times:
236 # BUG: Will not respect escaped % signs, e.g: %%N.
237 paths.add(
238 path.replace("%N", f"{int(lead_time.total_seconds()) // 3600:03d}")
239 )
240 else:
241 paths.add(path)
242 return sorted(paths)
def fetch_data(file_retriever: FileRetrieverABC):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: FileRetriever
        FileRetriever implementation to use.

    Raises
    ------
    FileNotFound:
        If no files are found for the model, across all tried paths.
    """
    env = _get_needed_environment_variables()

    # Prepare output directory.
    output_dir = f"{env['rose_datac']}/data/{env['model_identifier']}"
    os.makedirs(output_dir, exist_ok=True)
    logging.debug("Output directory: %s", output_dir)

    # Get file paths.
    paths = _template_file_path(
        env["raw_path"],
        env["date_type"],
        env["data_time"],
        env["forecast_length"],
        env["forecast_offset"],
        env["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        results = executor.map(
            retriever.get_file, paths, itertools.repeat(output_dir)
        )
        # Materialising the iterator resolves every future before the
        # with block exits, ensuring all files are retrieved.
        any_found = any(list(results))
    if not any_found:
        raise FileNotFoundError("No files found for model!")
def fetch_obs(obs_retriever: FileRetrieverABC):
    """Fetch the observations corresponding to a model run.

    The following environment variables need to be set:
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * OBS_SUBTYPE
    * ROSE_DATAC
    * SUBAREA_EXTENT
    * SURFACE_SYNOP_FIELDS
    * SURFACE_SYNOP_INTERVAL
    * SURFACE_SYNOP_OFFSET
    * WMO_BLOCK_STTN_NMBRS

    Parameters
    ----------
    obs_retriever: ObsRetriever
        ObsRetriever implementation to use.
        NOTE(review): get_file is called here with a richer signature than
        FileRetrieverABC.get_file defines, so the retrievers in this module
        cannot be passed in — confirm the expected interface with callers.

    Raises
    ------
    ValueError:
        If no observations are available.
    """
    v = _get_needed_environment_variables_obs()

    # Prepare output directory.
    cycle_obs_dir = f"{v['rose_datac']}/data/OBS"
    os.makedirs(cycle_obs_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_obs_dir)

    # We will get just one file for now, but follow the templating
    # syntax for the model for consistency. The file name encodes the
    # obs subtype, the cycle time, and the forecast length in hours.
    obs_base_path = (
        v["subtype"]
        + "_"
        + "%Y%m%dT%H%MZ_dt_"
        + str(int(v["forecast_length"].total_seconds() // 3600)).zfill(3)
        + ".nc"
    )
    paths = _template_file_path(
        obs_base_path,
        "initiation",
        v["data_time"],
        v["forecast_length"],
        timedelta(seconds=0),
        v["obs_interval"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # A single synchronous call suffices; observation files are small
    # compared to the forecast data, so no thread pool is needed.
    try:
        obs_retriever.get_file(
            paths[0],
            v["subtype"],
            v["obs_fields"],
            v["data_time"],
            v["obs_offset"],
            v["forecast_length"],
            v["obs_interval"],
            cycle_obs_dir,
            wmo_nmbrs=v["wmo_nmbrs"],
            subarea_extent=v["subarea_extent"],
        )
    except Exception as exc:
        raise ValueError("No observations available.") from exc