Coverage for src/CSET/operators/__init__.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 21:08 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Subpackage contains all of CSET's operators.""" 

16 

17import inspect 

18import json 

19import logging 

20import os 

21import zipfile 

22from pathlib import Path 

23 

24from iris import FUTURE 

25 

26# Import operators here so they are exported for use by recipes. 

27import CSET.operators 

28from CSET.operators import ( 

29 ageofair, 

30 aggregate, 

31 collapse, 

32 constraints, 

33 convection, 

34 ensembles, 

35 filters, 

36 mesoscale, 

37 misc, 

38 plot, 

39 read, 

40 regrid, 

41 transect, 

42 wind, 

43 write, 

44) 

45 

# Public names of this subpackage: the operator modules imported above plus
# the helper functions intended for use elsewhere.
__all__ = [
    "ageofair",
    "aggregate",
    "collapse",
    "constraints",
    "convection",
    "ensembles",
    "execute_recipe",
    "filters",
    "get_operator",
    "mesoscale",
    "misc",
    "plot",
    "read",
    "regrid",
    "transect",
    "wind",
    "write",
]

# Opt in to iris FUTURE behaviours once, when the subpackage is imported.
# datum_support: stops iris giving a warning whenever it loads something.
FUTURE.datum_support = True
# save_split_attrs: stops iris giving a warning whenever it saves something.
FUTURE.save_split_attrs = True
# date_microseconds: accept microsecond precision in iris times.
FUTURE.date_microseconds = True

73 

74 

def get_operator(name: str):
    """Look up an operator from its dotted name.

    The name is resolved one attribute at a time, starting from the
    ``CSET.operators`` package, and the object found must be callable.

    Parameters
    ----------
    name: str
        Dotted path of the operator relative to ``CSET.operators``, for
        example ``"read.read_cubes"``.

    Returns
    -------
    function
        The callable found at that path.

    Raises
    ------
    ValueError
        If the name cannot be resolved, or resolves to something that is not
        callable.

    Examples
    --------
    >>> CSET.operators.get_operator("read.read_cubes")
    <function read_cubes at 0x7fcf9353c8b0>
    """
    logging.debug("get_operator(%s)", name)
    try:
        attribute_path = name.split(".")
        found = CSET.operators
        for attribute in attribute_path:
            found = getattr(found, attribute)
        if not callable(found):
            # Route non-callables through the same handler as failed lookups
            # so callers only ever see a ValueError.
            raise AttributeError
    except (AttributeError, TypeError) as err:
        raise ValueError(f"Unknown operator: {name}") from err
    return found

110 

111 

def _write_metadata(recipe: dict):
    """Write the recipe's metadata to ``meta.json`` in the current directory.

    Everything in the recipe except its ``"steps"`` entry is written as
    indented JSON. The recipe passed in is not modified.
    """
    # Steps are left out: they are not needed, and might contain
    # non-serialisable types.
    metadata = {key: value for key, value in recipe.items() if key != "steps"}
    if "title" in metadata:
        # Strip long variable-name suffixes from the title.
        title = metadata["title"]
        for suffix in (
            "_for_climate_averaging",
            "_radiative_timestep",
            "_maximum_random_overlap",
        ):
            title = title.replace(suffix, "")
        metadata["title"] = title
    with open("meta.json", "wt", encoding="UTF-8") as fp:
        json.dump(metadata, fp, indent=2)

124 

125 

def _step_parser(step: dict, step_input: object) -> object:
    """Execute a recipe step, recursively executing any sub-steps.

    Parameters
    ----------
    step: dict
        Mapping with an ``"operator"`` key naming the operator to run. Every
        other key is passed to the operator as a keyword argument. A value
        that is itself a dict containing ``"operator"`` is executed first as
        a sub-step (with the same ``step_input``) and replaced by its result.
    step_input: object
        Output of the previous step. It is passed as the operator's first
        positional argument unless the step sets that argument explicitly.

    Returns
    -------
    object
        Whatever the operator returns.

    Raises
    ------
    ValueError
        If the step has no ``"operator"`` key, or the operator name is not
        known to ``get_operator``.
    """
    logging.debug("Executing step: %s", step)
    if "operator" not in step:
        # Fail with a clear message instead of an UnboundLocalError further
        # down when the operator is looked up.
        raise ValueError(f"Recipe step is missing an operator: {step}")
    kwargs = {}
    for key, value in step.items():
        if key == "operator":
            operator = get_operator(value)
            logging.info("operator: %s", value)
        elif isinstance(value, dict) and "operator" in value:
            logging.debug("Recursing into argument: %s", key)
            kwargs[key] = _step_parser(value, step_input)
        else:
            kwargs[key] = value
    logging.debug("args: %s", kwargs)
    logging.debug("step_input: %s", step_input)
    # If first argument of operator is explicitly defined, use that rather
    # than step_input. This is known through introspection of the operator.
    # The None default covers operators that declare no parameters at all.
    first_arg = next(iter(inspect.signature(operator).parameters), None)
    logging.debug("first_arg: %s", first_arg)
    if first_arg is None:
        # Nothing can receive step_input, so only the keyword arguments are
        # passed on.
        logging.debug("operator takes no arguments, not passing step_input.")
        return operator(**kwargs)
    if first_arg not in kwargs:
        logging.debug("first_arg not in kwargs, using step_input.")
        return operator(step_input, **kwargs)
    logging.debug("first_arg in kwargs.")
    return operator(**kwargs)

