|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import click |
| 4 | +import json |
| 5 | +import sys |
| 6 | +import time |
| 7 | +import traceback |
| 8 | +import requests |
| 9 | +from typing import List, Mapping, Tuple, Dict |
| 10 | +from flytekit.remote import FlyteRemote |
| 11 | +from flytekit.models.core.execution import WorkflowExecutionPhase |
| 12 | +from flytekit.configuration import Config, ImageConfig, SerializationSettings |
| 13 | + |
| 14 | + |
| 15 | +WAIT_TIME = 10 |
| 16 | +MAX_ATTEMPTS = 60 |
| 17 | + |
| 18 | +# This dictionary maps the names found in the flytesnacks manifest to a list of workflow names and |
| 19 | +# inputs. This is so we can progressively cover all priorities in the original flytesnacks manifest, |
| 20 | +# starting with "core". |
| 21 | +FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = { |
| 22 | + "core": [ |
| 23 | + ("core.control_flow.chain_tasks.chain_tasks_wf", {}), |
| 24 | + ("core.control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), |
| 25 | + ("core.control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), |
| 26 | + # Workflows that use nested executions cannot be launched via flyteremote. |
| 27 | + # This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482. |
| 28 | + # ("core.control_flow.run_conditions.multiplier", {"my_input": 0.5}), |
| 29 | + # ("core.control_flow.run_conditions.multiplier_2", {"my_input": 10}), |
| 30 | + # ("core.control_flow.run_conditions.multiplier_3", {"my_input": 5}), |
| 31 | + # ("core.control_flow.run_conditions.basic_boolean_wf", {"seed": 5}), |
| 32 | + # ("core.control_flow.run_conditions.bool_input_wf", {"b": True}), |
| 33 | + # ("core.control_flow.run_conditions.nested_conditions", {"my_input": 0.4}), |
| 34 | + # ("core.control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}), |
| 35 | + # ("core.control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}), |
| 36 | + ("core.control_flow.subworkflows.parent_wf", {"a": 3}), |
| 37 | + ("core.control_flow.subworkflows.nested_parent_wf", {"a": 3}), |
| 38 | + ("core.flyte_basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}), |
| 39 | + # Getting a 403 for the wikipedia image |
| 40 | + # ("core.flyte_basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}), |
| 41 | + ("core.flyte_basics.folders.download_and_rotate", {}), |
| 42 | + ("core.flyte_basics.hello_world.my_wf", {}), |
| 43 | + ("core.flyte_basics.lp.my_wf", {"val": 4}), |
| 44 | + ("core.flyte_basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), |
| 45 | + ("core.flyte_basics.named_outputs.my_wf", {}), |
| 46 | + # # Getting a 403 for the wikipedia image |
| 47 | + # # ("core.flyte_basics.reference_task.wf", {}), |
| 48 | + ("core.type_system.custom_objects.wf", {"x": 10, "y": 20}), |
| 49 | + # Enums are not supported in flyteremote |
| 50 | + # ("core.type_system.enums.enum_wf", {"c": "red"}), |
| 51 | + ("core.type_system.schema.df_wf", {"a": 42}), |
| 52 | + ("core.type_system.typed_schema.wf", {}), |
| 53 | + ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), |
| 54 | + ], |
| 55 | +} |
| 56 | + |
| 57 | + |
| 58 | +def run_launch_plan(remote, version, workflow_name, inputs): |
| 59 | + print(f"Fetching workflow={workflow_name} and version={version}") |
| 60 | + lp = remote.fetch_workflow(name=workflow_name, version=version) |
| 61 | + return remote.execute(lp, inputs=inputs, wait=False) |
| 62 | + |
| 63 | + |
| 64 | +def schedule_workflow_group( |
| 65 | + tag: str, |
| 66 | + workflow_group: str, |
| 67 | + remote: FlyteRemote, |
| 68 | + terminate_workflow_on_failure: bool, |
| 69 | +) -> bool: |
| 70 | + """ |
| 71 | + Schedule all workflows executions and return True if all executions succeed, otherwise |
| 72 | + return False. |
| 73 | + """ |
| 74 | + workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(workflow_group, []) |
| 75 | + |
| 76 | + launch_plans = [ |
| 77 | + run_launch_plan(remote, tag, workflow[0], workflow[1]) for workflow in workflows |
| 78 | + ] |
| 79 | + |
| 80 | + # Wait for all launch plans to finish |
| 81 | + attempt = 0 |
| 82 | + while attempt == 0 or ( |
| 83 | + not all([lp.is_complete for lp in launch_plans]) and attempt < MAX_ATTEMPTS |
| 84 | + ): |
| 85 | + attempt += 1 |
| 86 | + print( |
| 87 | + f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s" |
| 88 | + ) |
| 89 | + time.sleep(WAIT_TIME) |
| 90 | + # Need to sync to refresh status of executions |
| 91 | + for lp in launch_plans: |
| 92 | + print(f"About to sync execution_id={lp.id.name}") |
| 93 | + remote.sync(lp) |
| 94 | + |
| 95 | + # Report result of each launch plan |
| 96 | + for lp in launch_plans: |
| 97 | + print(lp) |
| 98 | + |
| 99 | + # Collect all failing launch plans |
| 100 | + non_succeeded_lps = [ |
| 101 | + lp |
| 102 | + for lp in launch_plans |
| 103 | + if lp.closure.phase != WorkflowExecutionPhase.SUCCEEDED |
| 104 | + ] |
| 105 | + |
| 106 | + if len(non_succeeded_lps) == 0: |
| 107 | + print("All executions succeeded.") |
| 108 | + return True |
| 109 | + |
| 110 | + print("Failed executions:") |
| 111 | + # Report failing cases |
| 112 | + for lp in non_succeeded_lps: |
| 113 | + print(f" workflow={lp.spec.launch_plan.name}, execution_id={lp.id.name}") |
| 114 | + if terminate_workflow_on_failure: |
| 115 | + remote.terminate(lp, "aborting execution scheduled in functional test") |
| 116 | + return False |
| 117 | + |
| 118 | + |
| 119 | +def valid(workflow_group): |
| 120 | + """ |
| 121 | + Return True if a workflow group is contained in FLYTESNACKS_WORKFLOW_GROUPS, |
| 122 | + False otherwise. |
| 123 | + """ |
| 124 | + return workflow_group in FLYTESNACKS_WORKFLOW_GROUPS.keys() |
| 125 | + |
| 126 | + |
| 127 | +def run( |
| 128 | + flytesnacks_release_tag: str, |
| 129 | + priorities: List[str], |
| 130 | + config_file_path, |
| 131 | + terminate_workflow_on_failure: bool, |
| 132 | +) -> List[Dict[str, str]]: |
| 133 | + remote = FlyteRemote( |
| 134 | + Config.auto(config_file=config_file_path), |
| 135 | + default_project="flytesnacks", |
| 136 | + default_domain="development", |
| 137 | + ) |
| 138 | + |
| 139 | + # For a given release tag and priority, this function filters the workflow groups from the flytesnacks |
| 140 | + # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ]. |
| 141 | + manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \ |
| 142 | + f"{flytesnacks_release_tag}/cookbook/flyte_tests_manifest.json" |
| 143 | + r = requests.get(manifest_url) |
| 144 | + parsed_manifest = r.json() |
| 145 | + |
| 146 | + workflow_groups = [ |
| 147 | + group["name"] for group in parsed_manifest if group["priority"] in priorities |
| 148 | + ] |
| 149 | + results = [] |
| 150 | + for workflow_group in workflow_groups: |
| 151 | + if not valid(workflow_group): |
| 152 | + results.append( |
| 153 | + { |
| 154 | + "label": workflow_group, |
| 155 | + "status": "coming soon", |
| 156 | + "color": "grey", |
| 157 | + } |
| 158 | + ) |
| 159 | + continue |
| 160 | + |
| 161 | + try: |
| 162 | + workflows_succeeded = schedule_workflow_group( |
| 163 | + flytesnacks_release_tag, |
| 164 | + workflow_group, |
| 165 | + remote, |
| 166 | + terminate_workflow_on_failure, |
| 167 | + ) |
| 168 | + except Exception: |
| 169 | + print(traceback.format_exc()) |
| 170 | + workflows_succeeded = False |
| 171 | + |
| 172 | + if workflows_succeeded: |
| 173 | + background_color = "green" |
| 174 | + status = "passing" |
| 175 | + else: |
| 176 | + background_color = "red" |
| 177 | + status = "failing" |
| 178 | + |
| 179 | + # Workflow groups can be only in one of three states: |
| 180 | + # 1. passing: this indicates all the workflow executions for that workflow group |
| 181 | + # executed successfully |
| 182 | + # 2. failing: this state indicates that at least one execution failed in that |
| 183 | + # workflow group |
| 184 | + # 3. coming soon: this state is used to indicate that the workflow group was not |
| 185 | + # implemented yet. |
| 186 | + # |
| 187 | + # Each state has a corresponding status and color to be used in the badge for that |
| 188 | + # workflow group. |
| 189 | + result = { |
| 190 | + "label": workflow_group, |
| 191 | + "status": status, |
| 192 | + "color": background_color, |
| 193 | + } |
| 194 | + results.append(result) |
| 195 | + return results |
| 196 | + |
| 197 | + |
| 198 | +@click.command() |
| 199 | +@click.option( |
| 200 | + "--return_non_zero_on_failure", |
| 201 | + default=False, |
| 202 | + is_flag=True, |
| 203 | + help="Return a non-zero exit status if any workflow fails", |
| 204 | +) |
| 205 | +@click.option( |
| 206 | + "--terminate_workflow_on_failure", |
| 207 | + default=False, |
| 208 | + is_flag=True, |
| 209 | + help="Abort failing workflows upon exit", |
| 210 | +) |
| 211 | +@click.argument("flytesnacks_release_tag") |
| 212 | +@click.argument("priorities") |
| 213 | +@click.argument("config_file") |
| 214 | +def cli( |
| 215 | + flytesnacks_release_tag, |
| 216 | + priorities, |
| 217 | + config_file, |
| 218 | + return_non_zero_on_failure, |
| 219 | + terminate_workflow_on_failure, |
| 220 | +): |
| 221 | + print(f"return_non_zero_on_failure={return_non_zero_on_failure}") |
| 222 | + results = run( |
| 223 | + flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure |
| 224 | + ) |
| 225 | + |
| 226 | + # Write a json object in its own line describing the result of this run to stdout |
| 227 | + print(f"Result of run:\n{json.dumps(results)}") |
| 228 | + |
| 229 | + # Return a non-zero exit code if core fails |
| 230 | + if return_non_zero_on_failure: |
| 231 | + for result in results: |
| 232 | + if result["status"] not in ("passing", "coming soon"): |
| 233 | + sys.exit(1) |
| 234 | + |
| 235 | + |
| 236 | +if __name__ == "__main__": |
| 237 | + cli() |
0 commit comments