Coverage for src / CSET / operators / __init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-27 11:58 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21import zipfile 

22from pathlib import Path 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET.operators import ( 

29 ageofair, 

30 aggregate, 

31 aviation, 

32 collapse, 

33 constraints, 

34 convection, 

35 ensembles, 

36 filters, 

37 imageprocessing, 

38 mesoscale, 

39 misc, 

40 plot, 

41 read, 

42 regrid, 

43 transect, 

44 wind, 

45 write, 

46) 

47 

48# Exported operators & functions to use elsewhere. 

49__all__ = [ 

50 "ageofair", 

51 "aggregate", 

52 "aviation", 

53 "collapse", 

54 "constraints", 

55 "convection", 

56 "ensembles", 

57 "execute_recipe", 

58 "filters", 

59 "get_operator", 

60 "imageprocessing", 

61 "mesoscale", 

62 "misc", 

63 "plot", 

64 "read", 

65 "regrid", 

66 "transect", 

67 "wind", 

68 "write", 

69] 

70 

71# Stop iris giving a warning whenever it loads something. 

72FUTURE.datum_support = True 

73# Stop iris giving a warning whenever it saves something. 

74FUTURE.save_split_attrs = True 

75# Accept microsecond precision in iris times. 

76FUTURE.date_microseconds = True 

77 

78 

79def get_operator(name: str): 

80 """Get an operator by its name. 

81 

82 Parameters 

83 ---------- 

84 name: str 

85 The name of the desired operator. 

86 

87 Returns 

88 ------- 

89 function 

90 The named operator. 

91 

92 Raises 

93 ------ 

94 ValueError 

95 If name is not an operator. 

96 

97 Examples 

98 -------- 

99 >>> CSET.operators.get_operator("read.read_cubes") 

100 <function read_cubes at 0x7fcf9353c8b0> 

101 """ 

102 logging.debug("get_operator(%s)", name) 

103 try: 

104 name_sections = name.split(".") 

105 operator = CSET.operators 

106 for section in name_sections: 

107 operator = getattr(operator, section) 

108 if callable(operator): 

109 return operator 

110 else: 

111 raise AttributeError 

112 except (AttributeError, TypeError) as err: 

113 raise ValueError(f"Unknown operator: {name}") from err 

114 

115 

116def _write_metadata(recipe: dict): 

117 """Write a meta.json file in the CWD.""" 

118 metadata = recipe.copy() 

119 # Remove steps, as not needed, and might contain non-serialisable types. 

120 metadata.pop("steps", None) 

121 # To remove long variable names with suffix 

122 if "title" in metadata: 

123 metadata["title"] = metadata["title"].replace("_for_climate_averaging", "") 

124 metadata["title"] = metadata["title"].replace("_radiative_timestep", "") 

125 metadata["title"] = metadata["title"].replace("_maximum_random_overlap", "") 

126 with open("meta.json", "wt", encoding="UTF-8") as fp: 

127 json.dump(metadata, fp, indent=2) 

128 

129 

130def _step_parser(step: dict, step_input: any) -> str: 

131 """Execute a recipe step, recursively executing any sub-steps.""" 

132 logging.debug("Executing step: %s", step) 

133 kwargs = {} 

134 for key in step.keys(): 

135 if key == "operator": 

136 operator = get_operator(step["operator"]) 

137 logging.info("operator: %s", step["operator"]) 

138 elif isinstance(step[key], dict) and "operator" in step[key]: 

139 logging.debug("Recursing into argument: %s", key) 

140 kwargs[key] = _step_parser(step[key], step_input) 

141 else: 

142 kwargs[key] = step[key] 

143 logging.debug("args: %s", kwargs) 

144 logging.debug("step_input: %s", step_input) 

145 # If first argument of operator is explicitly defined, use that rather 

146 # than step_input. This is known through introspection of the operator. 

147 first_arg = next(iter(inspect.signature(operator).parameters.keys())) 

148 logging.debug("first_arg: %s", first_arg) 

149 if first_arg not in kwargs: 

150 logging.debug("first_arg not in kwargs, using step_input.") 

151 return operator(step_input, **kwargs) 

152 else: 

153 logging.debug("first_arg in kwargs.") 

154 return operator(**kwargs) 

155 

156 

157def create_diagnostic_archive(): 

158 """Create archive for easy download of plots and data.""" 

159 output_directory: Path = Path.cwd() 

160 archive_path = output_directory / "diagnostic.zip" 

161 with zipfile.ZipFile( 

162 archive_path, "w", compression=zipfile.ZIP_DEFLATED 

163 ) as archive: 

164 for file in output_directory.rglob("*"): 

165 # Check the archive doesn't add itself. 

166 if not file.samefile(archive_path): 

167 archive.write(file, arcname=file.relative_to(output_directory)) 

168 

169 

170def execute_recipe( 

171 recipe: dict, 

172 output_directory: Path, 

173 style_file: Path = None, 

174 plot_resolution: int = None, 

175 skip_write: bool = None, 

176) -> None: 

177 """Parse and executes the steps from a recipe file. 

178 

179 Parameters 

180 ---------- 

181 recipe: dict 

182 Parsed recipe. 

183 output_directory: Path 

184 Pathlike indicating desired location of output. 

185 style_file: Path, optional 

186 Path to a style file. 

187 plot_resolution: int, optional 

188 Resolution of plots in dpi. 

189 skip_write: bool, optional 

190 Skip saving processed output alongside plots. 

191 

192 Raises 

193 ------ 

194 FileNotFoundError 

195 The recipe or input file cannot be found. 

196 FileExistsError 

197 The output directory as actually a file. 

198 ValueError 

199 The recipe is not well formed. 

200 TypeError 

201 The provided recipe is not a stream or Path. 

202 """ 

203 # Create output directory. 

204 try: 

205 output_directory.mkdir(parents=True, exist_ok=True) 

206 except (FileExistsError, NotADirectoryError) as err: 

207 logging.error("Output directory is a file. %s", output_directory) 

208 raise err 

209 steps = recipe["steps"] 

210 

211 # Execute the steps in a recipe. 

212 original_working_directory = Path.cwd() 

213 try: 

214 os.chdir(output_directory) 

215 logger = logging.getLogger(__name__) 

216 diagnostic_log = logging.FileHandler( 

217 filename="CSET.log", mode="w", encoding="UTF-8" 

218 ) 

219 diagnostic_log.setFormatter( 

220 logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") 

221 ) 

222 logger.addHandler(diagnostic_log) 

223 # Create metadata file used by some steps. 

224 if style_file: 

225 recipe["style_file_path"] = str(style_file) 

226 if plot_resolution: 

227 recipe["plot_resolution"] = plot_resolution 

228 if skip_write: 

229 recipe["skip_write"] = skip_write 

230 _write_metadata(recipe) 

231 

232 # Execute the recipe. 

233 step_input = None 

234 for step in steps: 

235 step_input = _step_parser(step, step_input) 

236 logger.info("Recipe output:\n%s", step_input) 

237 

238 logger.info("Creating diagnostic archive.") 

239 create_diagnostic_archive() 

240 finally: 

241 os.chdir(original_working_directory)