Coverage for src / CSET / operators / __init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 09:52 +0000

1# © Crown copyright, Met Office (2022-2026) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

import inspect
import json
import logging
import os
import zipfile
from pathlib import Path
from typing import Any

from iris import FUTURE

# Import operators here so they are exported for use by recipes.
import CSET.operators
from CSET.operators import (
    ageofair,
    aggregate,
    aviation,
    collapse,
    constraints,
    convection,
    ensembles,
    filters,
    humidity,
    imageprocessing,
    mesoscale,
    misc,
    plot,
    precipitation,
    pressure,
    read,
    regrid,
    temperature,
    transect,
    wind,
    write,
)

51 

# Exported operators & functions to use elsewhere.
# Kept in alphabetical order so additions are easy to diff.
__all__ = [
    "ageofair",
    "aggregate",
    "aviation",
    "collapse",
    "constraints",
    "convection",
    "ensembles",
    "execute_recipe",
    "filters",
    "get_operator",
    "humidity",
    "imageprocessing",
    "mesoscale",
    "misc",
    "plot",
    "precipitation",
    "pressure",
    "read",
    "regrid",
    "temperature",
    "transect",
    "wind",
    "write",
]

78 

# Opt in to iris's future behaviours up front: datum_support and
# save_split_attrs silence load/save warnings, and date_microseconds
# accepts microsecond precision in iris times.
for _flag in ("datum_support", "save_split_attrs", "date_microseconds"):
    setattr(FUTURE, _flag, True)
del _flag

85 

86 

def get_operator(name: str):
    """Get an operator by its name.

    Parameters
    ----------
    name: str
        The name of the desired operator.

    Returns
    -------
    function
        The named operator.

    Raises
    ------
    ValueError
        If name is not an operator.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        # Walk the dotted path, e.g. "read.read_cubes" resolves attribute
        # by attribute starting from the operators subpackage.
        target = CSET.operators
        for part in name.split("."):
            target = getattr(target, part)
        # Only a callable counts as an operator; anything else (e.g. a
        # module or constant) is treated as unknown.
        if not callable(target):
            raise AttributeError
        return target
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err

122 

123 

124def _write_metadata(recipe: dict): 

125 """Write a meta.json file in the CWD.""" 

126 metadata = recipe.copy() 

127 # Remove steps, as not needed, and might contain non-serialisable types. 

128 metadata.pop("steps", None) 

129 # To remove long variable names with suffix 

130 if "title" in metadata: 

131 metadata["title"] = metadata["title"].replace("_for_climate_averaging", "") 

132 metadata["title"] = metadata["title"].replace("_radiative_timestep", "") 

133 metadata["title"] = metadata["title"].replace("_maximum_random_overlap", "") 

134 with open("meta.json", "wt", encoding="UTF-8") as fp: 

135 json.dump(metadata, fp, indent=2) 

136 

137 

138def _step_parser(step: dict, step_input: any) -> str: 

139 """Execute a recipe step, recursively executing any sub-steps.""" 

140 logging.debug("Executing step: %s", step) 

141 kwargs = {} 

142 for key in step.keys(): 

143 if key == "operator": 

144 operator = get_operator(step["operator"]) 

145 logging.info("operator: %s", step["operator"]) 

146 elif isinstance(step[key], dict) and "operator" in step[key]: 

147 logging.debug("Recursing into argument: %s", key) 

148 kwargs[key] = _step_parser(step[key], step_input) 

149 else: 

150 kwargs[key] = step[key] 

151 logging.debug("args: %s", kwargs) 

152 logging.debug("step_input: %s", step_input) 

153 # If first argument of operator is explicitly defined, use that rather 

154 # than step_input. This is known through introspection of the operator. 

155 first_arg = next(iter(inspect.signature(operator).parameters.keys())) 

156 logging.debug("first_arg: %s", first_arg) 

157 if first_arg not in kwargs: 

158 logging.debug("first_arg not in kwargs, using step_input.") 

159 return operator(step_input, **kwargs) 

160 else: 

161 logging.debug("first_arg in kwargs.") 

162 return operator(**kwargs) 

163 

164 

def create_diagnostic_archive():
    """Create archive for easy download of plots and data."""
    out_dir: Path = Path.cwd()
    zip_path = out_dir / "diagnostic.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for entry in out_dir.rglob("*"):
            # Skip the archive so it doesn't include itself.
            if entry.samefile(zip_path):
                continue
            zf.write(entry, arcname=entry.relative_to(out_dir))

176 

177 

def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Parse and execute the steps from a recipe file.

    Parameters
    ----------
    recipe: dict
        Parsed recipe.
    output_directory: Path
        Pathlike indicating desired location of output.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileNotFoundError
        The recipe or input file cannot be found.
    FileExistsError
        The output directory is actually a file.
    ValueError
        The recipe is not well formed.
    TypeError
        The provided recipe is not a stream or Path.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError) as err:
        logging.error("Output directory is a file. %s", output_directory)
        raise err
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        # Per-run log file written into the output directory.
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        # Detach and close the per-run handler; otherwise each call adds
        # another handler to the module logger, duplicating log lines and
        # leaking file descriptors across recipe runs.
        if diagnostic_log is not None:
            logger.removeHandler(diagnostic_log)
            diagnostic_log.close()
        os.chdir(original_working_directory)