Coverage for src / CSET / cset_workflow / app / parbake_recipes / bin / parbake.py: 100%
29 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-24 08:36 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-24 08:36 +0000
1#!/usr/bin/env python3
2# © Crown copyright, Met Office (2022-2025) and CSET contributors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
16"""Run a recipe with the CSET CLI."""
18import json
19import os
20import subprocess
21from base64 import b64decode
22from pathlib import Path
24from CSET.recipes import load_recipes
def parbake_all(
    variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool
) -> int:
    """Parbake every loaded recipe whose aggregation flag matches.

    Parameters
    ----------
    variables: dict
        Workflow configuration, handed unchanged to ``load_recipes``.
    rose_datac: Path
        Forwarded as the first argument of each recipe's ``parbake`` call.
    share_dir: Path
        Forwarded as the second argument of each recipe's ``parbake`` call.
    aggregation: bool
        Only recipes whose ``aggregation`` attribute equals this value are
        parbaked.

    Returns
    -------
    int
        Number of recipes that were parbaked.

    Raises
    ------
    ValueError
        If ``load_recipes`` produces no recipes at all.
    """
    # Materialise the recipes so emptiness can be checked before looping.
    loaded = list(load_recipes(variables))
    if not loaded:
        raise ValueError("At least one recipe should be enabled.")

    # Walk the recipes in order, handling only those of the requested kind.
    parbaked = 0
    for candidate in loaded:
        if candidate.aggregation == aggregation:
            print(f"Parbaking {candidate}", flush=True)
            candidate.parbake(rose_datac, share_dir)
            parbaked += 1
    return parbaked
def _env_flag(name: str) -> bool:
    """Interpret the environment variable ``name`` as an on/off switch.

    Unset, empty, and the case-insensitive values ``0``, ``false``, ``no`` and
    ``off`` are treated as off; any other value is treated as on.
    """
    # A plain bool() on the string would treat "False" or "0" as enabled.
    raw = os.getenv(name, "")
    return raw.strip().lower() not in {"", "0", "false", "no", "off"}


def main():
    """Program entry point.

    Reads its configuration from the environment, parbakes the matching
    recipes, and, when running under cylc, reports whether baking should start.

    Environment variables read:

    * ``ENCODED_ROSE_SUITE_VARIABLES``: base64-encoded JSON (required).
    * ``ROSE_DATAC``: path passed to ``parbake_all`` (required).
    * ``CYLC_WORKFLOW_SHARE_DIR``: path passed to ``parbake_all`` (required).
    * ``DO_CASE_AGGREGATION``: optional switch selecting aggregation recipes.
    * ``CYLC_WORKFLOW_ID`` and ``CYLC_TASK_JOB``: when both are set and
      non-empty, a ``cylc message`` is sent.

    Raises
    ------
    KeyError
        If one of the required environment variables is missing.
    subprocess.CalledProcessError
        If the ``cylc message`` command exits with a non-zero status.
    """
    # Gather configuration from environment.
    variables = json.loads(b64decode(os.environ["ENCODED_ROSE_SUITE_VARIABLES"]))
    rose_datac = Path(os.environ["ROSE_DATAC"])
    share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"])
    aggregation = _env_flag("DO_CASE_AGGREGATION")

    # Parbake recipes for cycle.
    recipe_count = parbake_all(variables, rose_datac, share_dir, aggregation)

    # If running under cylc, notify cylc of task completion.
    cylc_workflow_id = os.getenv("CYLC_WORKFLOW_ID")
    cylc_task_job = os.getenv("CYLC_TASK_JOB")
    if cylc_workflow_id and cylc_task_job:
        # The message tells the workflow whether any recipes were parbaked.
        message = "start baking" if recipe_count else "skip baking"
        subprocess.run(
            ["cylc", "message", "--", cylc_workflow_id, cylc_task_job, message],
            check=True,
        )
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":  # pragma: no cover
    main()