Coverage for src / CSET / operators / __init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-02 17:30 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21import zipfile 

22from pathlib import Path 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET.operators import ( 

29 ageofair, 

30 aggregate, 

31 aviation, 

32 collapse, 

33 constraints, 

34 convection, 

35 ensembles, 

36 filters, 

37 imageprocessing, 

38 mesoscale, 

39 misc, 

40 plot, 

41 power_spectrum, 

42 read, 

43 regrid, 

44 transect, 

45 wind, 

46 write, 

47) 

48 

49# Exported operators & functions to use elsewhere. 

50__all__ = [ 

51 "ageofair", 

52 "aggregate", 

53 "aviation", 

54 "collapse", 

55 "constraints", 

56 "convection", 

57 "ensembles", 

58 "execute_recipe", 

59 "filters", 

60 "get_operator", 

61 "imageprocessing", 

62 "mesoscale", 

63 "misc", 

64 "plot", 

65 "power_spectrum", 

66 "read", 

67 "regrid", 

68 "transect", 

69 "wind", 

70 "write", 

71] 

72 

73# Stop iris giving a warning whenever it loads something. 

74FUTURE.datum_support = True 

75# Stop iris giving a warning whenever it saves something. 

76FUTURE.save_split_attrs = True 

77# Accept microsecond precision in iris times. 

78FUTURE.date_microseconds = True 

79 

80 

81def get_operator(name: str): 

82 """Get an operator by its name. 

83 

84 Parameters 

85 ---------- 

86 name: str 

87 The name of the desired operator. 

88 

89 Returns 

90 ------- 

91 function 

92 The named operator. 

93 

94 Raises 

95 ------ 

96 ValueError 

97 If name is not an operator. 

98 

99 Examples 

100 -------- 

101 >>> CSET.operators.get_operator("read.read_cubes") 

102 <function read_cubes at 0x7fcf9353c8b0> 

103 """ 

104 logging.debug("get_operator(%s)", name) 

105 try: 

106 name_sections = name.split(".") 

107 operator = CSET.operators 

108 for section in name_sections: 

109 operator = getattr(operator, section) 

110 if callable(operator): 

111 return operator 

112 else: 

113 raise AttributeError 

114 except (AttributeError, TypeError) as err: 

115 raise ValueError(f"Unknown operator: {name}") from err 

116 

117 

118def _write_metadata(recipe: dict): 

119 """Write a meta.json file in the CWD.""" 

120 metadata = recipe.copy() 

121 # Remove steps, as not needed, and might contain non-serialisable types. 

122 metadata.pop("steps", None) 

123 # To remove long variable names with suffix 

124 if "title" in metadata: 

125 metadata["title"] = metadata["title"].replace("_for_climate_averaging", "") 

126 metadata["title"] = metadata["title"].replace("_radiative_timestep", "") 

127 metadata["title"] = metadata["title"].replace("_maximum_random_overlap", "") 

128 with open("meta.json", "wt", encoding="UTF-8") as fp: 

129 json.dump(metadata, fp, indent=2) 

130 

131 

132def _step_parser(step: dict, step_input: any) -> str: 

133 """Execute a recipe step, recursively executing any sub-steps.""" 

134 logging.debug("Executing step: %s", step) 

135 kwargs = {} 

136 for key in step.keys(): 

137 if key == "operator": 

138 operator = get_operator(step["operator"]) 

139 logging.info("operator: %s", step["operator"]) 

140 elif isinstance(step[key], dict) and "operator" in step[key]: 

141 logging.debug("Recursing into argument: %s", key) 

142 kwargs[key] = _step_parser(step[key], step_input) 

143 else: 

144 kwargs[key] = step[key] 

145 logging.debug("args: %s", kwargs) 

146 logging.debug("step_input: %s", step_input) 

147 # If first argument of operator is explicitly defined, use that rather 

148 # than step_input. This is known through introspection of the operator. 

149 first_arg = next(iter(inspect.signature(operator).parameters.keys())) 

150 logging.debug("first_arg: %s", first_arg) 

151 if first_arg not in kwargs: 

152 logging.debug("first_arg not in kwargs, using step_input.") 

153 return operator(step_input, **kwargs) 

154 else: 

155 logging.debug("first_arg in kwargs.") 

156 return operator(**kwargs) 

157 

158 

159def create_diagnostic_archive(): 

160 """Create archive for easy download of plots and data.""" 

161 output_directory: Path = Path.cwd() 

162 archive_path = output_directory / "diagnostic.zip" 

163 with zipfile.ZipFile( 

164 archive_path, "w", compression=zipfile.ZIP_DEFLATED 

165 ) as archive: 

166 for file in output_directory.rglob("*"): 

167 # Check the archive doesn't add itself. 

168 if not file.samefile(archive_path): 

169 archive.write(file, arcname=file.relative_to(output_directory)) 

170 

171 

172def execute_recipe( 

173 recipe: dict, 

174 output_directory: Path, 

175 style_file: Path = None, 

176 plot_resolution: int = None, 

177 skip_write: bool = None, 

178) -> None: 

179 """Parse and executes the steps from a recipe file. 

180 

181 Parameters 

182 ---------- 

183 recipe: dict 

184 Parsed recipe. 

185 output_directory: Path 

186 Pathlike indicating desired location of output. 

187 style_file: Path, optional 

188 Path to a style file. 

189 plot_resolution: int, optional 

190 Resolution of plots in dpi. 

191 skip_write: bool, optional 

192 Skip saving processed output alongside plots. 

193 

194 Raises 

195 ------ 

196 FileNotFoundError 

197 The recipe or input file cannot be found. 

198 FileExistsError 

199 The output directory as actually a file. 

200 ValueError 

201 The recipe is not well formed. 

202 TypeError 

203 The provided recipe is not a stream or Path. 

204 """ 

205 # Create output directory. 

206 try: 

207 output_directory.mkdir(parents=True, exist_ok=True) 

208 except (FileExistsError, NotADirectoryError) as err: 

209 logging.error("Output directory is a file. %s", output_directory) 

210 raise err 

211 steps = recipe["steps"] 

212 

213 # Execute the steps in a recipe. 

214 original_working_directory = Path.cwd() 

215 try: 

216 os.chdir(output_directory) 

217 logger = logging.getLogger(__name__) 

218 diagnostic_log = logging.FileHandler( 

219 filename="CSET.log", mode="w", encoding="UTF-8" 

220 ) 

221 diagnostic_log.setFormatter( 

222 logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") 

223 ) 

224 logger.addHandler(diagnostic_log) 

225 # Create metadata file used by some steps. 

226 if style_file: 

227 recipe["style_file_path"] = str(style_file) 

228 if plot_resolution: 

229 recipe["plot_resolution"] = plot_resolution 

230 if skip_write: 

231 recipe["skip_write"] = skip_write 

232 _write_metadata(recipe) 

233 

234 # Execute the recipe. 

235 step_input = None 

236 for step in steps: 

237 step_input = _step_parser(step, step_input) 

238 logger.info("Recipe output:\n%s", step_input) 

239 

240 logger.info("Creating diagnostic archive.") 

241 create_diagnostic_archive() 

242 finally: 

243 os.chdir(original_working_directory)