Coverage for src / CSET / cset_workflow / app / fetch_fcst / bin / fetch_data.py: 90%
115 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-17 11:22 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Retrieve the files from the filesystem for the current cycle point."""
17import abc
18import ast
19import glob
20import itertools
21import logging
22import os
23import ssl
24import sys
25import urllib.parse
26import urllib.request
27from concurrent.futures import ThreadPoolExecutor
28from datetime import datetime, timedelta
29from pathlib import Path
30from typing import Literal
32import isodate
# Configure root logging once at import time: level comes from the LOGLEVEL
# environment variable (default INFO), and output goes to stdout so the
# workflow scheduler captures it in the task's job log.
logging.basicConfig(
    level=os.getenv("LOGLEVEL", "INFO"),
    format="%(asctime)s %(levelname)s %(message)s",
    stream=sys.stdout,
)
class FileRetrieverABC(abc.ABC):
    """Abstract base class for retrieving files from a data source.

    Concrete subclasses must implement `get_file`. The `__enter__` and
    `__exit__` methods may additionally be overridden when a retriever
    needs setup or cleanup code.

    Instances are used as context managers: every file belonging to a
    model is fetched inside a single `with` block, during which
    `get_file` is invoked once per file path. This lets subclasses
    release any held resources once retrieval finishes.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Set up the file retriever and return it ready for use."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release any resources held by the file retriever."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Not every given path will exist, so a FileNotFoundError should
        be logged rather than raised.

        Implementations must be thread safe, because this method is
        called concurrently from multiple threads.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError
class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from the filesystem to the output directory.

        The file is not physically copied; a symlink pointing at the
        original is created in the output directory instead.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        matches = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(matches))
        if not matches:
            logging.warning("file_path does not match any files: %s", file_path)
        copied_any = False
        for match in matches:
            source = Path(match)
            try:
                # The glob guarantees the source exists; only link
                # creation (e.g. a name collision) can fail here.
                os.symlink(source.absolute(), f"{output_dir}/{source.name}")
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", source, err)
            else:
                copied_any = True
        return copied_any
class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from a HTTP address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to download. Unlike the filesystem
            retriever, no glob expansion is performed; the URL is
            fetched exactly as given.
        output_dir: str
            Path to filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        ctx = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        ctx.verify_flags &= ~ssl.VERIFY_X509_STRICT
        # Save under the last component of the URL path.
        save_path = (
            f"{output_dir.removesuffix('/')}/"
            + urllib.parse.urlparse(file_path).path.split("/")[-1]
        )
        any_files_copied = False
        try:
            with urllib.request.urlopen(file_path, timeout=30, context=ctx) as response:
                with open(save_path, "wb") as fp:
                    # Read in 1 MiB chunks so data needn't fit in memory.
                    while data := response.read(1024 * 1024):
                        fp.write(data)
            any_files_copied = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
            # A failed transfer may leave a truncated file behind; remove
            # it so downstream steps don't read partial data.
            try:
                os.remove(save_path)
            except OSError:
                pass
        return any_files_copied
