Coverage for src/CSET/operators/__init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-01 15:05 +0000

1# Copyright 2022-2023 Met Office and contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21from pathlib import Path 

22from typing import Union 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET._common import parse_recipe 

29from CSET.operators import ( 

30 aggregate, 

31 collapse, 

32 constraints, 

33 convection, 

34 filters, 

35 misc, 

36 plot, 

37 read, 

38 regrid, 

39 write, 

40) 

41 

# Public API of this subpackage: the operator modules imported above, plus the
# operator lookup and recipe-execution functions defined in this module.
__all__ = [
    "aggregate", "collapse", "constraints", "convection",
    "execute_recipe_parallel", "execute_recipe_collate",
    "filters", "get_operator", "misc", "plot", "read", "regrid", "write",
]

# Opt in to iris's future behaviour so it stops warning every time it loads
# something (datum support) or saves something (split attributes).
FUTURE.datum_support = True
FUTURE.save_split_attrs = True

63 

64 

def get_operator(name: str):
    """Get an operator by its name.

    The name is a dotted path that is resolved one attribute at a time,
    starting from the ``CSET.operators`` package.

    Parameters
    ----------
    name: str
        The name of the desired operator, for example ``"read.read_cubes"``.

    Returns
    -------
    function
        The named operator (whatever callable the dotted path leads to).

    Raises
    ------
    ValueError
        If name is not an operator: part of the path does not exist, or the
        object it leads to is not callable.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")  # doctest: +ELLIPSIS
    <function read_cubes at 0x...>
    """
    logging.debug("get_operator(%s)", name)
    # NOTE(review): resolution is plain getattr on this package, so anything
    # reachable from it (including modules it imports, such as os) resolves
    # as well. Confirm that is acceptable for the recipes being executed.
    try:
        target = CSET.operators
        for part in name.split("."):
            target = getattr(target, part)
        if not callable(target):
            # Modules and plain values are not operators.
            raise AttributeError
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
    return target

100 

101 

def _write_metadata(recipe: dict):
    """Write a meta.json file in the CWD.

    The recipe is serialised to JSON without its ``parallel`` and ``collate``
    entries. The caller's dictionary is left unmodified.
    """
    # TODO: Investigate whether we might be better served by an SQLite database.
    # Steps are omitted: they are not needed, and might contain
    # non-serialisable types.
    step_keys = ("parallel", "collate")
    metadata = {key: value for key, value in recipe.items() if key not in step_keys}
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp)
    os.sync()
    # Stat directory to force NFS to synchronise metadata.
    os.stat(Path.cwd())

114 

115 

def _step_parser(step: dict, step_input: object) -> object:
    """Execute a recipe step, recursively executing any sub-steps.

    Parameters
    ----------
    step: dict
        Mapping with an ``"operator"`` key naming the operator (a dotted name
        as accepted by ``get_operator``). Every other key is passed to the
        operator as a keyword argument. Values that are themselves dicts with
        an ``"operator"`` key are executed first as sub-steps (given the same
        ``step_input``), and their result is used as the argument value.
    step_input: object
        Output of the previous step. It is passed as the operator's first
        positional argument, unless the step explicitly sets that argument.

    Returns
    -------
    object
        Whatever the operator returns.

    Raises
    ------
    ValueError
        If the step has no ``"operator"`` key, or names an unknown operator.
    """
    logging.debug("Executing step: %s", step)
    # Validate and resolve the operator before running any sub-steps, so a
    # malformed step fails without doing (possibly expensive) nested work.
    if "operator" not in step:
        raise ValueError(f"Recipe step has no operator: {step}")
    operator = get_operator(step["operator"])
    logging.info("operator: %s", step["operator"])

    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            continue
        if isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)

    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    # An operator without parameters cannot take step_input at all.
    first_arg = next(iter(inspect.signature(operator).parameters), None)
    logging.debug("first_arg: %s", first_arg)
    if first_arg is None:
        logging.debug("operator takes no arguments, ignoring step_input.")
        return operator(**kwargs)
    if first_arg in kwargs:
        logging.debug("first_arg in kwargs.")
        return operator(**kwargs)
    logging.debug("first_arg not in kwargs, using step_input.")
    return operator(step_input, **kwargs)

141 

142 

