Coverage for src/CSET/_common.py: 100%

145 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 21:08 +0000

1# © Crown copyright, Met Office (2022-2024) and CSET contributors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Common functionality used across CSET.""" 

16 

17import ast 

18import io 

19import json 

20import logging 

21import re 

22from collections.abc import Iterable 

23from pathlib import Path 

24from textwrap import dedent 

25from typing import Any 

26 

27import ruamel.yaml 

28 

29 

class ArgumentError(ValueError):
    """Raised when supplied arguments cannot be understood."""

32 

33 

def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict:
    """Load a recipe from YAML and optionally fill in its variables.

    Parameters
    ----------
    recipe_yaml: Path | str
        Either the recipe's YAML as a string, or a Path to a file containing
        it. A Path is handed to the YAML parser, which opens and reads it.
    variables: dict
        Recipe variables to substitute into the parsed recipe. When None the
        templating step is skipped entirely.

    Returns
    -------
    recipe: dict
        The parsed (and, if requested, templated) recipe.

    Raises
    ------
    ValueError
        If the YAML cannot be parsed, or the recipe has no usable steps.
    TypeError
        If recipe_yaml is neither a Path nor a string, or the parsed YAML is
        not a mapping.
    KeyError
        If the recipe uses a variable that was not supplied.

    Examples
    --------
    >>> CSET._common.parse_recipe(Path("myrecipe.yaml"))
    {'steps': [{'operator': 'misc.noop'}]}
    """
    # A string holds the YAML text itself, so wrap it in a stream for the
    # parser; a Path is passed through untouched. Anything else is rejected.
    if isinstance(recipe_yaml, str):
        source = io.StringIO(recipe_yaml)
    elif isinstance(recipe_yaml, Path):
        source = recipe_yaml
    else:
        raise TypeError("recipe_yaml must be a str or Path.")

    with ruamel.yaml.YAML(typ="safe", pure=True) as parser:
        try:
            recipe = parser.load(source)
        except ruamel.yaml.parser.ParserError as err:
            raise ValueError("ParserError: Invalid YAML") from err

    logging.debug("Recipe before templating:\n%s", recipe)
    # Validate the structure before any templating is attempted.
    check_recipe_has_steps(recipe)

    if variables is not None:
        logging.debug("Recipe variables: %s", variables)
        recipe = template_variables(recipe, variables)

    logging.debug("Recipe after templating:\n%s", recipe)
    return recipe

87 

88 

def check_recipe_has_steps(recipe: dict):
    """Check a recipe has the minimum required steps.

    Checks that the recipe is a mapping with a non-empty sequence under the
    ``steps`` key, and provides helpful error messages otherwise.

    Parameters
    ----------
    recipe: dict
        The recipe as a python dictionary.

    Raises
    ------
    ValueError
        If the recipe has no ``steps`` key, if ``steps`` is not a sequence of
        steps, or if it is empty.
    TypeError
        If recipe isn't a dict.
    """
    if not isinstance(recipe, dict):
        raise TypeError("Recipe must contain a mapping.")
    if "steps" not in recipe:
        raise ValueError("Recipe must contain a 'steps' key.")

    steps = recipe["steps"]
    # Strings and mappings have a length, so they would slip through the len()
    # check below even though they are not a sequence of steps.
    if isinstance(steps, (str, bytes, dict)):
        raise ValueError("'steps' key must contain a sequence of steps.")
    try:
        step_count = len(steps)
    except TypeError as err:
        # Scalars such as None or numbers have no length at all.
        raise ValueError("'steps' key must contain a sequence of steps.") from err
    if step_count < 1:
        raise ValueError("Recipe must have at least 1 step.")

119 

120 

def slugify(s: str) -> str:
    """Reduce a string to a form that is safe to use everywhere.

    The string is case-folded, every run of characters other than a-z, 0-9,
    dots, dashes, and underscores is replaced by a single underscore, and
    leading and trailing underscores are removed.
    """
    folded = s.casefold()
    collapsed = re.sub(r"[^a-z0-9\._-]+", "_", folded)
    return collapsed.strip("_")

