Coverage for src/CSET/operators/__init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-18 10:49 +0000

1# © Crown copyright, Met Office (2022-2026) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21import zipfile 

22from pathlib import Path 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET.operators import ( 

29 ageofair, 

30 aggregate, 

31 aviation, 

32 collapse, 

33 constraints, 

34 convection, 

35 curvature, 

36 ensembles, 

37 filters, 

38 humidity, 

39 imageprocessing, 

40 mesoscale, 

41 misc, 

42 plot, 

43 precipitation, 

44 pressure, 

45 read, 

46 regrid, 

47 scoreswrappers, 

48 temperature, 

49 transect, 

50 wind, 

51 write, 

52) 

53 

# Exported operators & functions to use elsewhere. Kept in alphabetical order.
__all__ = [
    "ageofair",
    "aggregate",
    "aviation",
    "collapse",
    "constraints",
    "convection",
    "curvature",
    "ensembles",
    "execute_recipe",
    "filters",
    "get_operator",
    "humidity",
    "imageprocessing",
    "mesoscale",
    "misc",
    "plot",
    "precipitation",
    "pressure",
    "read",
    "regrid",
    "scoreswrappers",
    "temperature",
    "transect",
    "wind",
    "write",
]

# These iris FUTURE flags are set once, at import time, for the whole process.
# Stop iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# Stop iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
# Accept microsecond precision in iris times.
FUTURE.date_microseconds = True

89 

90 

def get_operator(name: str):
    """Look up an operator from its dotted name.

    The name is resolved one attribute at a time, starting from the
    ``CSET.operators`` package.

    Parameters
    ----------
    name: str
        Dotted path of the operator relative to ``CSET.operators``, for
        example ``"read.read_cubes"``.

    Returns
    -------
    function
        The callable found at the end of the dotted path.

    Raises
    ------
    ValueError
        If the name cannot be resolved, or it resolves to something that is
        not callable.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        attribute_path = name.split(".")
        resolved = CSET.operators
        for attribute in attribute_path:
            resolved = getattr(resolved, attribute)
        # Something that resolves but cannot be called (a module, a constant)
        # is reported in the same way as a name that does not exist.
        if not callable(resolved):
            raise AttributeError
        return resolved
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err

126 

127 

128def _write_metadata(recipe: dict): 

129 """Write a meta.json file in the CWD.""" 

130 metadata = recipe.copy() 

131 # Remove steps, as not needed, and might contain non-serialisable types. 

132 metadata.pop("steps", None) 

133 # To remove long variable names with suffix 

134 if "title" in metadata: 

135 metadata["title"] = metadata["title"].replace("_for_climate_averaging", "") 

136 metadata["title"] = metadata["title"].replace("_radiative_timestep", "") 

137 metadata["title"] = metadata["title"].replace("_maximum_random_overlap", "") 

138 with open("meta.json", "wt", encoding="UTF-8") as fp: 

139 json.dump(metadata, fp, indent=2) 

140 

141 

142def _step_parser(step: dict, step_input: any) -> str: 

143 """Execute a recipe step, recursively executing any sub-steps.""" 

144 logging.debug("Executing step: %s", step) 

145 kwargs = {} 

146 for key in step.keys(): 

147 if key == "operator": 

148 operator = get_operator(step["operator"]) 

149 logging.info("operator: %s", step["operator"]) 

150 elif isinstance(step[key], dict) and "operator" in step[key]: 

151 logging.debug("Recursing into argument: %s", key) 

152 kwargs[key] = _step_parser(step[key], step_input) 

153 else: 

154 kwargs[key] = step[key] 

155 logging.debug("args: %s", kwargs) 

156 logging.debug("step_input: %s", step_input) 

157 # If first argument of operator is explicitly defined, use that rather 

158 # than step_input. This is known through introspection of the operator. 

159 first_arg = next(iter(inspect.signature(operator).parameters.keys())) 

160 logging.debug("first_arg: %s", first_arg) 

161 if first_arg not in kwargs: 

162 logging.debug("first_arg not in kwargs, using step_input.") 

163 return operator(step_input, **kwargs) 

164 else: 

165 logging.debug("first_arg in kwargs.") 

166 return operator(**kwargs) 

167 

168 

def create_diagnostic_archive():
    """Create archive for easy download of plots and data.

    Everything found recursively under the current working directory is
    written to ``diagnostic.zip`` in that same directory, using deflate
    compression. Names inside the archive are relative to the working
    directory. Entries are added in sorted path order, so the archive layout
    does not depend on the order the filesystem lists them in.
    """
    output_directory: Path = Path.cwd()
    archive_path = output_directory / "diagnostic.zip"
    with zipfile.ZipFile(
        archive_path, "w", compression=zipfile.ZIP_DEFLATED
    ) as archive:
        # The archive file already exists by the time the directory is
        # listed, so it shows up in the listing and has to be skipped.
        for file in sorted(output_directory.rglob("*")):
            # Check the archive doesn't add itself.
            if not file.samefile(archive_path):
                archive.write(file, arcname=file.relative_to(output_directory))

180 

181 

def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Execute the steps from a parsed recipe.

    The steps run with ``output_directory`` as the working directory, each
    step receiving the previous step's output. The original working directory
    is restored afterwards, even if a step fails. A ``meta.json`` file is
    written before the steps run, and ``diagnostic.zip`` is created after
    they have all finished. A ``CSET.log`` file handler is attached to this
    module's logger for the duration of the call.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. Must contain a "steps" key. The dict is modified in
        place: "style_file_path", "plot_resolution" and "skip_write" keys are
        added when the corresponding arguments are truthy.
    output_directory: Path
        Pathlike indicating desired location of output. It is created,
        including parents, if it does not exist.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileExistsError
        The output directory is actually a file.
    NotADirectoryError
        A parent of the output directory is actually a file.
    KeyError
        The recipe has no "steps" key.
    ValueError
        The recipe is not well formed, such as naming an unknown operator.

    Notes
    -----
    Exceptions raised by the operators themselves (for example a
    FileNotFoundError for a missing input file) are propagated unchanged.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        try:
            # Detach and close the per-recipe log file. Otherwise every call
            # leaves an open file behind, and later recipes keep writing to
            # the logs of earlier ones.
            if diagnostic_log is not None:
                logger.removeHandler(diagnostic_log)
                diagnostic_log.close()
        finally:
            os.chdir(original_working_directory)