Coverage for src/CSET/operators/__init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 13:58 +0000

1# © Crown copyright, Met Office (2022-2026) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21import zipfile 

22from pathlib import Path 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET.operators import ( 

29 ageofair, 

30 aggregate, 

31 aviation, 

32 collapse, 

33 constraints, 

34 convection, 

35 ensembles, 

36 filters, 

37 humidity, 

38 imageprocessing, 

39 mesoscale, 

40 misc, 

41 plot, 

42 precipitation, 

43 pressure, 

44 read, 

45 regrid, 

46 scoreswrappers, 

47 temperature, 

48 transect, 

49 wind, 

50 write, 

51) 

52 

53# Exported operators & functions to use elsewhere. 

54__all__ = [ 

55 "ageofair", 

56 "aggregate", 

57 "aviation", 

58 "collapse", 

59 "constraints", 

60 "convection", 

61 "ensembles", 

62 "execute_recipe", 

63 "filters", 

64 "humidity", 

65 "get_operator", 

66 "imageprocessing", 

67 "mesoscale", 

68 "misc", 

69 "plot", 

70 "precipitation", 

71 "pressure", 

72 "read", 

73 "regrid", 

74 "temperature", 

75 "scoreswrappers", 

76 "transect", 

77 "wind", 

78 "write", 

79] 

80 

81# Stop iris giving a warning whenever it loads something. 

82FUTURE.datum_support = True 

83# Stop iris giving a warning whenever it saves something. 

84FUTURE.save_split_attrs = True 

85# Accept microsecond precision in iris times. 

86FUTURE.date_microseconds = True 

87 

88 

89def get_operator(name: str): 

90 """Get an operator by its name. 

91 

92 Parameters 

93 ---------- 

94 name: str 

95 The name of the desired operator. 

96 

97 Returns 

98 ------- 

99 function 

100 The named operator. 

101 

102 Raises 

103 ------ 

104 ValueError 

105 If name is not an operator. 

106 

107 Examples 

108 -------- 

109 >>> CSET.operators.get_operator("read.read_cubes") 

110 <function read_cubes at 0x7fcf9353c8b0> 

111 """ 

112 logging.debug("get_operator(%s)", name) 

113 try: 

114 name_sections = name.split(".") 

115 operator = CSET.operators 

116 for section in name_sections: 

117 operator = getattr(operator, section) 

118 if callable(operator): 

119 return operator 

120 else: 

121 raise AttributeError 

122 except (AttributeError, TypeError) as err: 

123 raise ValueError(f"Unknown operator: {name}") from err 

124 

125 

126def _write_metadata(recipe: dict): 

127 """Write a meta.json file in the CWD.""" 

128 metadata = recipe.copy() 

129 # Remove steps, as not needed, and might contain non-serialisable types. 

130 metadata.pop("steps", None) 

131 # To remove long variable names with suffix 

132 if "title" in metadata: 

133 metadata["title"] = metadata["title"].replace("_for_climate_averaging", "") 

134 metadata["title"] = metadata["title"].replace("_radiative_timestep", "") 

135 metadata["title"] = metadata["title"].replace("_maximum_random_overlap", "") 

136 with open("meta.json", "wt", encoding="UTF-8") as fp: 

137 json.dump(metadata, fp, indent=2) 

138 

139 

140def _step_parser(step: dict, step_input: any) -> str: 

141 """Execute a recipe step, recursively executing any sub-steps.""" 

142 logging.debug("Executing step: %s", step) 

143 kwargs = {} 

144 for key in step.keys(): 

145 if key == "operator": 

146 operator = get_operator(step["operator"]) 

147 logging.info("operator: %s", step["operator"]) 

148 elif isinstance(step[key], dict) and "operator" in step[key]: 

149 logging.debug("Recursing into argument: %s", key) 

150 kwargs[key] = _step_parser(step[key], step_input) 

151 else: 

152 kwargs[key] = step[key] 

153 logging.debug("args: %s", kwargs) 

154 logging.debug("step_input: %s", step_input) 

155 # If first argument of operator is explicitly defined, use that rather 

156 # than step_input. This is known through introspection of the operator. 

157 first_arg = next(iter(inspect.signature(operator).parameters.keys())) 

158 logging.debug("first_arg: %s", first_arg) 

159 if first_arg not in kwargs: 

160 logging.debug("first_arg not in kwargs, using step_input.") 

161 return operator(step_input, **kwargs) 

162 else: 

163 logging.debug("first_arg in kwargs.") 

164 return operator(**kwargs) 

165 

166 

167def create_diagnostic_archive(): 

168 """Create archive for easy download of plots and data.""" 

169 output_directory: Path = Path.cwd() 

170 archive_path = output_directory / "diagnostic.zip" 

171 with zipfile.ZipFile( 

172 archive_path, "w", compression=zipfile.ZIP_DEFLATED 

173 ) as archive: 

174 for file in output_directory.rglob("*"): 

175 # Check the archive doesn't add itself. 

176 if not file.samefile(archive_path): 

177 archive.write(file, arcname=file.relative_to(output_directory)) 

178 

179 

180def execute_recipe( 

181 recipe: dict, 

182 output_directory: Path, 

183 style_file: Path = None, 

184 plot_resolution: int = None, 

185 skip_write: bool = None, 

186) -> None: 

187 """Parse and executes the steps from a recipe file. 

188 

189 Parameters 

190 ---------- 

191 recipe: dict 

192 Parsed recipe. 

193 output_directory: Path 

194 Pathlike indicating desired location of output. 

195 style_file: Path, optional 

196 Path to a style file. 

197 plot_resolution: int, optional 

198 Resolution of plots in dpi. 

199 skip_write: bool, optional 

200 Skip saving processed output alongside plots. 

201 

202 Raises 

203 ------ 

204 FileNotFoundError 

205 The recipe or input file cannot be found. 

206 FileExistsError 

207 The output directory as actually a file. 

208 ValueError 

209 The recipe is not well formed. 

210 TypeError 

211 The provided recipe is not a stream or Path. 

212 """ 

213 # Create output directory. 

214 try: 

215 output_directory.mkdir(parents=True, exist_ok=True) 

216 except (FileExistsError, NotADirectoryError) as err: 

217 logging.error("Output directory is a file. %s", output_directory) 

218 raise err 

219 steps = recipe["steps"] 

220 

221 # Execute the steps in a recipe. 

222 original_working_directory = Path.cwd() 

223 try: 

224 os.chdir(output_directory) 

225 logger = logging.getLogger(__name__) 

226 diagnostic_log = logging.FileHandler( 

227 filename="CSET.log", mode="w", encoding="UTF-8" 

228 ) 

229 diagnostic_log.setFormatter( 

230 logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") 

231 ) 

232 logger.addHandler(diagnostic_log) 

233 # Create metadata file used by some steps. 

234 if style_file: 

235 recipe["style_file_path"] = str(style_file) 

236 if plot_resolution: 

237 recipe["plot_resolution"] = plot_resolution 

238 if skip_write: 

239 recipe["skip_write"] = skip_write 

240 _write_metadata(recipe) 

241 

242 # Execute the recipe. 

243 step_input = None 

244 for step in steps: 

245 step_input = _step_parser(step, step_input) 

246 logger.info("Recipe output:\n%s", step_input) 

247 

248 logger.info("Creating diagnostic archive.") 

249 create_diagnostic_archive() 

250 finally: 

251 os.chdir(original_working_directory)