128 

129 

def get_recipe_metadata() -> dict:
    """Return the running recipe's metadata from ``meta.json``.

    The file is read from the current working directory. If it does not
    exist, it is created holding an empty JSON object and an empty dictionary
    is returned.
    """
    try:
        with open("meta.json", "rt", encoding="UTF-8") as existing:
            metadata = json.load(existing)
    except FileNotFoundError:
        # No metadata yet: write an empty file so it exists from now on.
        with open("meta.json", "wt", encoding="UTF-8") as created:
            json.dump({}, created, indent=2)
        metadata = {}
    return metadata

140 

141 

142def parse_variable_options( 

143 arguments: list[str], input_dir: str | list[str] | None = None 

144) -> dict: 

145 """Parse a list of arguments into a dictionary of variables. 

146 

147 The variable name arguments start with two hyphen-minus (`--`), consisting 

148 of only capital letters (`A`-`Z`) and underscores (`_`). While the variable 

149 name is restricted, the value of the variable can be any string. 

150 

151 Parameters 

152 ---------- 

153 arguments: list[str] 

154 List of arguments, e.g: `["--LEVEL", "2", "--STASH=m01s01i001"]` 

155 input_dir: str | list[str], optional 

156 List of input directories to add into the returned variables. 

157 

158 Returns 

159 ------- 

160 recipe_variables: dict 

161 Dictionary keyed with the variable names. 

162 

163 Raises 

164 ------ 

165 ValueError 

166 If any arguments cannot be parsed. 

167 """ 

168 # Convert --input_dir=... to INPUT_PATHS recipe variable. 

169 if input_dir is not None: 

170 abs_paths = [str(Path(p).absolute()) for p in iter_maybe(input_dir)] 

171 arguments.append(f"--INPUT_PATHS={abs_paths}") 

172 recipe_variables = {} 

173 i = 0 

174 while i < len(arguments): 

175 if re.fullmatch(r"--[A-Z_]+=.*", arguments[i]): 

176 key, value = arguments[i].split("=", 1) 

177 elif re.fullmatch(r"--[A-Z_]+", arguments[i]): 

178 try: 

179 key = arguments[i].strip("-") 

180 value = arguments[i + 1] 

181 except IndexError as err: 

182 raise ArgumentError(f"No value for variable {arguments[i]}") from err 

183 i += 1 

184 else: 

185 raise ArgumentError(f"Unknown argument: {arguments[i]}") 

186 try: 

187 # Remove quotes from arguments, in case left in CSET_ADDOPTS. 

188 if re.fullmatch(r"""["'].+["']""", value): 

189 value = value[1:-1] 

190 recipe_variables[key.strip("-")] = ast.literal_eval(value) 

191 # Capture the many possible exceptions from ast.literal_eval 

192 except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError): 

193 recipe_variables[key.strip("-")] = value 

194 i += 1 

195 return recipe_variables 

196 

197 

198def template_variables(recipe: dict | list, variables: dict) -> dict: 

199 """Insert variables into recipe. 

200 

201 Parameters 

202 ---------- 

203 recipe: dict | list 

204 The recipe as a python dictionary. It is updated in-place. 

205 variables: dict 

206 Dictionary of variables for the recipe. 

207 

208 Returns 

209 ------- 

210 recipe: dict 

211 Filled recipe as a python dictionary. 

212 

213 Raises 

214 ------ 

215 KeyError 

216 If needed recipe variables are not supplied. 

217 """ 

218 if isinstance(recipe, dict): 

219 index = recipe.keys() 

220 elif isinstance(recipe, list): 

221 # We have to handle lists for when we have one inside a recipe. 

222 index = range(len(recipe)) 

223 else: 

224 raise TypeError("recipe must be a dict or list.", recipe) 

225 

226 for i in index: 

227 if isinstance(recipe[i], (dict, list)): 

228 recipe[i] = template_variables(recipe[i], variables) 

229 elif isinstance(recipe[i], str): 

230 recipe[i] = replace_template_variable(recipe[i], variables) 

