Coverage for src / CSET / cset_workflow / app / fetch_fcst / bin / fetch_data.py: 79%
115 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:48 +0000
1# © Crown copyright, Met Office (2022-2025) and CSET contributors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Retrieve the files from the filesystem for the current cycle point."""
17import abc
18import ast
19import glob
20import itertools
21import logging
22import os
23import ssl
24import sys
25import urllib.parse
26import urllib.request
27from concurrent.futures import ThreadPoolExecutor
28from datetime import datetime, timedelta
29from pathlib import Path
30from typing import Literal
32import isodate
# Configure root logging once at import time. The level is taken from the
# LOGLEVEL environment variable (defaulting to INFO) and messages go to
# stdout so they are captured in the workflow task's job log.
logging.basicConfig(
    level=os.getenv("LOGLEVEL", "INFO"),
    format="%(asctime)s %(levelname)s %(message)s",
    stream=sys.stdout,
)
class FileRetrieverABC(abc.ABC):
    """Abstract base class for retrieving files from a data source.

    Subclasses must implement the `get_file` method. The __enter__ and
    __exit__ methods may optionally be overridden to add setup or cleanup
    behaviour.

    Instances are used as context managers so any held resources can be
    released once retrieval finishes. All of a model's files are fetched
    inside a single context manager block, with `get_file` invoked once
    per file path.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Initialise the file retriever."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Clean up the file retriever."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Some of the given paths may not exist; a FileNotFoundError in that
        case should be logged rather than raised.

        Implementations should be thread safe, as this method is invoked
        concurrently from multiple threads.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError
class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from the filesystem to the output directory.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain
            patterns like globs, which will be expanded in a system
            specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        matches = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(matches))
        if not matches:
            logging.warning("file_path does not match any files: %s", file_path)
        copied = False
        for match in matches:
            source = Path(match).absolute()
            # Save to a filename derived from the full path, to
            # differentiate similarly named files from different
            # directories. `)` replaces `/` as it can be in file names.
            destination = f"{output_dir}/{')'.join(source.parts)}"
            try:
                os.symlink(source, destination)
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", source, err)
            else:
                copied = True
        return copied
class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from a HTTP address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to download. It may contain patterns like
            globs, which will be expanded in a system specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be
            copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        tls_context = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        tls_context.verify_flags &= ~ssl.VERIFY_X509_STRICT
        # Save under the final component of the URL path.
        filename = urllib.parse.urlparse(file_path).path.split("/")[-1]
        save_path = f"{output_dir.removesuffix('/')}/{filename}"
        transferred = False
        try:
            with urllib.request.urlopen(
                file_path, timeout=30, context=tls_context
            ) as response:
                with open(save_path, "wb") as out_file:
                    # Read in 1 MiB chunks so data needn't fit in memory.
                    chunk = response.read(1024 * 1024)
                    while chunk:
                        out_file.write(chunk)
                        chunk = response.read(1024 * 1024)
                    transferred = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
        return transferred
def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment."""
    env = os.environ
    variables = {
        "raw_path": env["DATA_PATH"],
        "date_type": env["DATE_TYPE"],
        "data_time": datetime.fromisoformat(env["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(env["ANALYSIS_LENGTH"]),
        "forecast_offset": isodate.parse_duration(env["ANALYSIS_OFFSET"]),
        "model_identifier": env["MODEL_IDENTIFIER"],
        "rose_datac": env["ROSE_DATAC"],
    }
    if "DATA_PERIOD" in env or variables["date_type"] != "initiation":
        # A validity-time run requires DATA_PERIOD; a missing variable
        # raises KeyError here, matching the mandatory lookups above.
        variables["data_period"] = isodate.parse_duration(env["DATA_PERIOD"])
    else:
        # Data period is not needed for initiation time.
        variables["data_period"] = None
    logging.debug("Environment variables loaded: %s", variables)
    return variables
def _get_needed_environment_variables_obs() -> dict:
    """Load the needed variables from the environment for fetching observations.

    Returns
    -------
    dict
        Mapping of configuration names to parsed values.

    Raises
    ------
    KeyError
        If a required environment variable is not set.
    """

    def _optional_literal(name: str):
        # Optional variables may be unset or empty. Default to "" so an
        # unset variable yields None; previously os.environ.get() returned
        # None and len(None) raised TypeError.
        value = os.environ.get(name, "")
        return ast.literal_eval(value) if value else None

    variables = {
        "subtype": os.environ["OBS_SUBTYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        "obs_fields": ast.literal_eval(os.environ["SURFACE_SYNOP_FIELDS"]),
        "model_identifier": "OBS",
        "wmo_nmbrs": _optional_literal("WMO_BLOCK_STTN_NMBRS"),
        "subarea_extent": _optional_literal("SUBAREA_EXTENT"),
        "obs_interval": isodate.parse_duration(os.environ["SURFACE_SYNOP_INTERVAL"]),
        "obs_offset": isodate.parse_duration(os.environ["SURFACE_SYNOP_OFFSET"]),
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    logging.debug("Environment variables loaded: %s", variables)
    return variables
209def _template_file_path(
210 raw_path: str,
211 date_type: Literal["validity", "initiation"],
212 data_time: datetime,
213 forecast_length: timedelta,
214 forecast_offset: timedelta,
215 data_period: timedelta,
216) -> list[str]:
217 """Fill time placeholders to generate a file path to fetch."""
218 placeholder_times: list[datetime] = []
219 lead_times: list[timedelta] = []
220 match date_type:
221 case "validity":
222 date = data_time
223 while date < data_time + forecast_length:
224 placeholder_times.append(date)
225 date += data_period
226 case "initiation":
227 placeholder_times.append(data_time)
228 lead_time = forecast_offset
229 while lead_time < forecast_length:
230 lead_times.append(lead_time)
231 lead_time += data_period
232 case _:
233 raise ValueError(f"Invalid date type: {date_type}")
235 paths: set[str] = set()
236 for placeholder_time in placeholder_times:
237 # Expand out all other format strings.
238 path = placeholder_time.strftime(os.path.expandvars(raw_path))
239 if lead_times:
240 # Expand out lead time format strings, %N.
241 for lead_time in lead_times:
242 # BUG: Will not respect escaped % signs, e.g: %%N.
243 paths.add(
244 path.replace("%N", f"{int(lead_time.total_seconds()) // 3600:03d}")
245 )
246 else:
247 paths.add(path)
248 return sorted(paths)
def fetch_data(file_retriever: FileRetrieverABC):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: FileRetriever
        FileRetriever implementation to use.

    Raises
    ------
    FileNotFound:
        If no files are found for the model, across all tried paths.
    """
    env = _get_needed_environment_variables()

    # Prepare output directory.
    cycle_data_dir = f"{env['rose_datac']}/data/{env['model_identifier']}"
    os.makedirs(cycle_data_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_data_dir)

    # Get file paths.
    paths = _template_file_path(
        env["raw_path"],
        env["date_type"],
        env["data_time"],
        env["forecast_length"],
        env["forecast_offset"],
        env["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(retriever.get_file, path, cycle_data_dir)
            for path in paths
        ]
        # Resolve every future before leaving the with block, ensuring
        # all files are retrieved before resources are torn down.
        results = [future.result() for future in futures]
    if not any(results):
        raise FileNotFoundError("No files found for model!")
def fetch_obs(obs_retriever: FileRetrieverABC):
    """Fetch the observations corresponding to a model run.

    The following environment variables need to be set:
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * OBS_SUBTYPE
    * ROSE_DATAC
    * SUBAREA_EXTENT
    * SURFACE_SYNOP_FIELDS
    * SURFACE_SYNOP_INTERVAL
    * SURFACE_SYNOP_OFFSET
    * WMO_BLOCK_STTN_NMBRS

    Parameters
    ----------
    obs_retriever: ObsRetriever
        ObsRetriever implementation to use. Defaults to FilesystemFileRetriever.
        NOTE(review): this is called with more arguments than
        FileRetrieverABC.get_file accepts, so a specialised observation
        retriever is presumably expected here — confirm the annotation.

    Raises
    ------
    ValueError:
        If no observations are available.
    """
    v = _get_needed_environment_variables_obs()

    # Prepare output directory.
    cycle_obs_dir = f"{v['rose_datac']}/data/OBS"
    os.makedirs(cycle_obs_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_obs_dir)

    # We will get just one file for now, but follow the templating
    # syntax for the model for consistency. The filename encodes the
    # subtype, the cycle point, and the forecast length in hours.
    obs_base_path = (
        v["subtype"]
        + "_"
        + "%Y%m%dT%H%MZ_dt_"
        + str(int(v["forecast_length"].total_seconds() // 3600)).zfill(3)
        + ".nc"
    )
    paths = _template_file_path(
        obs_base_path,
        "initiation",
        v["data_time"],
        v["forecast_length"],
        timedelta(seconds=0),
        v["obs_interval"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use obs retriever to transfer data with multiple threads.
    # We shouldn't need to iterate as we do for the forecast data
    # because these files will be smaller.
    try:
        obs_retriever.get_file(
            paths[0],
            v["subtype"],
            v["obs_fields"],
            v["data_time"],
            v["obs_offset"],
            v["forecast_length"],
            v["obs_interval"],
            cycle_obs_dir,
            wmo_nmbrs=v["wmo_nmbrs"],
            subarea_extent=v["subarea_extent"],
        )
    except Exception as exc:
        # Any retrieval failure is surfaced as "no observations".
        raise ValueError("No observations available.") from exc