151 

152 

def create_diagnostic_archive():
    """Bundle the current directory into ``diagnostic.zip`` for easy download.

    Every path found recursively under the current working directory is added
    to a deflate-compressed archive, stored under its path relative to that
    directory. The archive is written into the same directory.
    """
    root: Path = Path.cwd()
    bundle_path = root / "diagnostic.zip"
    with zipfile.ZipFile(
        bundle_path, "w", compression=zipfile.ZIP_DEFLATED
    ) as bundle:
        for entry in root.rglob("*"):
            # The archive lives in the directory being walked, so skip it to
            # avoid adding it to itself.
            if entry.samefile(bundle_path):
                continue
            bundle.write(entry, arcname=entry.relative_to(root))

164 

165 

def execute_recipe(
    recipe: dict,
    output_directory: Path,
    style_file: Path = None,
    plot_resolution: int = None,
    skip_write: bool = None,
) -> None:
    """Execute the steps from a parsed recipe.

    The steps run with ``output_directory`` as the working directory. A
    ``CSET.log`` file and a ``meta.json`` file are written there, and once all
    steps have run the directory is bundled into a diagnostic archive. The
    original working directory is restored afterwards, even on failure.

    Parameters
    ----------
    recipe: dict
        Parsed recipe. It must contain a ``"steps"`` entry. Any of the
        optional settings below that are given are stored into this dict.
    output_directory: Path
        Pathlike indicating desired location of output. Created if needed.
    style_file: Path, optional
        Path to a style file.
    plot_resolution: int, optional
        Resolution of plots in dpi.
    skip_write: bool, optional
        Skip saving processed output alongside plots.

    Raises
    ------
    FileExistsError
        The output directory is actually a file.
    NotADirectoryError
        Part of the output directory path is a file.
    KeyError
        The recipe has no ``"steps"`` entry.
    ValueError
        A step is not well formed, e.g. it names an unknown operator.

    Notes
    -----
    Exceptions raised while running the steps propagate unchanged.
    """
    # Create output directory.
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        logging.error("Output directory is a file. %s", output_directory)
        raise
    steps = recipe["steps"]

    # Execute the steps in a recipe.
    original_working_directory = Path.cwd()
    logger = logging.getLogger(__name__)
    diagnostic_log = None
    try:
        os.chdir(output_directory)
        diagnostic_log = logging.FileHandler(
            filename="CSET.log", mode="w", encoding="UTF-8"
        )
        diagnostic_log.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(diagnostic_log)
        # Create metadata file used by some steps.
        if style_file:
            recipe["style_file_path"] = str(style_file)
        if plot_resolution:
            recipe["plot_resolution"] = plot_resolution
        if skip_write:
            recipe["skip_write"] = skip_write
        _write_metadata(recipe)

        # Execute the recipe, feeding each step's output into the next.
        step_input = None
        for step in steps:
            step_input = _step_parser(step, step_input)
        logger.info("Recipe output:\n%s", step_input)

        logger.info("Creating diagnostic archive.")
        create_diagnostic_archive()
    finally:
        try:
            # Detach and close the per-recipe log file. Otherwise every call
            # leaks an open file, and later recipes would also log into this
            # recipe's CSET.log.
            if diagnostic_log is not None:
                logger.removeHandler(diagnostic_log)
                diagnostic_log.close()
        finally:
            os.chdir(original_working_directory)

237 os.chdir(original_working_directory)