From b5b36a1d315f43370108c70bd1b361630f82707a Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Fri, 29 Aug 2025 07:46:38 +0800 Subject: [PATCH 1/5] add monkeypatch for v2 Signed-off-by: Nelson Chen --- flytekit/clis/sdk_in_container/pyflyte.py | 25 ++++- flytekit/migration/task.py | 131 ++++++++++++++++++++++ flytekit/migration/workflow.py | 13 +++ 3 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 flytekit/migration/task.py create mode 100644 flytekit/migration/workflow.py diff --git a/flytekit/clis/sdk_in_container/pyflyte.py b/flytekit/clis/sdk_in_container/pyflyte.py index a492e1cba8..6c59cc3cf8 100644 --- a/flytekit/clis/sdk_in_container/pyflyte.py +++ b/flytekit/clis/sdk_in_container/pyflyte.py @@ -2,6 +2,11 @@ import typing import rich_click as click +import flyte +import flytekit.core.task +from flyte import Image, Resources, TaskEnvironment +from flyte._doc import Documentation +from flyte._task import AsyncFunctionTaskTemplate, P, R from flytekit import configuration from flytekit.clis.sdk_in_container.backfill import backfill @@ -54,8 +59,15 @@ type=str, help="Path to config file for use within container", ) +@click.option( + "-v2", + "--v2cmd", + required=False, + is_flag=True, + help="Use the v2 command set. This is the default behavior, but this flag is provided for backwards compatibility with older scripts that may have used the v1 command set (which is now deprecated).", +) @click.pass_context -def main(ctx, pkgs: typing.List[str], config: str, verbose: int): +def main(ctx, pkgs: typing.List[str], config: str, verbose: int, v2cmd: bool): """ Entrypoint for all the user commands. """ @@ -77,6 +89,17 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: int): if pkgs is None: pkgs = [] + if v2cmd: + from flytekit.migration.task import task_shim as flyte_sdk_task_shim + flytekit.task = flyte_sdk_task_shim + + env = TaskEnvironment( + name="flytekit", + resources=Resources(cpu=0.8, memory="800Mi"), + image=Image.from_debian_base().with_apt_packages("vim").with_pip_packages("flytekit", "pandas"), + ) + + ctx.obj[CTX_PACKAGES] = pkgs ctx.obj[CTX_VERBOSE] = verbose diff --git a/flytekit/migration/task.py b/flytekit/migration/task.py new file mode 100644 index 0000000000..3735e78ffc --- /dev/null +++ b/flytekit/migration/task.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import datetime +import inspect +import os +from functools import partial, update_wrapper +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union, overload +from typing import Literal as L + +from typing_extensions import ParamSpec # type: ignore + +import flytekit +from flytekit.core import launch_plan as _annotated_launchplan +from flytekit.core import workflow as _annotated_workflow +from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin +from flytekit.core.cache import Cache, VersionParameters +from flytekit.core.interface import Interface, output_name_generator, transform_function_to_interface +from flytekit.core.pod_template import PodTemplate +from flytekit.core.python_function_task import AsyncPythonFunctionTask, EagerAsyncPythonFunctionTask, PythonFunctionTask +from flytekit.core.reference_entity import ReferenceEntity, TaskReference +from flytekit.core.resources import Resources +from flytekit.core.utils import str2bool +from flytekit.deck import DeckField +from flytekit.extras.accelerators import BaseAccelerator +from flytekit.image_spec.image_spec import ImageSpec +from flytekit.interactive import vscode +from flytekit.interactive.constants import FLYTE_ENABLE_VSCODE_KEY +from flytekit.models.documentation import Documentation +from flytekit.models.security import Secret + +import flyte +from flyte import Image, Resources, TaskEnvironment +from flyte._doc import Documentation +from flyte._task import AsyncFunctionTaskTemplate, P, R + +P = ParamSpec("P") +T = TypeVar("T") +FuncOut = TypeVar("FuncOut") + +def task_shim( + _task_function: Optional[Callable[P, FuncOut]] = None, + task_config: Optional[T] = None, + cache: Union[bool, Cache] = False, + retries: int = 0, + interruptible: Optional[bool] = None, + deprecated: str = "", + timeout: Union[datetime.timedelta, int] = 0, + container_image: Optional[Union[str, ImageSpec]] = None, + environment: Optional[Dict[str, str]] = None, + requests: Optional[Resources] = None, + limits: Optional[Resources] = None, + secret_requests: Optional[List[Secret]] = None, + execution_mode: PythonFunctionTask.ExecutionBehavior = PythonFunctionTask.ExecutionBehavior.DEFAULT, + node_dependency_hints: Optional[ + Iterable[ + Union[ + PythonFunctionTask, + _annotated_launchplan.LaunchPlan, + _annotated_workflow.WorkflowBase, + ] + ] + ] = None, + task_resolver: Optional[TaskResolverMixin] = None, + docs: Optional[Documentation] = None, + disable_deck: Optional[bool] = None, + enable_deck: Optional[bool] = None, + deck_fields: Optional[Tuple[DeckField, ...]] = ( + DeckField.SOURCE_CODE, + DeckField.DEPENDENCIES, + DeckField.TIMELINE, + DeckField.INPUT, + DeckField.OUTPUT, + ), + pod_template: Optional[PodTemplate] = None, + pod_template_name: Optional[str] = None, + accelerator: Optional[BaseAccelerator] = None, + pickle_untyped: bool = False, + shared_memory: Optional[Union[L[True], str]] = None, + resources: Optional[Resources] = None, + labels: Optional[dict[str, str]] = None, + annotations: Optional[dict[str, str]] = None, + **kwargs, +) -> Union[AsyncFunctionTaskTemplate, Callable[P, R]]: + """ + Shim to allow using flytekit configuration to run flyte-sdk tasks. + Converts flytekit-native config to flyte-sdk compatible task. + """ + def wrapper(fn: Callable[P, FuncOut]) -> PythonFunctionTask[T]: + # Convert flytekit-native types to flyte-sdk compatible types if needed + # (This is a placeholder for actual conversion logic if required) + # For now, we just call the main task function, but this is where bridging logic would go. + plugin_config = task_config + _pod_template = ( + flyte.PodTemplate( + pod_spec=pod_template.pod_spec, + primary_container_name=pod_template.primary_container_name, + labels=pod_template.labels, + annotations=pod_template.annotations, + ) + if pod_template + else None + ) + + if isinstance(container_image, flytekit.ImageSpec): + image = Image.from_debian_base() + if container_image.apt_packages: + image = image.with_apt_packages(*container_image.apt_packages) + pip_packages = ["flytekit"] + if container_image.packages: + pip_packages.extend(container_image.packages) + image = image.with_pip_packages(*pip_packages) + elif isinstance(container_image, str): + image = Image.from_base(container_image).with_pip_packages("flyte") + else: + image = Image.from_debian_base().with_pip_packages("flytekit") + + _docs = Documentation(description=docs.short_description) if docs else None + + env = TaskEnvironment( + name="flytekit", + resources=resources, + image=image, + cache="enabled" if cache else "disable", + plugin_config=plugin_config, + ) + return env.task(retries=retries, pod_template=pod_template_name or _pod_template, docs=_docs)(fn) + + + if _task_function is not None: + return wrapper(_task_function) + return wrapper diff --git a/flytekit/migration/workflow.py b/flytekit/migration/workflow.py new file mode 100644 index 0000000000..cdbce6d542 --- /dev/null +++ b/flytekit/migration/workflow.py @@ -0,0 +1,13 @@ +import flytekit + +from flyte import Image, Resources, TaskEnvironment + +env = TaskEnvironment( + name="flytekit", + resources=Resources(cpu=0.8, memory="800Mi"), + image=Image.from_debian_base().with_apt_packages("vim").with_pip_packages("flytekit", "pandas"), +) + +# TODO: Build subtask's image + +flytekit.workflow = env.task \ No newline at end of file From e996b3494853ed57be5dd682ef928398488b1aeb Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Wed, 17 Sep 2025 01:12:26 +0800 Subject: [PATCH 2/5] v2 migration --- flytekit/clis/sdk_in_container/pyflyte.py | 27 +--- flytekit/clis/sdk_in_container/runv2.py | 96 ++++++++++++ flytekit/migration/task.py | 175 +++++++++++----------- flytekit/migration/workflow.py | 39 ++++- 4 files changed, 222 insertions(+), 115 deletions(-) create mode 100644 flytekit/clis/sdk_in_container/runv2.py diff --git a/flytekit/clis/sdk_in_container/pyflyte.py b/flytekit/clis/sdk_in_container/pyflyte.py index 6c59cc3cf8..65aa3d8b01 100644 --- a/flytekit/clis/sdk_in_container/pyflyte.py +++ b/flytekit/clis/sdk_in_container/pyflyte.py @@ -2,11 +2,6 @@ import typing import rich_click as click -import flyte -import flytekit.core.task -from flyte import Image, Resources, TaskEnvironment -from flyte._doc import Documentation -from flyte._task import AsyncFunctionTaskTemplate, P, R from flytekit import configuration from flytekit.clis.sdk_in_container.backfill import backfill @@ -22,6 +17,7 @@ from flytekit.clis.sdk_in_container.package import package from flytekit.clis.sdk_in_container.register import register from flytekit.clis.sdk_in_container.run import run +from flytekit.clis.sdk_in_container.runv2 import runv2 from flytekit.clis.sdk_in_container.serialize import serialize from flytekit.clis.sdk_in_container.serve import serve from flytekit.clis.sdk_in_container.utils import ErrorHandlingCommand, validate_package @@ -59,15 +55,9 @@ type=str, help="Path to config file for use within container", ) -@click.option( - "-v2", - "--v2cmd", - required=False, - is_flag=True, - help="Use the v2 command set. This is the default behavior, but this flag is provided for backwards compatibility with older scripts that may have used the v1 command set (which is now deprecated).", -) + @click.pass_context -def main(ctx, pkgs: typing.List[str], config: str, verbose: int, v2cmd: bool): +def main(ctx, pkgs: typing.List[str], config: str, verbose: int): """ Entrypoint for all the user commands. """ @@ -89,16 +79,6 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: int, v2cmd: bool): if pkgs is None: pkgs = [] - if v2cmd: - from flytekit.migration.task import task_shim as flyte_sdk_task_shim - flytekit.task = flyte_sdk_task_shim - - env = TaskEnvironment( - name="flytekit", - resources=Resources(cpu=0.8, memory="800Mi"), - image=Image.from_debian_base().with_apt_packages("vim").with_pip_packages("flytekit", "pandas"), - ) - ctx.obj[CTX_PACKAGES] = pkgs ctx.obj[CTX_VERBOSE] = verbose @@ -119,6 +99,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: int, v2cmd: bool): main.add_command(info) main.add_command(get) main.add_command(execute) +main.add_command(runv2) main.epilog get_plugin().configure_pyflyte_cli(main) diff --git a/flytekit/clis/sdk_in_container/runv2.py b/flytekit/clis/sdk_in_container/runv2.py new file mode 100644 index 0000000000..f22904d29e --- /dev/null +++ b/flytekit/clis/sdk_in_container/runv2.py @@ -0,0 +1,96 @@ +from __future__ import annotations +import importlib.util +import json +import sys +from typing import Any, Dict + +import click +import flyte +import flytekit + +from flytekit.migration.task import task_shim +from flytekit.migration.workflow import workflow_shim + + +def _load_module_from_path(path: str): + spec = importlib.util.spec_from_file_location("user_module", path) + if spec is None or spec.loader is None: + raise click.UsageError(f"Cannot load module from: {path}") + mod = importlib.util.module_from_spec(spec) + sys.modules["user_module"] = mod + spec.loader.exec_module(mod) # type: ignore + return mod + + +def _parse_kv(pairs: tuple[str, ...]) -> Dict[str, Any]: + out: Dict[str, Any] = {} + for kv in pairs: + if "=" not in kv: + raise click.UsageError(f"Bad input '{kv}', expected key=value") + k, v = kv.split("=", 1) + # naive coercion + if v.lower() in {"true", "false"}: + out[k] = (v.lower() == "true") + else: + try: + out[k] = int(v) if "." not in v else float(v) + except ValueError: + out[k] = v + return out + + +@click.command("runv2", context_settings={"ignore_unknown_options": True}) +@click.argument("pyfile", type=click.Path(exists=True)) +@click.argument("entity_name") +@click.option("-i", "--input", "inputs_kv", multiple=True, help="key=value pairs") +@click.option("--config", type=click.Path(exists=True), help="Flyte 2 SDK config file") +def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str | None): + """ + pyflyte runv2 xx.py -i a=1 -i b=hello + + Loads the module, applies v2 shims to flytekit decorators, and executes + the selected entity via the Flyte 2 runtime (flyte.run). + """ + # init Flyte 2 + if config: + flyte.init_from_config(config) + else: + flyte.init() + + flytekit.task = task_shim + flytekit.workflow = workflow_shim + + spec = importlib.util.spec_from_file_location("user_module", pyfile) + mod = importlib.util.module_from_spec(spec) + sys.modules["user_module"] = mod + spec.loader.exec_module(mod) # type: ignore + + entity = getattr(mod, entity_name) + inputs = {} + for kv in inputs_kv: + k, v = kv.split("=", 1) + inputs[k] = (v.lower() == "true") if v.lower() in ("true", "false") else ( + float(v) if "." in v else (int(v) if v.isdigit() else v)) + + out = flyte.run(entity, **inputs) + + value = out + for name in ("result", "output", "outputs"): + attr = getattr(value, name, None) + if attr is None: + continue + try: + value = attr() if callable(attr) else attr + except TypeError: + value = attr + + if isinstance(value, dict) and len(value) == 1: + try: + value = next(iter(value.values())) + except Exception: + pass + + try: + click.echo(json.dumps({"result": value}, default=str)) + except Exception: + click.echo(str(value)) \ No newline at end of file diff --git a/flytekit/migration/task.py b/flytekit/migration/task.py index 3735e78ffc..c667d41cb8 100644 --- a/flytekit/migration/task.py +++ b/flytekit/migration/task.py @@ -1,67 +1,80 @@ from __future__ import annotations - import datetime -import inspect -import os -from functools import partial, update_wrapper -from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union, overload -from typing import Literal as L - -from typing_extensions import ParamSpec # type: ignore +from typing import Callable, Dict, Iterable, List, Literal, Optional, Tuple, Union, Any import flytekit -from flytekit.core import launch_plan as _annotated_launchplan -from flytekit.core import workflow as _annotated_workflow -from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin -from flytekit.core.cache import Cache, VersionParameters -from flytekit.core.interface import Interface, output_name_generator, transform_function_to_interface -from flytekit.core.pod_template import PodTemplate -from flytekit.core.python_function_task import AsyncPythonFunctionTask, EagerAsyncPythonFunctionTask, PythonFunctionTask -from flytekit.core.reference_entity import ReferenceEntity, TaskReference -from flytekit.core.resources import Resources -from flytekit.core.utils import str2bool +from flytekit.core import launch_plan, workflow +from flytekit.core.base_task import T, TaskResolverMixin +from flytekit.core.python_function_task import PythonFunctionTask +from flytekit.core.task import FuncOut from flytekit.deck import DeckField from flytekit.extras.accelerators import BaseAccelerator -from flytekit.image_spec.image_spec import ImageSpec -from flytekit.interactive import vscode -from flytekit.interactive.constants import FLYTE_ENABLE_VSCODE_KEY -from flytekit.models.documentation import Documentation -from flytekit.models.security import Secret -import flyte from flyte import Image, Resources, TaskEnvironment -from flyte._doc import Documentation +from flyte._doc import Documentation as V2Docs from flyte._task import AsyncFunctionTaskTemplate, P, R -P = ParamSpec("P") -T = TypeVar("T") -FuncOut = TypeVar("FuncOut") + +def _to_v2_resources(req: Optional[flytekit.Resources], lim: Optional[flytekit.Resources]) -> Optional[Resources]: + if not req and not lim: + return None + # Pick requests first, then fall back to limits if requests missing. + def pick(getter: Callable[[flytekit.Resources], Any], fallback_getter: Callable[[flytekit.Resources], Any]): + if req and getter(req) is not None: + return getter(req) + if lim and fallback_getter(lim) is not None: + return fallback_getter(lim) + return None + + cpu = pick(lambda r: r.cpu, lambda r: r.cpu) + mem = pick(lambda r: r.mem, lambda r: r.mem) + gpu = pick(lambda r: r.gpu, lambda r: r.gpu) + # Flyte-SDK Resources accepts cpu as float/str, memory as str like "800Mi" + return Resources(cpu=cpu, memory=mem, gpu=gpu) + + +def _to_v2_image(container_image: Optional[Union[str, flytekit.ImageSpec]]) -> Image: + if isinstance(container_image, flytekit.ImageSpec): + img = Image.from_debian_base() + if container_image.apt_packages: + img = img.with_apt_packages(*container_image.apt_packages) + pip_packages = [] + pip_packages.append("flyte") + pip_packages.append("flytekit") + if container_image.packages: + pip_packages.extend(container_image.packages) + return img.with_pip_packages(*pip_packages) + if isinstance(container_image, str): + return Image.from_base(container_image).with_pip_packages("flyte", "flytekit") + # default + return Image.from_debian_base().with_pip_packages("flyte", "flytekit") + def task_shim( _task_function: Optional[Callable[P, FuncOut]] = None, task_config: Optional[T] = None, - cache: Union[bool, Cache] = False, + cache: Union[bool, flytekit.Cache] = False, retries: int = 0, interruptible: Optional[bool] = None, deprecated: str = "", timeout: Union[datetime.timedelta, int] = 0, - container_image: Optional[Union[str, ImageSpec]] = None, + container_image: Optional[Union[str, flytekit.ImageSpec]] = None, environment: Optional[Dict[str, str]] = None, - requests: Optional[Resources] = None, - limits: Optional[Resources] = None, - secret_requests: Optional[List[Secret]] = None, + requests: Optional[flytekit.Resources] = None, + limits: Optional[flytekit.Resources] = None, + secret_requests: Optional[List[flytekit.Secret]] = None, execution_mode: PythonFunctionTask.ExecutionBehavior = PythonFunctionTask.ExecutionBehavior.DEFAULT, node_dependency_hints: Optional[ Iterable[ Union[ - PythonFunctionTask, - _annotated_launchplan.LaunchPlan, - _annotated_workflow.WorkflowBase, + flytekit.PythonFunctionTask, + launch_plan.LaunchPlan, + workflow.WorkflowBase, ] ] ] = None, task_resolver: Optional[TaskResolverMixin] = None, - docs: Optional[Documentation] = None, + docs: Optional[flytekit.Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, deck_fields: Optional[Tuple[DeckField, ...]] = ( @@ -71,61 +84,55 @@ def task_shim( DeckField.INPUT, DeckField.OUTPUT, ), - pod_template: Optional[PodTemplate] = None, + pod_template: Optional[flytekit.PodTemplate] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, pickle_untyped: bool = False, - shared_memory: Optional[Union[L[True], str]] = None, - resources: Optional[Resources] = None, + shared_memory: Optional[Union[Literal[True], str]] = None, + resources: Optional[Resources] = None, # explicit v2 resources passthrough labels: Optional[dict[str, str]] = None, annotations: Optional[dict[str, str]] = None, **kwargs, -) -> Union[AsyncFunctionTaskTemplate, Callable[P, R]]: +) -> Union[AsyncFunctionTaskTemplate, Callable[[Callable[P, R]], AsyncFunctionTaskTemplate]]: """ - Shim to allow using flytekit configuration to run flyte-sdk tasks. - Converts flytekit-native config to flyte-sdk compatible task. + Decorator that mimics flytekit.task but registers a Flyte 2 task under the hood. + Returns a decorator if called with no function; otherwise returns the wrapped task. """ - def wrapper(fn: Callable[P, FuncOut]) -> PythonFunctionTask[T]: - # Convert flytekit-native types to flyte-sdk compatible types if needed - # (This is a placeholder for actual conversion logic if required) - # For now, we just call the main task function, but this is where bridging logic would go. - plugin_config = task_config - _pod_template = ( - flyte.PodTemplate( - pod_spec=pod_template.pod_spec, - primary_container_name=pod_template.primary_container_name, - labels=pod_template.labels, - annotations=pod_template.annotations, - ) - if pod_template - else None - ) - - if isinstance(container_image, flytekit.ImageSpec): - image = Image.from_debian_base() - if container_image.apt_packages: - image = image.with_apt_packages(*container_image.apt_packages) - pip_packages = ["flytekit"] - if container_image.packages: - pip_packages.extend(container_image.packages) - image = image.with_pip_packages(*pip_packages) - elif isinstance(container_image, str): - image = Image.from_base(container_image).with_pip_packages("flyte") - else: - image = Image.from_debian_base().with_pip_packages("flytekit") - - _docs = Documentation(description=docs.short_description) if docs else None - - env = TaskEnvironment( - name="flytekit", - resources=resources, - image=image, - cache="enabled" if cache else "disable", - plugin_config=plugin_config, - ) - return env.task(retries=retries, pod_template=pod_template_name or _pod_template, docs=_docs)(fn) + # Build V2 image/resources + image = _to_v2_image(container_image) + if resources is None: + resources = _to_v2_resources(requests, limits) + + v2_docs = V2Docs(description=getattr(docs, "short_description", None)) if docs else None + + # cache mapping + cache_mode: Literal["enabled", "disabled"] + cache_mode = "enabled" if (cache is True or str(cache).lower() == "true") else "disabled" + + # PodTemplate passthrough: prefer name, else object + pod_tpl = pod_template_name or (pod_template and pod_template.pod_spec and pod_template) or None + + env = TaskEnvironment( + name="flytekit", + resources=resources or Resources(cpu=0.8, memory="800Mi"), + image=image, + cache=cache_mode, + plugin_config=task_config, + env=environment, + ) + + def _decorator(fn: Callable[P, R]) -> AsyncFunctionTaskTemplate: + # You can add retries, timeout, accelerator, secrets mapping here as needed + return env.task( + retries=retries, + pod_template=pod_tpl, + docs=v2_docs, + timeout=timeout if isinstance(timeout, int) else int(timeout.total_seconds()) if timeout else 0, + # You may add accelerator, secrets, interruptible, etc. when Flyte 2 exposes them + )(fn) + # Support both @task and @task() if _task_function is not None: - return wrapper(_task_function) - return wrapper + return _decorator(_task_function) + return _decorator diff --git a/flytekit/migration/workflow.py b/flytekit/migration/workflow.py index cdbce6d542..c956dda8db 100644 --- a/flytekit/migration/workflow.py +++ b/flytekit/migration/workflow.py @@ -1,13 +1,36 @@ -import flytekit +# compat_v2/workflow.py +from __future__ import annotations +from typing import Callable, Optional, Dict, Any +import flytekit +import flyte from flyte import Image, Resources, TaskEnvironment +from flyte._doc import Documentation as V2Docs + -env = TaskEnvironment( - name="flytekit", - resources=Resources(cpu=0.8, memory="800Mi"), - image=Image.from_debian_base().with_apt_packages("vim").with_pip_packages("flytekit", "pandas"), -) +def workflow_shim( + _fn: Optional[Callable[..., Any]] = None, + *, + image: Optional[Image] = None, + resources: Optional[Resources] = None, + docs: Optional[str] = None, +): + """ + Simple replacement for @flytekit.workflow that wraps the Python function + as a Flyte 2 task (pure-Python orchestration). + """ + env = TaskEnvironment( + name="flytekit", + resources=resources or Resources(cpu=0.8, memory="800Mi"), + image=image or Image.from_debian_base().with_pip_packages("flyte", "flytekit"), + ) + v2_docs = V2Docs(description=docs) if docs else None -# TODO: Build subtask's image + def _decorator(fn: Callable[..., Any]): + # Turn the "workflow" into a task that calls user code directly. + # In Flyte 2 you orchestrate with Python (loops/await/gather inside fn). + return env.task(docs=v2_docs)(fn) -flytekit.workflow = env.task \ No newline at end of file + if _fn is not None: + return _decorator(_fn) + return _decorator From 1f0d25c7c216740f15fbb86773b4b427757f0e89 Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Wed, 17 Sep 2025 01:29:56 +0800 Subject: [PATCH 3/5] add remote interfaces Signed-off-by: Nelson Chen --- flytekit/clis/sdk_in_container/runv2.py | 32 ++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/flytekit/clis/sdk_in_container/runv2.py b/flytekit/clis/sdk_in_container/runv2.py index f22904d29e..b0464b7eb0 100644 --- a/flytekit/clis/sdk_in_container/runv2.py +++ b/flytekit/clis/sdk_in_container/runv2.py @@ -18,7 +18,7 @@ def _load_module_from_path(path: str): raise click.UsageError(f"Cannot load module from: {path}") mod = importlib.util.module_from_spec(spec) sys.modules["user_module"] = mod - spec.loader.exec_module(mod) # type: ignore + spec.loader.exec_module(mod) return mod @@ -39,12 +39,35 @@ def _parse_kv(pairs: tuple[str, ...]) -> Dict[str, Any]: return out +def _run_remote(entity, inputs): + if hasattr(flyte, "remote") and callable(getattr(flyte, "remote")): + r = flyte.remote() + try: + return r.run(entity, **inputs) + except TypeError: + return r.run(entity, inputs=inputs) + elif hasattr(flyte, "submit"): + return flyte.submit(entity, **inputs) + elif hasattr(flyte, "Runner") and hasattr(flyte.Runner, "remote"): + rr = flyte.Runner.remote() + try: + return rr.run(entity, inputs) + except TypeError: + return rr.run(entity, **inputs) + else: + raise click.UsageError( + "Remote execution is not available in this flyte-sdk build. Please upgrade flyte-sdk or configure a remote backend." + ) + + @click.command("runv2", context_settings={"ignore_unknown_options": True}) +@click.option("--remote", is_flag=True, default=False, + help="Submit via Flyte 2 remote backend if configured; otherwise run locally.") @click.argument("pyfile", type=click.Path(exists=True)) @click.argument("entity_name") @click.option("-i", "--input", "inputs_kv", multiple=True, help="key=value pairs") @click.option("--config", type=click.Path(exists=True), help="Flyte 2 SDK config file") -def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str | None): +def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str | None,remote: bool): """ pyflyte runv2 xx.py -i a=1 -i b=hello @@ -72,7 +95,10 @@ def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str inputs[k] = (v.lower() == "true") if v.lower() in ("true", "false") else ( float(v) if "." in v else (int(v) if v.isdigit() else v)) - out = flyte.run(entity, **inputs) + if remote: + out = _run_remote(entity, inputs) + else: + out = flyte.run(entity, **inputs) value = out for name in ("result", "output", "outputs"): From 28e8fd8bc6fb8bb717a78309bd5df4dbdd74a9bd Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Wed, 17 Sep 2025 01:34:38 +0800 Subject: [PATCH 4/5] lint Signed-off-by: Nelson Chen --- flytekit/clis/sdk_in_container/pyflyte.py | 2 -- flytekit/clis/sdk_in_container/runv2.py | 24 +++++++++++++++-------- flytekit/migration/task.py | 14 +++++++------ flytekit/migration/workflow.py | 5 ++--- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/flytekit/clis/sdk_in_container/pyflyte.py b/flytekit/clis/sdk_in_container/pyflyte.py index 65aa3d8b01..52cdc1b0f8 100644 --- a/flytekit/clis/sdk_in_container/pyflyte.py +++ b/flytekit/clis/sdk_in_container/pyflyte.py @@ -55,7 +55,6 @@ type=str, help="Path to config file for use within container", ) - @click.pass_context def main(ctx, pkgs: typing.List[str], config: str, verbose: int): """ @@ -79,7 +78,6 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: int): if pkgs is None: pkgs = [] - ctx.obj[CTX_PACKAGES] = pkgs ctx.obj[CTX_VERBOSE] = verbose diff --git a/flytekit/clis/sdk_in_container/runv2.py b/flytekit/clis/sdk_in_container/runv2.py index b0464b7eb0..3b52c00b3b 100644 --- a/flytekit/clis/sdk_in_container/runv2.py +++ b/flytekit/clis/sdk_in_container/runv2.py @@ -1,4 +1,5 @@ from __future__ import annotations + import importlib.util import json import sys @@ -6,8 +7,8 @@ import click import flyte -import flytekit +import flytekit from flytekit.migration.task import task_shim from flytekit.migration.workflow import workflow_shim @@ -30,7 +31,7 @@ def _parse_kv(pairs: tuple[str, ...]) -> Dict[str, Any]: k, v = kv.split("=", 1) # naive coercion if v.lower() in {"true", "false"}: - out[k] = (v.lower() == "true") + out[k] = v.lower() == "true" else: try: out[k] = int(v) if "." not in v else float(v) @@ -61,13 +62,17 @@ def _run_remote(entity, inputs): @click.command("runv2", context_settings={"ignore_unknown_options": True}) -@click.option("--remote", is_flag=True, default=False, - help="Submit via Flyte 2 remote backend if configured; otherwise run locally.") +@click.option( + "--remote", + is_flag=True, + default=False, + help="Submit via Flyte 2 remote backend if configured; otherwise run locally.", +) @click.argument("pyfile", type=click.Path(exists=True)) @click.argument("entity_name") @click.option("-i", "--input", "inputs_kv", multiple=True, help="key=value pairs") @click.option("--config", type=click.Path(exists=True), help="Flyte 2 SDK config file") -def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str | None,remote: bool): +def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str | None, remote: bool): """ pyflyte runv2 xx.py -i a=1 -i b=hello @@ -92,8 +97,11 @@ def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str inputs = {} for kv in inputs_kv: k, v = kv.split("=", 1) - inputs[k] = (v.lower() == "true") if v.lower() in ("true", "false") else ( - float(v) if "." in v else (int(v) if v.isdigit() else v)) + inputs[k] = ( + (v.lower() == "true") + if v.lower() in ("true", "false") + else (float(v) if "." in v else (int(v) if v.isdigit() else v)) + ) if remote: out = _run_remote(entity, inputs) @@ -119,4 +127,4 @@ def runv2(pyfile: str, entity_name: str, inputs_kv: tuple[str, ...], config: str try: click.echo(json.dumps({"result": value}, default=str)) except Exception: - click.echo(str(value)) \ No newline at end of file + click.echo(str(value)) diff --git a/flytekit/migration/task.py b/flytekit/migration/task.py index c667d41cb8..14315bad54 100644 --- a/flytekit/migration/task.py +++ b/flytekit/migration/task.py @@ -1,6 +1,11 @@ from __future__ import annotations + import datetime -from typing import Callable, Dict, Iterable, List, Literal, Optional, Tuple, Union, Any +from typing import Any, Callable, Dict, Iterable, List, Literal, Optional, Tuple, Union + +from flyte import Image, Resources, TaskEnvironment +from flyte._doc import Documentation as V2Docs +from flyte._task import AsyncFunctionTaskTemplate, P, R import flytekit from flytekit.core import launch_plan, workflow @@ -10,14 +15,11 @@ from flytekit.deck import DeckField from flytekit.extras.accelerators import BaseAccelerator -from flyte import Image, Resources, TaskEnvironment -from flyte._doc import Documentation as V2Docs -from flyte._task import AsyncFunctionTaskTemplate, P, R - def _to_v2_resources(req: Optional[flytekit.Resources], lim: Optional[flytekit.Resources]) -> Optional[Resources]: if not req and not lim: return None + # Pick requests first, then fall back to limits if requests missing. def pick(getter: Callable[[flytekit.Resources], Any], fallback_getter: Callable[[flytekit.Resources], Any]): if req and getter(req) is not None: @@ -89,7 +91,7 @@ def task_shim( accelerator: Optional[BaseAccelerator] = None, pickle_untyped: bool = False, shared_memory: Optional[Union[Literal[True], str]] = None, - resources: Optional[Resources] = None, # explicit v2 resources passthrough + resources: Optional[Resources] = None, # explicit v2 resources passthrough labels: Optional[dict[str, str]] = None, annotations: Optional[dict[str, str]] = None, **kwargs, diff --git a/flytekit/migration/workflow.py b/flytekit/migration/workflow.py index c956dda8db..9dc4b0f8b2 100644 --- a/flytekit/migration/workflow.py +++ b/flytekit/migration/workflow.py @@ -1,9 +1,8 @@ # compat_v2/workflow.py from __future__ import annotations -from typing import Callable, Optional, Dict, Any -import flytekit -import flyte +from typing import Any, Callable, Optional + from flyte import Image, Resources, TaskEnvironment from flyte._doc import Documentation as V2Docs From 8ef9be3e8f435593c6b6841445fc524d5bfd0e0b Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Wed, 17 Sep 2025 01:36:15 +0800 Subject: [PATCH 5/5] remove comment Signed-off-by: Nelson Chen --- flytekit/migration/workflow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/migration/workflow.py b/flytekit/migration/workflow.py index 9dc4b0f8b2..ab2b864a80 100644 --- a/flytekit/migration/workflow.py +++ b/flytekit/migration/workflow.py @@ -1,4 +1,3 @@ -# compat_v2/workflow.py from __future__ import annotations from typing import Any, Callable, Optional