Coverage for src / CSET / cset_workflow / app / parbake_recipes / bin / parbake.py: 100%
28 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-10 16:20 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-10 16:20 +0000
1#!/usr/bin/env python3
2# © Crown copyright, Met Office (2022-2025) and CSET contributors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
16"""Parbake the enabled recipes for a cycle, asking cylc to skip baking if there are none."""
18import json
19import os
20import subprocess
21from base64 import b64decode
22from pathlib import Path
24from CSET.recipes import load_recipes
def parbake_all(
    variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool
) -> int:
    """Parbake every loaded recipe whose aggregation flag matches.

    Recipes are obtained by passing ``variables`` to ``load_recipes``. Only
    those whose ``aggregation`` attribute equals ``aggregation`` are parbaked,
    each being handed ``rose_datac`` and ``share_dir``.

    Returns
    -------
    int
        How many recipes were parbaked; may be zero when recipes were loaded
        but none matched the requested aggregation mode.

    Raises
    ------
    ValueError
        If ``load_recipes`` yields no recipes at all.
    """
    # Materialise the recipes so an empty configuration can be detected
    # before anything is parbaked.
    loaded = list(load_recipes(variables))
    if len(loaded) == 0:
        raise ValueError("At least one recipe should be enabled.")

    # Lazily select only the recipes for the requested aggregation mode.
    selected = (
        candidate for candidate in loaded if candidate.aggregation == aggregation
    )

    # enumerate from 1 so the final index doubles as the running total; it
    # stays at 0 when nothing was selected.
    parbaked = 0
    for parbaked, candidate in enumerate(selected, start=1):
        print(f"Parbaking {candidate}", flush=True)
        candidate.parbake(rose_datac, share_dir)
    return parbaked
def main():
    """Program entry point.

    Reads the configuration from the environment, parbakes the matching
    recipes via ``parbake_all``, and, when running under cylc with nothing
    parbaked, broadcasts ``run mode = skip`` to the corresponding baking task.

    Environment variables
    ---------------------
    ENCODED_ROSE_SUITE_VARIABLES
        Required. Base64-encoded JSON holding the workflow variables.
    ROSE_DATAC, CYLC_WORKFLOW_SHARE_DIR
        Required. Paths passed through to ``parbake_all``.
    DO_CASE_AGGREGATION
        Optional flag selecting aggregation recipes. Unset, empty, ``0``,
        ``false``, ``no`` and ``off`` (case-insensitive) mean disabled; any
        other value means enabled.
    CYLC_WORKFLOW_ID, CYLC_TASK_CYCLE_POINT
        Optional. The skip broadcast is only attempted when both are set.

    Raises
    ------
    KeyError
        If a required environment variable is missing.
    subprocess.CalledProcessError
        If the ``cylc broadcast`` command exits with a non-zero status.
    """
    # Gather configuration from environment.
    variables = json.loads(b64decode(os.environ["ENCODED_ROSE_SUITE_VARIABLES"]))
    rose_datac = Path(os.environ["ROSE_DATAC"])
    share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"])
    # bool() on the raw string would treat "False" or "0" as enabled, since
    # any non-empty string is truthy; parse the flag explicitly instead.
    aggregation_flag = os.getenv("DO_CASE_AGGREGATION", "")
    aggregation = aggregation_flag.strip().lower() not in {
        "",
        "0",
        "false",
        "no",
        "off",
    }

    # Parbake recipes for cycle.
    recipe_count = parbake_all(variables, rose_datac, share_dir, aggregation)

    # If running under cylc, skip baking if we have no recipes.
    cylc_workflow_id = os.getenv("CYLC_WORKFLOW_ID")
    cylc_cycle = os.getenv("CYLC_TASK_CYCLE_POINT")
    if cylc_workflow_id and cylc_cycle and not recipe_count:
        print("No recipes needing to be baked; asking cylc to skip baking.")
        # Target the baking task that pairs with the mode we parbaked for.
        bake_task = "bake_aggregation_recipes" if aggregation else "bake_recipes"
        broadcast_command = [
            "cylc",
            "broadcast",
            "--point",
            cylc_cycle,
            "--namespace",
            bake_task,
            "--set",
            "run mode = skip",
            cylc_workflow_id,
        ]
        # check=True so a failed broadcast fails this task rather than
        # passing silently.
        subprocess.run(broadcast_command, check=True)
# Run only when executed as a script, so importing this module has no side
# effects.
if __name__ == "__main__":  # pragma: no cover
    main()