Coverage for src / CSET / operators / __init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-08 16:49 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21import zipfile 

22from pathlib import Path 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET.operators import ( 

29 ageofair, 

30 aggregate, 

31 aviation, 

32 collapse, 

33 constraints, 

34 convection, 

35 ensembles, 

36 filters, 

37 humidity, 

38 imageprocessing, 

39 mesoscale, 

40 misc, 

41 plot, 

42 pressure, 

43 read, 

44 regrid, 

45 temperature, 

46 transect, 

47 wind, 

48 write, 

49) 

50 

51# Exported operators & functions to use elsewhere. 

52__all__ = [ 

53 "ageofair", 

54 "aggregate", 

55 "aviation", 

56 "collapse", 

57 "constraints", 

58 "convection", 

59 "ensembles", 

60 "execute_recipe", 

61 "filters", 

62 "humidity", 

63 "get_operator", 

64 "imageprocessing", 

65 "mesoscale", 

66 "misc", 

67 "plot", 

68 "pressure", 

69 "read", 

70 "regrid", 

71 "temperature", 

72 "transect", 

73 "wind", 

74 "write", 

75] 

76 

77# Stop iris giving a warning whenever it loads something. 

78FUTURE.datum_support = True 

79# Stop iris giving a warning whenever it saves something. 

80FUTURE.save_split_attrs = True 

81# Accept microsecond precision in iris times. 

82FUTURE.date_microseconds = True 

83 

84 

85def get_operator(name: str): 

86 """Get an operator by its name. 

87 

88 Parameters 

89 ---------- 

90 name: str 

91 The name of the desired operator. 

92 

93 Returns 

94 ------- 

95 function 

96 The named operator. 

97 

98 Raises 

99 ------ 

100 ValueError 

101 If name is not an operator. 

102 

103 Examples 

104 -------- 

105 >>> CSET.operators.get_operator("read.read_cubes") 

106 <function read_cubes at 0x7fcf9353c8b0> 

107 """ 

108 logging.debug("get_operator(%s)", name) 

109 try: 

110 name_sections = name.split(".") 

111 operator = CSET.operators 

112 for section in name_sections: 

113 operator = getattr(operator, section) 

114 if callable(operator): 

115 return operator 

116 else: 

117 raise AttributeError 

118 except (AttributeError, TypeError) as err: 

119 raise ValueError(f"Unknown operator: {name}") from err 

120 

121 

122def _write_metadata(recipe: dict): 

123 """Write a meta.json file in the CWD.""" 

124 metadata = recipe.copy() 

125 # Remove steps, as not needed, and might contain non-serialisable types. 

126 metadata.pop("steps", None) 

127 # To remove long variable names with suffix 

128 if "title" in metadata: 

129 metadata["title"] = metadata["title"].replace("_for_climate_averaging", "") 

130 metadata["title"] = metadata["title"].replace("_radiative_timestep", "") 

131 metadata["title"] = metadata["title"].replace("_maximum_random_overlap", "") 

132 with open("meta.json", "wt", encoding="UTF-8") as fp: 

133 json.dump(metadata, fp, indent=2) 

134 

135 

136def _step_parser(step: dict, step_input: any) -> str: 

137 """Execute a recipe step, recursively executing any sub-steps.""" 

138 logging.debug("Executing step: %s", step) 

139 kwargs = {} 

140 for key in step.keys(): 

141 if key == "operator": 

142 operator = get_operator(step["operator"]) 

143 logging.info("operator: %s", step["operator"]) 

144 elif isinstance(step[key], dict) and "operator" in step[key]: 

145 logging.debug("Recursing into argument: %s", key) 

146 kwargs[key] = _step_parser(step[key], step_input) 

147 else: 

148 kwargs[key] = step[key] 

149 logging.debug("args: %s", kwargs) 

150 logging.debug("step_input: %s", step_input) 

151 # If first argument of operator is explicitly defined, use that rather 

152 # than step_input. This is known through introspection of the operator. 

153 first_arg = next(iter(inspect.signature(operator).parameters.keys())) 

154 logging.debug("first_arg: %s", first_arg) 

155 if first_arg not in kwargs: 

156 logging.debug("first_arg not in kwargs, using step_input.") 

157 return operator(step_input, **kwargs) 

158 else: 

159 logging.debug("first_arg in kwargs.") 

160 return operator(**kwargs) 

161 

162 

163def create_diagnostic_archive(): 

164 """Create archive for easy download of plots and data.""" 

165 output_directory: Path = Path.cwd() 

166 archive_path = output_directory / "diagnostic.zip" 

167 with zipfile.ZipFile( 

168 archive_path, "w", compression=zipfile.ZIP_DEFLATED 

169 ) as archive: 

170 for file in output_directory.rglob("*"): 

171 # Check the archive doesn't add itself. 

172 if not file.samefile(archive_path): 

173 archive.write(file, arcname=file.relative_to(output_directory)) 

174 

175 

176def execute_recipe( 

177 recipe: dict, 

178 output_directory: Path, 

179 style_file: Path = None, 

180 plot_resolution: int = None, 

181 skip_write: bool = None, 

182) -> None: 

183 """Parse and executes the steps from a recipe file. 

184 

185 Parameters 

186 ---------- 

187 recipe: dict 

188 Parsed recipe. 

189 output_directory: Path 

190 Pathlike indicating desired location of output. 

191 style_file: Path, optional 

192 Path to a style file. 

193 plot_resolution: int, optional 

194 Resolution of plots in dpi. 

195 skip_write: bool, optional 

196 Skip saving processed output alongside plots. 

197 

198 Raises 

199 ------ 

200 FileNotFoundError 

201 The recipe or input file cannot be found. 

202 FileExistsError 

203 The output directory as actually a file. 

204 ValueError 

205 The recipe is not well formed. 

206 TypeError 

207 The provided recipe is not a stream or Path. 

208 """ 

209 # Create output directory. 

210 try: 

211 output_directory.mkdir(parents=True, exist_ok=True) 

212 except (FileExistsError, NotADirectoryError) as err: 

213 logging.error("Output directory is a file. %s", output_directory) 

214 raise err 

215 steps = recipe["steps"] 

216 

217 # Execute the steps in a recipe. 

218 original_working_directory = Path.cwd() 

219 try: 

220 os.chdir(output_directory) 

221 logger = logging.getLogger(__name__) 

222 diagnostic_log = logging.FileHandler( 

223 filename="CSET.log", mode="w", encoding="UTF-8" 

224 ) 

225 diagnostic_log.setFormatter( 

226 logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") 

227 ) 

228 logger.addHandler(diagnostic_log) 

229 # Create metadata file used by some steps. 

230 if style_file: 

231 recipe["style_file_path"] = str(style_file) 

232 if plot_resolution: 

233 recipe["plot_resolution"] = plot_resolution 

234 if skip_write: 

235 recipe["skip_write"] = skip_write 

236 _write_metadata(recipe) 

237 

238 # Execute the recipe. 

239 step_input = None 

240 for step in steps: 

241 step_input = _step_parser(step, step_input) 

242 logger.info("Recipe output:\n%s", step_input) 

243 

244 logger.info("Creating diagnostic archive.") 

245 create_diagnostic_archive() 

246 finally: 

247 os.chdir(original_working_directory)