# src/CSET/cset_workflow/app/fetch_fcst/bin/fetch_data.py
# © Crown copyright, Met Office (2022-2025) and CSET contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Retrieve the files from the filesystem for the current cycle point."""

import abc
import glob
import itertools
import logging
import os
import ssl
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from pathlib import Path
from typing import Literal

import isodate

logging.basicConfig(
    level=os.getenv("LOGLEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s"
)

class FileRetrieverABC(abc.ABC):
    """Abstract base class for retrieving files from a data source.

    The `get_file` method must be defined. Optionally the __enter__ and __exit__
    methods may be overridden to add setup or cleanup code.

    The class is designed to be used as a context manager, so that resources can
    be cleaned up after the retrieval is complete. All the files of a model are
    retrieved within a single context manager block, within which the `get_file`
    method is called for each file path.
    """

    def __enter__(self) -> "FileRetrieverABC":
        """Initialise the file retriever."""
        logging.debug("Initialising FileRetriever.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Clean up the file retriever."""
        logging.debug("Tearing down FileRetriever.")

    @abc.abstractmethod
    def get_file(self, file_path: str, output_dir: str) -> bool:  # pragma: no cover
        """Save a file from the data source to the output directory.

        Not all of the given paths will exist, so FileNotFoundErrors should be
        logged, but not raised.

        Implementations should be thread safe, as the method is called from
        multiple threads.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the data source. It may contain patterns
            like globs, which will be expanded in a system specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        raise NotImplementedError
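

# Illustrative sketch (an assumption, not part of the original module): a
# minimal concrete retriever showing the contract described in the ABC's
# docstring. The class name and its behaviour are invented for demonstration.
class _ExampleNullRetriever(FileRetrieverABC):
    """Record requested paths without transferring anything."""

    def __enter__(self) -> "_ExampleNullRetriever":
        # Optional setup, e.g. opening a connection, would go here.
        self.requested: list[str] = []
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Optional cleanup, e.g. closing the connection, would go here.
        logging.debug("Requested %d paths.", len(self.requested))

    def get_file(self, file_path: str, output_dir: str) -> bool:
        # Must be thread safe; missing files are logged, never raised.
        self.requested.append(file_path)
        return False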

class FilesystemFileRetriever(FileRetrieverABC):
    """Retrieve files from the filesystem."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from the filesystem to the output directory.

        Parameters
        ----------
        file_path: str
            Path of the file to copy on the filesystem. It may contain patterns
            like globs, which will be expanded in a system specific manner.
        output_dir: str
            Path to filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        file_paths = glob.glob(os.path.expanduser(file_path))
        logging.debug("Copying files:\n%s", "\n".join(file_paths))
        if not file_paths:
            logging.warning("file_path does not match any files: %s", file_path)
        any_files_copied = False
        for f in file_paths:
            file = Path(f)
            try:
                # We know the file exists from the glob, so symlink it into place.
                os.symlink(file.absolute(), f"{output_dir}/{file.name}")
                any_files_copied = True
            except OSError as err:
                logging.warning("Failed to copy %s, error: %s", file, err)
        return any_files_copied
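

# Illustrative usage sketch (the paths are assumptions): FilesystemFileRetriever
# expands glob patterns and symlinks every match into the output directory,
# which must already exist.
def _example_filesystem_fetch() -> bool:
    with FilesystemFileRetriever() as retriever:
        return retriever.get_file("~/forecasts/*.nc", "/tmp/cset_example_data")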

class HTTPFileRetriever(FileRetrieverABC):
    """Retrieve files via HTTP."""

    def get_file(self, file_path: str, output_dir: str) -> bool:
        """Save a file from an HTTP address to the output directory.

        Parameters
        ----------
        file_path: str
            URL of the file to download over HTTP or HTTPS. Unlike the
            filesystem retriever, glob patterns are not expanded.
        output_dir: str
            Path to filesystem directory into which the file should be copied.

        Returns
        -------
        bool:
            True if files were transferred, otherwise False.
        """
        ctx = ssl.create_default_context()
        # Needed to enable compatibility with malformed iBoss TLS certificates.
        ctx.verify_flags &= ~ssl.VERIFY_X509_STRICT
        save_path = (
            f"{output_dir.removesuffix('/')}/"
            + urllib.parse.urlparse(file_path).path.split("/")[-1]
        )
        any_files_copied = False
        try:
            with urllib.request.urlopen(file_path, timeout=30, context=ctx) as response:
                with open(save_path, "wb") as fp:
                    # Read in 1 MiB chunks so data needn't fit in memory.
                    while data := response.read(1024 * 1024):
                        fp.write(data)
            any_files_copied = True
        except OSError as err:
            logging.warning("Failed to retrieve %s, error: %s", file_path, err)
        return any_files_copied

def _get_needed_environment_variables() -> dict:
    """Load the needed variables from the environment."""
    variables = {
        "raw_path": os.environ["DATA_PATH"],
        "date_type": os.environ["DATE_TYPE"],
        "data_time": datetime.fromisoformat(os.environ["CYLC_TASK_CYCLE_POINT"]),
        "forecast_length": isodate.parse_duration(os.environ["ANALYSIS_LENGTH"]),
        "forecast_offset": isodate.parse_duration(os.environ["ANALYSIS_OFFSET"]),
        "model_identifier": os.environ["MODEL_IDENTIFIER"],
        "rose_datac": os.environ["ROSE_DATAC"],
    }
    try:
        variables["data_period"] = isodate.parse_duration(os.environ["DATA_PERIOD"])
    except KeyError:
        # Data period is not needed for initiation time.
        if variables["date_type"] != "initiation":
            raise
        variables["data_period"] = None
    logging.debug("Environment variables loaded: %s", variables)
    return variables
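

# Illustrative sketch of the environment this function expects (all values are
# assumptions for demonstration; in the workflow they are set by Cylc and Rose).
# The cycle point uses Cylc's compact ISO 8601 form, which datetime.fromisoformat
# accepts on Python 3.11+.
def _example_environment() -> dict:
    os.environ.update(
        {
            "DATA_PATH": "/data/model/%Y%m%dT%H%MZ_forecast_%N.nc",
            "DATE_TYPE": "initiation",
            "CYLC_TASK_CYCLE_POINT": "20250101T0000Z",
            "ANALYSIS_LENGTH": "PT24H",
            "ANALYSIS_OFFSET": "PT0H",
            "DATA_PERIOD": "PT3H",
            "MODEL_IDENTIFIER": "1",
            "ROSE_DATAC": "/scratch/cycle/20250101T0000Z",
        }
    )
    return _get_needed_environment_variables()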

def _template_file_path(
    raw_path: str,
    date_type: Literal["validity", "initiation"],
    data_time: datetime,
    forecast_length: timedelta,
    forecast_offset: timedelta,
    data_period: timedelta,
) -> list[str]:
    """Fill time placeholders to generate a file path to fetch."""
    placeholder_times: list[datetime] = []
    lead_times: list[timedelta] = []
    match date_type:
        case "validity":
            date = data_time
            while date < data_time + forecast_length:
                placeholder_times.append(date)
                date += data_period
        case "initiation":
            placeholder_times.append(data_time)
            lead_time = forecast_offset
            while lead_time < forecast_length:
                lead_times.append(lead_time)
                lead_time += data_period
        case _:
            raise ValueError(f"Invalid date type: {date_type}")

    paths: set[str] = set()
    for placeholder_time in placeholder_times:
        # Expand out all other format strings.
        path = placeholder_time.strftime(os.path.expandvars(raw_path))
        if lead_times:
            # Expand out lead time format strings, %N.
            for lead_time in lead_times:
                # BUG: Will not respect escaped % signs, e.g: %%N.
                paths.add(
                    path.replace("%N", f"{int(lead_time.total_seconds()) // 3600:03d}")
                )
        else:
            paths.add(path)
    return sorted(paths)
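

# Illustrative sketch of the placeholder expansion (values assumed for
# demonstration). With an "initiation" date type, strftime placeholders come
# from the cycle point and %N becomes the lead time in whole hours, assuming
# strftime leaves the unrecognised %N directive untouched, as it does on
# common Linux systems.
def _example_template_expansion() -> list[str]:
    paths = _template_file_path(
        raw_path="/data/model/%Y%m%dT%H%MZ_forecast_%N.nc",
        date_type="initiation",
        data_time=datetime(2025, 1, 1),
        forecast_length=timedelta(hours=6),
        forecast_offset=timedelta(hours=0),
        data_period=timedelta(hours=3),
    )
    # paths == ["/data/model/20250101T0000Z_forecast_000.nc",
    #           "/data/model/20250101T0000Z_forecast_003.nc"]
    return paths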

def fetch_data(file_retriever: type[FileRetrieverABC]):
    """Fetch the data for a model.

    The following environment variables need to be set:
    * ANALYSIS_OFFSET
    * ANALYSIS_LENGTH
    * CYLC_TASK_CYCLE_POINT
    * DATA_PATH
    * DATA_PERIOD
    * DATE_TYPE
    * MODEL_IDENTIFIER
    * ROSE_DATAC

    Parameters
    ----------
    file_retriever: type[FileRetrieverABC]
        FileRetriever implementation to use. The class itself is passed, and it
        is instantiated as a context manager inside this function.

    Raises
    ------
    FileNotFoundError:
        If no files are found for the model, across all tried paths.
    """
    v = _get_needed_environment_variables()

    # Prepare output directory.
    cycle_data_dir = f"{v['rose_datac']}/data/{v['model_identifier']}"
    os.makedirs(cycle_data_dir, exist_ok=True)
    logging.debug("Output directory: %s", cycle_data_dir)

    # Get file paths.
    paths = _template_file_path(
        v["raw_path"],
        v["date_type"],
        v["data_time"],
        v["forecast_length"],
        v["forecast_offset"],
        v["data_period"],
    )
    logging.info("Retrieving paths:\n%s", "\n".join(paths))

    # Use file retriever to transfer data with multiple threads.
    with file_retriever() as retriever, ThreadPoolExecutor() as executor:
        files_found = any(
            executor.map(retriever.get_file, paths, itertools.repeat(cycle_data_dir))
        )
        # We don't need to exhaust the iterator, as all futures are submitted
        # before map yields anything. Therefore they will all be resolved upon
        # exiting the with block.
    if not files_found:
        raise FileNotFoundError("No files found for model!")
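

# Illustrative entry point sketch (an assumption, not part of the original
# module): the workflow would normally choose the retriever class from its
# configuration. Note that fetch_data receives the retriever class itself,
# which it instantiates as a context manager.
if __name__ == "__main__":
    fetch_data(FilesystemFileRetriever)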