Coverage for src / CSET / cset_workflow / app / parbake_recipes / bin / parbake.py: 100%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-24 08:36 +0000

1#!/usr/bin/env python3 

2# © Crown copyright, Met Office (2022-2025) and CSET contributors. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16"""Run a recipe with the CSET CLI.""" 

17 

18import json 

19import os 

20import subprocess 

21from base64 import b64decode 

22from pathlib import Path 

23 

24from CSET.recipes import load_recipes 

25 

26 

27def parbake_all( 

28 variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool 

29) -> int: 

30 """Generate and parbake recipes from configuration.""" 

31 # Gather all recipes into a big list. 

32 recipes = list(load_recipes(variables)) 

33 # Check we have some recipes enabled. 

34 if not recipes: 

35 raise ValueError("At least one recipe should be enabled.") 

36 # Parbake all recipes remaining after filtering aggregation recipes. 

37 recipe_count = 0 

38 for recipe in filter(lambda r: r.aggregation == aggregation, recipes): 

39 print(f"Parbaking {recipe}", flush=True) 

40 recipe.parbake(rose_datac, share_dir) 

41 recipe_count += 1 

42 return recipe_count 

43 

44 

45def main(): 

46 """Program entry point.""" 

47 # Gather configuration from environment. 

48 variables = json.loads(b64decode(os.environ["ENCODED_ROSE_SUITE_VARIABLES"])) 

49 rose_datac = Path(os.environ["ROSE_DATAC"]) 

50 share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"]) 

51 aggregation = bool(os.getenv("DO_CASE_AGGREGATION")) 

52 # Parbake recipes for cycle. 

53 recipe_count = parbake_all(variables, rose_datac, share_dir, aggregation) 

54 

55 # If running under cylc, notify cylc of task completion. 

56 cylc_workflow_id = os.getenv("CYLC_WORKFLOW_ID") 

57 cylc_task_job = os.getenv("CYLC_TASK_JOB") 

58 if cylc_workflow_id and cylc_task_job: 

59 message_command = [ 

60 "cylc", 

61 "message", 

62 "--", 

63 cylc_workflow_id, 

64 cylc_task_job, 

65 ] 

66 if recipe_count: 

67 subprocess.run(message_command + ["start baking"], check=True) 

68 else: 

69 subprocess.run(message_command + ["skip baking"], check=True) 

70 

71 

72if __name__ == "__main__": # pragma: no cover 

73 main()