def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Path):
    """Execute the steps in a recipe.

    The steps run with ``output_directory`` as the current working directory.
    While they run, INFO-and-above log records from the root logger are also
    written to ``CSET.log`` in that directory. Afterwards the original working
    directory is restored and the log file handler is detached and closed,
    even if a step raises.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. Passed to ``_write_metadata``; when ``style_file`` is
        given, its path is first stored under ``"style_file_path"``.
    steps: iterable of dict
        Recipe steps, executed in order. Each step's output is the next
        step's input.
    step_input:
        Input given to the first step.
    output_directory: Path
        Directory to run in, where outputs are written.
    style_file: Path
        Optional style file whose path is recorded in the metadata.
    """
    original_working_directory = Path.cwd()
    os.chdir(output_directory)
    try:
        logger = logging.getLogger()
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.addFilter(lambda record: record.levelno >= logging.INFO)
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        try:
            # Create metadata file used by some steps.
            if style_file:
                recipe["style_file_path"] = str(style_file)
            _write_metadata(recipe)
            # Execute the recipe.
            for step in steps:
                step_input = _step_parser(step, step_input)
            logging.info("Recipe output:\n%s", step_input)
        finally:
            # Without this the handler stays on the root logger after the run,
            # leaking the file and copying later records into this CSET.log.
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
    finally:
        os.chdir(original_working_directory)

167 

168 

def execute_recipe_parallel(
    recipe_yaml: Union[Path, str],
    input_directory: Path,
    output_directory: Path,
    recipe_variables: Union[dict, None] = None,
    style_file: Union[Path, None] = None,
) -> None:
    """Parse and executes the parallel steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    input_directory: Path
        Pathlike to the input data (netCDF, or something else iris can read).
        It is made absolute and given as the input to the first step.
    output_directory: Path
        Pathlike indicating desired location of output. It is created, along
        with an ``intermediate`` subdirectory, if it does not exist.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Optional style file whose path is recorded in the recipe metadata.

    Raises
    ------
    FileNotFoundError
        The recipe or input file cannot be found.
    FileExistsError, NotADirectoryError
        The output directory is actually a file.
    ValueError
        The recipe is not well formed.
    TypeError
        The provided recipe is not a stream or Path.
    """
    if recipe_variables is None:
        recipe_variables = {}
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    step_input = Path(input_directory).absolute()
    # Accept str as well as Path; the "/" join below requires a Path.
    output_directory = Path(output_directory)
    # Create output directory, and an inter-cycle intermediate directory.
    try:
        (output_directory / "intermediate").mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["parallel"]
    _run_steps(recipe, steps, step_input, output_directory, style_file)

215 

216 

def execute_recipe_collate(
    recipe_yaml: Union[Path, str],
    output_directory: Path,
    recipe_variables: Union[dict, None] = None,
    style_file: Union[Path, None] = None,
) -> None:
    """Parse and execute the collation steps from a recipe file.

    Parameters
    ----------
    recipe_yaml: Path or str
        Path to a file containing, or string of, a recipe's YAML describing the
        operators that need running. If a Path is provided it is opened and
        read.
    output_directory: Path
        Pathlike indicating desired location of output. Must already exist.
        It is also given as the input to the first collation step.
    recipe_variables: dict
        Dictionary of variables for the recipe.
    style_file: Path
        Optional style file whose path is recorded in the recipe metadata.

    Raises
    ------
    FileNotFoundError
        The output directory does not exist.
    NotADirectoryError
        The output directory is not a directory.
    ValueError
        The recipe is not well formed.
    TypeError
        The provided recipe is not a stream or Path.
    """
    if recipe_variables is None:
        recipe_variables = {}
    output_directory = Path(output_directory).resolve()
    # Validate explicitly: an assert would be stripped under ``python -O``.
    if not output_directory.exists():
        raise FileNotFoundError(f"Output directory does not exist: {output_directory}")
    if not output_directory.is_dir():
        raise NotADirectoryError(
            f"Output directory is not a directory: {output_directory}"
        )
    recipe = parse_recipe(recipe_yaml, recipe_variables)
    # If collate doesn't exist treat it as having no steps.
    steps = recipe.get("collate", [])
    _run_steps(recipe, steps, output_directory, output_directory, style_file)