231 return recipe 

232 

233 

def replace_template_variable(s: str, variables: dict[str, Any]):
    """Fill all variable placeholders in the string.

    A placeholder is a variable name prefixed with ``$``. If the whole string
    is a single placeholder the variable's value replaces it directly, keeping
    the value's type (Paths, and lists starting with a Path, are converted to
    strings). Otherwise each placeholder is replaced by the string form of its
    value.

    Parameters
    ----------
    s: str
        String that may contain placeholders.
    variables: dict[str, Any]
        Mapping of variable names to their values.

    Returns
    -------
    The variable's value if ``s`` was exactly one placeholder, otherwise the
    string with its placeholders filled in.

    Raises
    ------
    KeyError
        If the result is a string that still contains a placeholder.
    """
    # Substitute longer names first, so a variable whose name is a prefix of
    # another (e.g. LEVEL and LEVEL_TYPE) cannot corrupt the longer
    # placeholder. The sort is stable, so equal-length names keep their order.
    longest_first = sorted(
        variables.items(), key=lambda item: len(str(item[0])), reverse=True
    )
    for var_name, var_value in longest_first:
        placeholder = f"${var_name}"
        # If the value is just the placeholder we directly overwrite it
        # to keep the value type.
        if s == placeholder:
            # Specially handle Paths and lists of Paths.
            if isinstance(var_value, Path):
                var_value = str(var_value)
            elif (
                isinstance(var_value, list)
                and var_value
                and isinstance(var_value[0], Path)
            ):
                var_value = [str(p) for p in var_value]
            s = var_value
            # We have replaced the whole string, so stop here to avoid
            # interpreting the new value.
            break
        s = s.replace(placeholder, str(var_value))
    # NOTE(review): `.` does not match newlines and re.match anchors at the
    # start, so only the first line of a multi-line string is checked for
    # unfilled placeholders. Confirm whether later lines should be checked too.
    if isinstance(s, str) and re.match(r"^.*\$[A-Z_].*", s):
        raise KeyError("Variable without a value.", s)
    return s

259 

260 

261################################################################################ 

262# Templating code taken from the simple_template package under the 0BSD licence. 

263# Original at https://github.com/Fraetor/simple_template 

264################################################################################ 

265 

266 

class TemplateError(KeyError):
    """Rendering a template failed due to a placeholder without a value."""

269 

270 

def render(template: str, /, **variables) -> str:
    """Render the template with the provided variables.

    The template should contain placeholders that will be replaced. These
    placeholders consist of the placeholder name within double curly braces. The
    name of the placeholder should be a valid python identifier. Whitespace
    between the braces and the name is ignored. E.g.: `{{ placeholder_name }}`

    Double-brace spans whose content is not a valid identifier are left in the
    output unchanged. Values are inserted literally; backslashes in a value
    are not treated as escape sequences.

    An exception will be raised if there are placeholders without corresponding
    values. It is acceptable to provide unused values; they will be ignored.

    Parameters
    ----------
    template: str
        Template to fill with variables.

    **variables: Any
        Keyword arguments for the placeholder values. The argument name should
        be the same as the placeholder's name. You can unpack a dictionary of
        value with `render(template, **my_dict)`.

    Returns
    -------
    rendered_template: str
        Filled template.

    Raises
    ------
    TemplateError
        Value not given for a placeholder in the template.
    TypeError
        If the template is not a string, or a variable cannot be casted to a
        string.

    Examples
    --------
    >>> template = "<p>Hello {{myplaceholder}}!</p>"
    >>> render(template, myplaceholder="World")
    '<p>Hello World!</p>'
    """

    def extract_placeholders():
        matches = re.finditer(r"{{\s*([^}]+)\s*}}", template)
        # [^}]+ also captures whitespace before the closing braces, so strip
        # it; otherwise "{{ name }}" fails the identifier test and is skipped.
        # dict.fromkeys de-duplicates while keeping first-seen order.
        names = dict.fromkeys(match.group(1).strip() for match in matches)
        return [name for name in names if name.isidentifier()]

    def substitute_placeholder(name):
        try:
            value = str(variables[name])
        except KeyError as err:
            raise TemplateError("Placeholder missing value", name) from err
        pattern = r"{{\s*%s\s*}}" % re.escape(name)
        # A callable replacement inserts the value verbatim. A plain string
        # would have its backslashes read as escapes or group references.
        return re.sub(pattern, lambda _match: value, template)

    for name in extract_placeholders():
        template = substitute_placeholder(name)
    return template

