Coverage for src/CSET/operators/__init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 15:17 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21import zipfile 

22from pathlib import Path 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET.operators import ( 

29 ageofair, 

30 aggregate, 

31 aviation, 

32 collapse, 

33 constraints, 

34 convection, 

35 ensembles, 

36 filters, 

37 mesoscale, 

38 misc, 

39 plot, 

40 read, 

41 regrid, 

42 transect, 

43 wind, 

44 write, 

45) 

46 

47# Exported operators & functions to use elsewhere. 

48__all__ = [ 

49 "ageofair", 

50 "aggregate", 

51 "aviation", 

52 "collapse", 

53 "constraints", 

54 "convection", 

55 "ensembles", 

56 "execute_recipe", 

57 "filters", 

58 "get_operator", 

59 "mesoscale", 

60 "misc", 

61 "plot", 

62 "read", 

63 "regrid", 

64 "transect", 

65 "wind", 

66 "write", 

67] 

68 

69# Stop iris giving a warning whenever it loads something. 

70FUTURE.datum_support = True 

71# Stop iris giving a warning whenever it saves something. 

72FUTURE.save_split_attrs = True 

73# Accept microsecond precision in iris times. 

74FUTURE.date_microseconds = True 

75 

76 

77def get_operator(name: str): 

78 """Get an operator by its name. 

79 

80 Parameters 

81 ---------- 

82 name: str 

83 The name of the desired operator. 

84 

85 Returns 

86 ------- 

87 function 

88 The named operator. 

89 

90 Raises 

91 ------ 

92 ValueError 

93 If name is not an operator. 

94 

95 Examples 

96 -------- 

97 >>> CSET.operators.get_operator("read.read_cubes") 

98 <function read_cubes at 0x7fcf9353c8b0> 

99 """ 

100 logging.debug("get_operator(%s)", name) 

101 try: 

102 name_sections = name.split(".") 

103 operator = CSET.operators 

104 for section in name_sections: 

105 operator = getattr(operator, section) 

106 if callable(operator): 

107 return operator 

108 else: 

109 raise AttributeError 

110 except (AttributeError, TypeError) as err: 

111 raise ValueError(f"Unknown operator: {name}") from err 

112 

113 

114def _write_metadata(recipe: dict): 

115 """Write a meta.json file in the CWD.""" 

116 metadata = recipe.copy() 

117 # Remove steps, as not needed, and might contain non-serialisable types. 

118 metadata.pop("steps", None) 

119 # To remove long variable names with suffix 

120 if "title" in metadata: 

121 metadata["title"] = metadata["title"].replace("_for_climate_averaging", "") 

122 metadata["title"] = metadata["title"].replace("_radiative_timestep", "") 

123 metadata["title"] = metadata["title"].replace("_maximum_random_overlap", "") 

124 with open("meta.json", "wt", encoding="UTF-8") as fp: 

125 json.dump(metadata, fp, indent=2) 

126 

127 

128def _step_parser(step: dict, step_input: any) -> str: 

129 """Execute a recipe step, recursively executing any sub-steps.""" 

130 logging.debug("Executing step: %s", step) 

131 kwargs = {} 

132 for key in step.keys(): 

133 if key == "operator": 

134 operator = get_operator(step["operator"]) 

135 logging.info("operator: %s", step["operator"]) 

136 elif isinstance(step[key], dict) and "operator" in step[key]: 

137 logging.debug("Recursing into argument: %s", key) 

138 kwargs[key] = _step_parser(step[key], step_input) 

139 else: 

140 kwargs[key] = step[key] 

141 logging.debug("args: %s", kwargs) 

142 logging.debug("step_input: %s", step_input) 

143 # If first argument of operator is explicitly defined, use that rather 

144 # than step_input. This is known through introspection of the operator. 

145 first_arg = next(iter(inspect.signature(operator).parameters.keys())) 

146 logging.debug("first_arg: %s", first_arg) 

147 if first_arg not in kwargs: 

148 logging.debug("first_arg not in kwargs, using step_input.") 

149 return operator(step_input, **kwargs) 

150 else: 

151 logging.debug("first_arg in kwargs.") 

152 return operator(**kwargs) 

153 

154 

155def create_diagnostic_archive(): 

156 """Create archive for easy download of plots and data.""" 

157 output_directory: Path = Path.cwd() 

158 archive_path = output_directory / "diagnostic.zip" 

159 with zipfile.ZipFile( 

160 archive_path, "w", compression=zipfile.ZIP_DEFLATED 

161 ) as archive: 

162 for file in output_directory.rglob("*"): 

163 # Check the archive doesn't add itself. 

164 if not file.samefile(archive_path): 

165 archive.write(file, arcname=file.relative_to(output_directory)) 

166 

167 

168def execute_recipe( 

169 recipe: dict, 

170 output_directory: Path, 

171 style_file: Path = None, 

172 plot_resolution: int = None, 

173 skip_write: bool = None, 

174) -> None: 

175 """Parse and executes the steps from a recipe file. 

176 

177 Parameters 

178 ---------- 

179 recipe: dict 

180 Parsed recipe. 

181 output_directory: Path 

182 Pathlike indicating desired location of output. 

183 style_file: Path, optional 

184 Path to a style file. 

185 plot_resolution: int, optional 

186 Resolution of plots in dpi. 

187 skip_write: bool, optional 

188 Skip saving processed output alongside plots. 

189 

190 Raises 

191 ------ 

192 FileNotFoundError 

193 The recipe or input file cannot be found. 

194 FileExistsError 

195 The output directory as actually a file. 

196 ValueError 

197 The recipe is not well formed. 

198 TypeError 

199 The provided recipe is not a stream or Path. 

200 """ 

201 # Create output directory. 

202 try: 

203 output_directory.mkdir(parents=True, exist_ok=True) 

204 except (FileExistsError, NotADirectoryError) as err: 

205 logging.error("Output directory is a file. %s", output_directory) 

206 raise err 

207 steps = recipe["steps"] 

208 

209 # Execute the steps in a recipe. 

210 original_working_directory = Path.cwd() 

211 try: 

212 os.chdir(output_directory) 

213 logger = logging.getLogger(__name__) 

214 diagnostic_log = logging.FileHandler( 

215 filename="CSET.log", mode="w", encoding="UTF-8" 

216 ) 

217 diagnostic_log.setFormatter( 

218 logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") 

219 ) 

220 logger.addHandler(diagnostic_log) 

221 # Create metadata file used by some steps. 

222 if style_file: 

223 recipe["style_file_path"] = str(style_file) 

224 if plot_resolution: 

225 recipe["plot_resolution"] = plot_resolution 

226 if skip_write: 

227 recipe["skip_write"] = skip_write 

228 _write_metadata(recipe) 

229 

230 # Execute the recipe. 

231 step_input = None 

232 for step in steps: 

233 step_input = _step_parser(step, step_input) 

234 logger.info("Recipe output:\n%s", step_input) 

235 

236 logger.info("Creating diagnostic archive.") 

237 create_diagnostic_archive() 

238 finally: 

239 os.chdir(original_working_directory)