def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment.

    Returns
    -------
    dict
        Mapping of setting name to parsed value, as read from the
        task's environment variables.
    """
    env = os.environ
    variables = {
        "raw_path": env["DATA_PATH"],
        "date_type": env["DATE_TYPE"],
        "data_time": datetime.fromisoformat(env["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(env["ANALYSIS_LENGTH"]),
        "forecast_offset": isodate.parse_duration(env["ANALYSIS_OFFSET"]),
        "model_identifier": env["MODEL_IDENTIFIER"],
        "rose_datac": env["ROSE_DATAC"],
    }
    try:
        raw_period = env["DATA_PERIOD"]
    except KeyError:
        # Only initiation-time runs may omit DATA_PERIOD; anything else
        # propagates the KeyError.
        if variables["date_type"] != "initiation":
            raise
        variables["data_period"] = None
    else:
        variables["data_period"] = isodate.parse_duration(raw_period)
    logging.debug("Environment variables loaded: %s", variables)
    return variables
def _get_needed_environment_variables_obs() -> dict:
    """Load the needed variables from the environment.

    Returns
    -------
    dict
        Mapping of setting name to parsed value, as read from the
        task's environment variables.
    """
    # Optional settings: an unset variable behaves like an empty one.
    # Defaulting to "" fixes a TypeError from len(None) when the
    # variable is absent from the environment.
    wmo_raw = os.environ.get("WMO_BLOCK_STTN_NMBRS", "")
    subarea_raw = os.environ.get("SUBAREA_EXTENT", "")
    variables = {
        "subtype": os.environ["OBS_SUBTYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        # Values arrive as Python literal strings (e.g. a list), hence
        # the safe literal_eval rather than eval.
        "obs_fields": ast.literal_eval(os.environ["SURFACE_SYNOP_FIELDS"]),
        "model_identifier": "OBS",
        "wmo_nmbrs": ast.literal_eval(wmo_raw) if wmo_raw else None,
        "subarea_extent": ast.literal_eval(subarea_raw) if subarea_raw else None,
        "obs_interval": isodate.parse_duration(os.environ["SURFACE_SYNOP_INTERVAL"]),
        "obs_offset": isodate.parse_duration(os.environ["SURFACE_SYNOP_OFFSET"]),
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    logging.debug("Environment variables loaded: %s", variables)
    return variables
def _template_file_path(
    raw_path: str,
    date_type: Literal["validity", "initiation"],
    data_time: datetime,
    forecast_length: timedelta,
    forecast_offset: timedelta,
    data_period: timedelta,
) -> list[str]:
    """Fill time placeholders to generate a file path to fetch."""
    placeholder_times: list[datetime] = []
    lead_times: list[timedelta] = []
    if date_type == "validity":
        # One placeholder per validity time across the forecast window.
        current = data_time
        end_time = data_time + forecast_length
        while current <= end_time:
            placeholder_times.append(current)
            current += data_period
    elif date_type == "initiation":
        # Single initiation time, with one lead time per data period.
        placeholder_times.append(data_time)
        lead = forecast_offset
        while lead <= forecast_length:
            lead_times.append(lead)
            lead += data_period
    else:
        raise ValueError(f"Invalid date type: {date_type}")

    # Environment variables in the template are constant, so expand once.
    expanded_template = os.path.expandvars(raw_path)
    paths: set[str] = set()
    for when in placeholder_times:
        # strftime fills all standard date/time format strings.
        templated = when.strftime(expanded_template)
        if not lead_times:
            paths.add(templated)
            continue
        # %N is this workflow's own placeholder for lead time in hours.
        # BUG: Will not respect escaped % signs, e.g: %%N.
        for lead in lead_times:
            hours = int(lead.total_seconds()) // 3600
            paths.add(templated.replace("%N", f"{hours:03d}"))
    return sorted(paths)
def fetch_data(file_retriever: type[FileRetrieverABC]):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD (optional for initiation-time runs)
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: type[FileRetrieverABC]
        FileRetriever class to use. It is instantiated here and used as
        a context manager around the whole transfer.

    Raises
    ------
    FileNotFoundError:
        If no files are found for the model, across all tried paths.
    """
    v = _get_needed_environment_variables()

    # Prepare output directory.
    cycle_data_dir = f"{v['rose_datac']}/data/{v['model_identifier']}"
    os.makedirs(cycle_data_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_data_dir)

    # Get file paths.
    paths = _template_file_path(
        v["raw_path"],
        v["date_type"],
        v["data_time"],
        v["forecast_length"],
        v["forecast_offset"],
        v["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        files_found = executor.map(
            retriever.get_file, paths, itertools.repeat(cycle_data_dir)
        )
        # Exhaust the iterator with list so all futures get resolved before we
        # exit the with block, ensuring all files are retrieved.
        any_files_found = any(list(files_found))
    if not any_files_found:
        raise FileNotFoundError("No files found for model!")
def fetch_obs(obs_retriever: FileRetrieverABC):
    """Fetch the observations corresponding to a model run.

    The following environment variables need to be set:
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * OBS_SUBTYPE
    * ROSE_DATAC
    * SUBAREA_EXTENT
    * SURFACE_SYNOP_FIELDS
    * SURFACE_SYNOP_INTERVAL
    * SURFACE_SYNOP_OFFSET
    * WMO_BLOCK_STTN_NMBRS

    Parameters
    ----------
    obs_retriever: ObsRetriever
        ObsRetriever implementation to use. NOTE(review): its get_file
        takes an extended argument list (subtype, fields, times, etc.),
        not the two-argument FileRetrieverABC.get_file signature —
        confirm against the retriever implementation.

    Raises
    ------
    ValueError:
        If no observations are available.
    """
    v = _get_needed_environment_variables_obs()

    # Prepare output directory.
    cycle_obs_dir = f"{v['rose_datac']}/data/OBS"
    os.makedirs(cycle_obs_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_obs_dir)

    # We will get just one file for now, but follow the templating
    # syntax for the model for consistency.
    obs_base_path = (
        v["subtype"]
        + "_"
        + "%Y%m%dT%H%MZ_dt_"
        + str(int(v["forecast_length"].total_seconds() // 3600)).zfill(3)
        + ".nc"
    )
    paths = _template_file_path(
        obs_base_path,
        "initiation",
        v["data_time"],
        v["forecast_length"],
        timedelta(seconds=0),
        v["obs_interval"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use obs retriever to transfer data with multiple threads.
    # We shouldn't need to iterate as we do for the forecast data
    # because these files will be smaller.
    try:
        obs_retriever.get_file(
            paths[0],
            v["subtype"],
            v["obs_fields"],
            v["data_time"],
            v["obs_offset"],
            v["forecast_length"],
            v["obs_interval"],
            cycle_obs_dir,
            wmo_nmbrs=v["wmo_nmbrs"],
            subarea_extent=v["subarea_extent"],
        )
    except Exception as exc:
        # Boundary handler: any retrieval failure is reported uniformly.
        raise ValueError("No observations available.") from exc