Coverage for src/CSET/operators/__init__.py: 89% (102 statements)

# Copyright 2022-2023 Met Office and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Subpackage contains all of CSET's operators."""

import inspect
import json
import logging
import os
import warnings
from pathlib import Path
from typing import Any, Union

from iris import FUTURE

# Import operators here so they are exported for use by recipes.
import CSET.operators
from CSET._common import parse_recipe
from CSET.operators import (
    aggregate,
    collapse,
    constraints,
    convection,
    filters,
    misc,
    plot,
    read,
    regrid,
    write,
)

# Exported operators & functions to use elsewhere.
__all__ = [
    "aggregate",
    "collapse",
    "constraints",
    "convection",
    "execute_recipe_parallel",
    "execute_recipe_collate",
    "filters",
    "get_operator",
    "misc",
    "plot",
    "read",
    "regrid",
    "write",
]

# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True


def get_operator(name: str):
    """Get an operator by its name.

    Parameters
    ----------
    name: str
        The name of the desired operator.

    Returns
    -------
    function
        The named operator.

    Raises
    ------
    ValueError
        If name is not an operator.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        name_sections = name.split(".")
        operator = CSET.operators
        for section in name_sections:
            operator = getattr(operator, section)
        if callable(operator):
            return operator
        else:
            raise AttributeError
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err


def _write_metadata(recipe: dict):
    """Write a meta.json file in the CWD."""
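    # Illustrative only: given a hypothetical recipe such as
    # {"title": "Example", "parallel": [...]}, the written meta.json would
    # contain {"title": "Example"}, i.e. everything except the step lists
    # popped below.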

    # TODO: Investigate whether we might be better served by an SQLite database.
    metadata = recipe.copy()
    # Remove the steps, as they are not needed and might contain
    # non-serialisable types.
    metadata.pop("parallel", None)
    metadata.pop("steps", None)
    metadata.pop("collate", None)
    metadata.pop("post-steps", None)
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp)
    os.sync()
    # Stat directory to force NFS to synchronise metadata.
    os.stat(Path.cwd())


def _step_parser(step: dict, step_input: Any) -> str:
    """Execute a recipe step, recursively executing any sub-steps."""
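    # Illustrative only: a step as it might appear in a recipe's YAML, with
    # hypothetical operator names and arguments. The nested mapping under
    # "constraint" contains its own "operator" key, so the recursion below
    # resolves it first and passes its result to the outer operator as a
    # keyword argument:
    #
    #   operator: filters.filter_cubes
    #   constraint:
    #     operator: constraints.generate_var_constraint
    #     varname: air_temperature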

    logging.debug("Executing step: %s", step)
    kwargs = {}
    for key in step.keys():
        if key == "operator":
            operator = get_operator(step["operator"])
            logging.info("operator: %s", step["operator"])
        elif isinstance(step[key], dict) and "operator" in step[key]:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(step[key], step_input)
        else:
            kwargs[key] = step[key]
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    first_arg = next(iter(inspect.signature(operator).parameters.keys()))
    logging.debug("first_arg: %s", first_arg)
    if first_arg not in kwargs:
        logging.debug("first_arg not in kwargs, using step_input.")
        return operator(step_input, **kwargs)
    else:
        logging.debug("first_arg in kwargs.")
        return operator(**kwargs)


def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Path):
    """Execute the steps in a recipe."""
    original_working_directory = Path.cwd()
    os.chdir(output_directory)
    try:
        logger = logging.getLogger()
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.addFilter(lambda record: record.levelno >= logging.INFO)
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:  # Coverage: condition never true; branch not taken.
            recipe["style_file_path"] = str(style_file)
        _write_metadata(recipe)
        # Execute the recipe.
        for step in steps:
            step_input = _step_parser(step, step_input)
        logging.info("Recipe output:\n%s", step_input)
    finally:
        os.chdir(original_working_directory)


def execute_recipe_parallel(
    recipe_yaml: Union[Path, str],
    input_directory: Path,
    output_directory: Path,
    recipe_variables: dict = None,
    style_file: Path = None,
) -> None:
    """Parse and execute the parallel steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    input_directory: Path
        Pathlike to a netCDF file (or anything else iris can read) to be used
        as input.
    output_directory: Path
        Pathlike indicating desired location of output.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Path to a style file. When provided it is recorded in the recipe
        metadata for steps that need it.

    Raises
    ------
    FileNotFoundError
        The recipe or input file cannot be found.
    FileExistsError
        The output directory is actually a file.
    ValueError
        The recipe is not well formed.
    TypeError
        The provided recipe is not a stream or Path.
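
    Examples
    --------
    A minimal sketch of a call; the recipe and paths are hypothetical:

    >>> execute_recipe_parallel(
    ...     Path("recipe.yaml"), Path("input_data"), Path("output")
    ... )  # doctest: +SKIP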

    """
    if recipe_variables is None:
        recipe_variables = {}
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    step_input = Path(input_directory).absolute()
    # Create output directory, and an inter-cycle intermediate directory.
    try:
        (output_directory / "intermediate").mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError) as err:
        logging.error("Output directory is a file. %s", output_directory)
        raise err
    # If parallel doesn't exist try steps.
    try:
        steps = recipe["parallel"]
    except KeyError:
        if "steps" in recipe:
            warnings.warn(
                "'steps' recipe key is deprecated. Use 'parallel' instead.",
                DeprecationWarning,
                stacklevel=1,
            )
        steps = recipe["steps"]
    _run_steps(recipe, steps, step_input, output_directory, style_file)

def execute_recipe_collate(
    recipe_yaml: Union[Path, str],
    output_directory: Path,
    recipe_variables: dict = None,
    style_file: Path = None,
) -> None:
    """Parse and execute the collation steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    output_directory: Path
        Pathlike indicating desired location of output. Must already exist.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Path to a style file. When provided it is recorded in the recipe
        metadata for steps that need it.

    Raises
    ------
    ValueError
        The recipe is not well formed.
    TypeError
        The provided recipe is not a stream or Path.
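
    Examples
    --------
    A minimal sketch of a call; the recipe and path are hypothetical:

    >>> execute_recipe_collate(
    ...     Path("recipe.yaml"), Path("output")
    ... )  # doctest: +SKIP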

    """
    if recipe_variables is None:
        recipe_variables = {}
    output_directory = Path(output_directory).resolve()
    assert output_directory.is_dir()
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    # If collate doesn't exist try post-steps, else treat it as having no steps.
    try:
        steps = recipe["collate"]
    except KeyError:
        if "post-steps" in recipe:
            warnings.warn(
                "'post-steps' recipe key is deprecated. Use 'collate' instead.",
                DeprecationWarning,
                stacklevel=1,
            )
        steps = recipe.get("post-steps", tuple())
    _run_steps(recipe, steps, output_directory, output_directory, style_file)