331 

332 

def render_file(template_path: str, /, **variables) -> str:
    """Render a template read from a file.

    The file is read as UTF-8 text and its contents are passed to `render()`
    together with the given variables; see `render()` for the placeholder
    syntax and the exceptions raised.

    Examples
    --------
    >>> render_file("/path/to/template.html", myplaceholder="World")
    '<p>Hello World!</p>'
    """
    with open(template_path, "rt", encoding="UTF-8") as template_file:
        contents = template_file.read()
    return render(contents, **variables)

346 

347 

def iter_maybe(thing) -> Iterable:
    """Return thing if it is iterable, otherwise wrap it in a 1-tuple.

    Strings count as atoms, so they are wrapped rather than iterated.
    """
    is_atom = isinstance(thing, str) or not isinstance(thing, Iterable)
    return (thing,) if is_atom else thing

353 

354 

def human_sorted(iterable: Iterable, reverse: bool = False) -> list:
    """Sort so that numbers embedded in strings are ordered numerically.

    Items that cannot be split as strings are compared as they are.
    """
    # Adapted from https://nedbatchelder.com/blog/200712/human_sorting.html

    def alphanum_key(s):
        """Turn a string into a list of string and number chunks.

        >>> alphanum_key("z23a")
        ["z", 23, "a"]
        """
        try:
            chunks = re.split(r"(\d+)", s)
        except TypeError:
            # Not a string, so use the item itself as its sort key.
            return s
        return [int(chunk) if chunk.isdecimal() else chunk for chunk in chunks]

    return sorted(iterable, key=alphanum_key, reverse=reverse)

371 

372 

def combine_dicts(d1: dict, d2: dict) -> dict:
    """Recursively combine two dictionaries.

    Where both dictionaries hold a dictionary under the same key, those are
    combined recursively. In every other case the value from the second
    dictionary wins, including when only one of the two values is a
    dictionary.

    Parameters
    ----------
    d1: dict
        Base dictionary. It is updated in-place.
    d2: dict
        Dictionary whose entries are merged into ``d1``. It is not modified,
        but values taken from it are not copied.

    Returns
    -------
    d1: dict
        The first dictionary, after merging.
    """
    for key, new_value in d2.items():
        if (
            key in d1
            and isinstance(d1[key], dict)
            and isinstance(new_value, dict)
        ):
            d1[key] = combine_dicts(d1[key], new_value)
        else:
            # New key, or at least one side is an atom: the second
            # dictionary's value takes precedence.
            d1[key] = new_value
    return d1

388 

389 

def sort_dict(d: dict) -> dict:
    """Return a new dictionary with its items sorted, recursing into values.

    Nested dictionaries are sorted the same way; other values are carried
    over unchanged.
    """
    # Thank you to https://stackoverflow.com/a/47882384
    # NOTE(review): human_sorted receives (key, value) tuples here, not
    # strings, so its number-aware key does not apply and ordering appears to
    # fall back to plain tuple comparison. Confirm whether natural ordering of
    # the keys was intended.
    ordered = {}
    for key, value in human_sorted(d.items()):
        ordered[key] = sort_dict(value) if isinstance(value, dict) else value
    return ordered

397 

398 

def sstrip(text):
    """Remove common indentation, then strip surrounding whitespace.

    Parameters
    ----------
    text: str
        The string to dedent and strip.

    Examples
    --------
    >>> print(sstrip('''
    ...     foo
    ...       bar
    ...     baz
    ... '''))
    foo
      bar
    baz
    """
    unindented = dedent(text)
    return unindented.strip()