Draft
23 commits
ea036bd
examples
wild-endeavor Nov 28, 2025
e9d80d9
copy files over
wild-endeavor Nov 28, 2025
e34e768
remove cloudidl and get it compiling
wild-endeavor Nov 28, 2025
98a2414
wheels
wild-endeavor Nov 28, 2025
d3af2f9
add basic trace support to action, untested
wild-endeavor Dec 1, 2025
90d90c9
at least temporarily replace groupdata with just a string, update con…
wild-endeavor Dec 1, 2025
3c51e78
update for regular hybrid mode
wild-endeavor Dec 1, 2025
f6bb342
readme and alternate between original and rs controller in hybrid
wild-endeavor Dec 1, 2025
251d473
pin idl to 14 because of semver lexical ordering and hybrid mode
wild-endeavor Dec 1, 2025
4fa9a8a
pr into pr ignore (#376)
wild-endeavor Dec 4, 2025
2d5d693
ignore - pr into pr (#423)
wild-endeavor Dec 12, 2025
ae603f5
Merge remote-tracking branch 'origin/main' into ctrl-rs
wild-endeavor Dec 12, 2025
8246b27
ignore pr into pr (#424)
wild-endeavor Dec 14, 2025
2b17e42
remove rs_controller from gitignore
wild-endeavor Dec 14, 2025
6e63f23
pr into ctrl-rs - error handling (#427)
wild-endeavor Dec 16, 2025
6e81b29
Improve Rust controller devex (#431)
machichima Dec 16, 2025
7f56f9a
refactor[rs_controller]: Use nightly fmt to reorganize imports (#473)
SVilgelm Dec 24, 2025
e76ac34
feat[rs_controller]: add Rust lint targets and GitHub Action job (#480)
SVilgelm Dec 27, 2025
05f5ff4
[Rust Controller] Add worker pool (#477)
machichima Dec 31, 2025
475ae6e
ctrl-rs select with env var (#429)
wild-endeavor Dec 31, 2025
8b68f35
merge main
wild-endeavor Dec 31, 2025
332fe22
wip - pr into ctrl-rs (#487)
wild-endeavor Dec 31, 2025
e3e3322
wip - fixes to ctrl-rs (#489)
wild-endeavor Jan 6, 2026
25 changes: 25 additions & 0 deletions .github/workflows/lint.yml
@@ -26,6 +26,31 @@ jobs:
      - name: Lint
        run: |
          make fmt
  rs-fmt:
    name: rust fmt
    runs-on: ubuntu-latest
    steps:
      - name: Fetch the code
        uses: actions/checkout@v4
      - name: Install nightly toolchain
        run: |
          rustup toolchain install nightly
          rustup component add --toolchain nightly-x86_64-unknown-linux-gnu rustfmt
      - name: fmt
        run: |
          make -C rs_controller check-fmt
  rs-lint:
    name: rust lint
    runs-on: ubuntu-latest
    steps:
      - name: Fetch the code
        uses: actions/checkout@v4
      - name: Install toolchain
        run: |
          rustup toolchain install
      - name: lint
        run: |
          make -C rs_controller lint
  mypy:
    name: make mypy
    runs-on: ubuntu-latest
6 changes: 3 additions & 3 deletions .gitignore
@@ -9,9 +9,6 @@ __pycache__/
# C extensions
*.so

# Temporary
rs_controller/

# Distribution / packaging
.Python
build/
@@ -193,4 +190,7 @@ examples/local_debug.ipynb
examples/remote_example.py
config.yaml
.claude/
tests/flyte/internal/bin/flyte-inputs.json
tests/flyte/internal/bin/flyte-parameters.json
.cargo/
.run/
11 changes: 11 additions & 0 deletions Makefile
@@ -1,3 +1,8 @@
# Default registry for image builds
REGISTRY ?= ghcr.io/flyteorg
# Default name for connector image
CONNECTOR_IMAGE_NAME ?= flyte-connector

# Default target: show all available targets
.PHONY: help
help:
@@ -78,6 +83,12 @@ unit_test_plugins:
fi \
done

.PHONY: dev-rs-dist
dev-rs-dist:
	cd rs_controller && $(MAKE) build-wheels
	$(MAKE) dist
	uv run python maint_tools/build_default_image.py --registry $(REGISTRY) --name $(CONNECTOR_IMAGE_NAME)
	uv pip install --find-links ./rs_controller/dist --no-index --force-reinstall --no-deps flyte_controller_base

.PHONY: cli-docs-gen
cli-docs-gen: ## Generate CLI documentation
62 changes: 62 additions & 0 deletions README.md
@@ -313,3 +313,65 @@ python maint_tools/build_default_image.py --registry ghcr.io/my-org --name my-fl
## 📄 License

Flyte 2 is licensed under the [Apache 2.0 License](LICENSE).

## Developing the Core Controller

Create a separate virtual environment for the Rust controller inside the `rs_controller` folder, because the Rust
controller should be a separate PyPI package. Including it in the main SDK as a core component would require the
entire SDK build toolchain to become Rust/maturin based. We should probably move to that model in the future, though.

Keep important dependencies, namely `flyteidl2`, the same as in the main SDK environment.

The following instructions help you build the default multi-arch image. Each architecture needs its own wheel, and each wheel is built by a different Docker image.

### Setup Builders
`cd` into `rs_controller` and run `make build-builders`. This will build the builder images once, so you can keep using them as the rust code changes.

### Iteration Cycle
Run `make build-wheels` to build the multi-arch wheels. It currently builds only for linux/amd64 and linux/arm64, and
should probably be updated to build all three. The `make build-wheel-local` command builds a macosx wheel. It is
unclear how that wheel differs from the arm64 one and, if both are present, which one pip chooses.
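
On the open question above of which wheel pip chooses: pip only considers wheels whose filename tags are compatible with the interpreter and platform it is installing for, so a `macosx_*_arm64` wheel and a `manylinux_*_aarch64` wheel do not compete on the same machine. The sketch below splits wheel filenames into their tags so the difference is visible. The filenames and the helper `parse_wheel_filename` are hypothetical, not actual outputs of `make build-wheels`.

```python
from typing import NamedTuple


class WheelTags(NamedTuple):
    distribution: str
    version: str
    python_tag: str
    abi_tag: str
    platform_tag: str


def parse_wheel_filename(filename: str) -> WheelTags:
    """Split a wheel filename into its components (an optional build tag is ignored)."""
    if not filename.endswith(".whl"):
        raise ValueError(f"not a wheel: {filename}")
    parts = filename[: -len(".whl")].split("-")
    if len(parts) not in (5, 6):
        raise ValueError(f"unexpected wheel filename: {filename}")
    # The last three fields are always the python, abi, and platform tags.
    python_tag, abi_tag, platform_tag = parts[-3:]
    return WheelTags(parts[0], parts[1], python_tag, abi_tag, platform_tag)


# Hypothetical filenames, for illustration only.
wheels = [
    "flyte_controller_base-0.1.0-cp310-abi3-manylinux_2_28_aarch64.whl",
    "flyte_controller_base-0.1.0-cp310-abi3-manylinux_2_28_x86_64.whl",
    "flyte_controller_base-0.1.0-cp310-abi3-macosx_11_0_arm64.whl",
]
for name in wheels:
    print(parse_wheel_filename(name).platform_tag)
```

Listing the files in `rs_controller/dist` and comparing their platform tags this way shows which wheel applies to the local Mac and which to the linux/arm64 image.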

`cd` back up to the root folder of this project and proceed with
```bash
make dist
python maint_tools/build_default_image.py
```

To install the wheel locally for testing, use the following command with your venv active.
```bash
uv pip install --find-links ./rs_controller/dist --no-index --force-reinstall --no-deps flyte_controller_base
```
Repeat this process to iterate - build new wheels, force reinstall the controller package.
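
After a force reinstall it can help to confirm that the controller package is actually present in the active venv. This is a minimal sketch using only the standard library. The helper name `installed_version` is hypothetical, and it assumes the distribution is named `flyte_controller_base` as in the command above.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


print(installed_version("flyte_controller_base"))
```

Note that this only reports the version string, so two rebuilds with the same version number look identical here.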

### Build Configuration Summary

To support both Rust crate publication and Python wheel distribution, the 'pyo3/extension-module' feature
must be enabled for some builds and disabled for others. This project's Cargo.toml exposes features that
toggle it on and off.

    [features]
    default = ["pyo3/auto-initialize"]           # For Rust crate users (links to libpython)
    extension-module = ["pyo3/extension-module"] # For Python wheels (no libpython linking)

The Cargo.toml file also contains

    # Cargo.toml
    [lib]
    crate-type = ["rlib", "cdylib"]  # Support both Rust and Python usage

With 'default', 'auto-initialize' is turned on, which requires linking to libpython. libpython exists on a local Mac,
so this works there. It is not available in manylinux images, so building with this feature in a manylinux Docker
image fails. That is fine: the manylinux container exists to build wheels, and wheels need the 'extension-module'
feature, which disables linking to libpython.

The key insight: auto-initialize is for embedding Python in Rust (needs libpython), while
extension-module is for extending Python with Rust (must NOT link libpython for portability).

This setup makes it possible to build wheels and also run Rust binaries with `cargo run --bin`.

(not sure if this is needed)

    # pyproject.toml
    [tool.maturin]
    features = ["extension-module"]  # Tells maturin to use extension-module feature
11 changes: 10 additions & 1 deletion examples/advanced/cancel_tasks.py
@@ -1,8 +1,17 @@
import asyncio
from pathlib import Path

import flyte
import flyte.errors
from flyte._image import PythonWheels

env = flyte.TaskEnvironment("cancel")
controller_dist_folder = Path("/Users/ytong/go/src/github.com/flyteorg/sdk-rust/rs_controller/dist")
wheel_layer = PythonWheels(wheel_dir=controller_dist_folder, package_name="flyte_controller_base")
base = flyte.Image.from_debian_base()
rs_controller_image = base.clone(addl_layer=wheel_layer)


env = flyte.TaskEnvironment("cancel", image=rs_controller_image)


@env.task
72 changes: 72 additions & 0 deletions examples/advanced/hybrid_mode.py
@@ -0,0 +1,72 @@
import asyncio
import os
from pathlib import Path
from typing import List

import flyte
import flyte.storage
from flyte.storage import S3

env = flyte.TaskEnvironment(name="hello_world", cache="disable")


@env.task
async def say_hello_hybrid(data: str, lt: List[int]) -> str:
    print(f"Hello, world! - {flyte.ctx().action}")
    return f"Hello {data} {lt}"


@env.task
async def squared(i: int = 3) -> int:
    print(flyte.ctx().action)
    return i * i


@env.task
async def squared_2(i: int = 3) -> int:
    print(flyte.ctx().action)
    return i * i


@env.task
async def say_hello_hybrid_nested(data: str = "default string") -> str:
    print(f"Hello, nested! - {flyte.ctx().action}")
    coros = []
    for i in range(3):
        coros.append(squared(i=i))

    vals = await asyncio.gather(*coros)
    return await say_hello_hybrid(data=data, lt=vals)


@env.task
async def hybrid_parent_placeholder():
    import sys
    import time

    print(f"Hello, hybrid parent placeholder - Task Context: {flyte.ctx()}")
    print(f"Run command: {sys.argv}")
    print("Environment Variables:")
    for k, value in sorted(os.environ.items()):
        if k.startswith("FLYTE_") or k.startswith("_U"):  # noqa: PIE810
            print(f"{k}: {value}")

    print("Sleeping for 24 hours to simulate a long-running task...", flush=True)
    time.sleep(86400)  # noqa: ASYNC251


if __name__ == "__main__":
    # Get current working directory
    current_directory = Path(os.getcwd())
    repo_root = current_directory.parent.parent
    s3_sandbox = S3.for_sandbox()
    flyte.init_from_config("/Users/ytong/.flyte/config-k3d.yaml", root_dir=repo_root, storage=s3_sandbox)

    # Kick off a run of hybrid_parent_placeholder and fill in with kicked off things.
    run_name = "r9sfvk6plj7gld7fds6f"
    outputs = flyte.with_runcontext(
        mode="hybrid",
        name=run_name,
        run_base_dir=f"s3://bucket/metadata/v2/testorg/testproject/development/{run_name}",
    ).run(say_hello_hybrid_nested, data="hello world")
    print("Output:", outputs)
71 changes: 71 additions & 0 deletions examples/advanced/remote_controller.py
@@ -0,0 +1,71 @@
import asyncio

# from cloud_mod.cloud_mod import cloudidl
# from cloud_mod.cloud_mod import Action
from pathlib import Path

from flyte_controller_base import Action, BaseController, cloudidl

from examples.advanced.hybrid_mode import say_hello_hybrid
from flyte._internal.imagebuild.image_builder import ImageCache
from flyte._internal.runtime.task_serde import translate_task_to_wire
from flyte.models import (
CodeBundle,
SerializationContext,
)

img_cache = ImageCache.from_transport(
    "H4sIAAAAAAAC/wXBSQ6AIAwAwL/0TsG6hs8YlILEpUbFxBj/7swLaXWR+0VkzjvYF1y+BCzEaTwwic5bks0lJeepw/JcbPenxKJUt0FCM1CLnu+KVAwjd559g54M1aYtavi+H56TcPxgAAAA"
)
s_ctx = SerializationContext(
    project="testproject",
    domain="development",
    org="testorg",
    code_bundle=CodeBundle(
        computed_version="605136feba679aeb1936677f4c5593f6",
        tgz="s3://bucket/testproject/development/MBITN7V2M6NOWGJWM57UYVMT6Y======/fast0dc2ef669a983610a0b9793e974fb288.tar.gz",
    ),
    version="605136feba679aeb1936677f4c5593f6",
    image_cache=img_cache,
    root_dir=Path("/Users/ytong/go/src/github.com/unionai/unionv2"),
)
task_spec = translate_task_to_wire(say_hello_hybrid, s_ctx)
xxx = task_spec.SerializeToString()

yyy = cloudidl.workflow.TaskSpec.decode(xxx)
print(yyy)


class MyRunner(BaseController):
    ...
    # play around with this
    # def __init__(self, run_id: cloudidl.workflow.RunIdentifier):
    #     super().__new__(BaseController, run_id)


async def main():
    run_id = cloudidl.workflow.RunIdentifier(
        org="testorg", domain="development", name="rxp79l5qjpmmdd84qg7j", project="testproject"
    )

    sub_action_id = cloudidl.workflow.ActionIdentifier(name="sub_action_3", run=run_id)

    action = Action.from_task(
        sub_action_id=sub_action_id,
        parent_action_name="a0",
        group_data=None,
        task_spec=yyy,
        inputs_uri="s3://bucket/metadata/v2/testorg/testproject/development/rllmmzgh6v4xjc8pswc8/4jzwmmj06fnpql20rtlqz4aq2/inputs.pb",
        run_output_base="s3://bucket/metadata/v2/testorg/testproject/development/rllmmzgh6v4xjc8pswc8",
        cache_key=None,
    )

    runner = MyRunner(run_id=run_id)

    result = await runner.submit_action(action)
    print("First submit done", flush=True)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
8 changes: 8 additions & 0 deletions examples/basics/devbox_one.py
@@ -1,12 +1,20 @@
import asyncio
import logging
from pathlib import Path
from typing import List

import flyte
from flyte._image import PythonWheels

controller_dist_folder = Path("/Users/ytong/go/src/github.com/flyteorg/sdk-rust/rs_controller/dist")
wheel_layer = PythonWheels(wheel_dir=controller_dist_folder, package_name="flyte_controller_base")
base = flyte.Image.from_debian_base()
rs_controller_image = base.clone(addl_layer=wheel_layer)

env = flyte.TaskEnvironment(
    name="hello_world",
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    image=rs_controller_image,
)


16 changes: 16 additions & 0 deletions examples/basics/hello.py
@@ -1,9 +1,18 @@
from pathlib import Path

import flyte
from flyte._image import PythonWheels

controller_dist_folder = Path("/Users/ytong/go/src/github.com/flyteorg/sdk-rust/rs_controller/dist")
wheel_layer = PythonWheels(wheel_dir=controller_dist_folder, package_name="flyte_controller_base")
base = flyte.Image.from_debian_base()
rs_controller_image = base.clone(addl_layer=wheel_layer)

# TaskEnvironments provide a simple way of grouping configuration used by tasks (more later).
env = flyte.TaskEnvironment(
    name="hello_world",
    resources=flyte.Resources(memory="250Mi"),
    image=rs_controller_image,
)


@@ -26,6 +35,13 @@ def main(x_list: list[int]) -> float:
    return y_mean


@env.task
def main2(x_list: list[int]) -> float:
    y = fn(x_list[0])
    print(f"y = {y}!!!", flush=True)
    return float(y)


if __name__ == "__main__":
    flyte.init_from_config()  # establish remote connection from within your script.
    run = flyte.run(main, x_list=list(range(10)))  # run remotely inline and pass data.
12 changes: 10 additions & 2 deletions examples/stress/crash_recovery_trace.py
@@ -1,11 +1,18 @@
import os
from pathlib import Path

import flyte
import flyte.errors
from flyte._image import PythonWheels

controller_dist_folder = Path("/Users/ytong/go/src/github.com/flyteorg/sdk-rust/rs_controller/dist")
wheel_layer = PythonWheels(wheel_dir=controller_dist_folder, package_name="flyte_controller_base")
base = flyte.Image.from_debian_base()
rs_controller_image = base.clone(addl_layer=wheel_layer)


env = flyte.TaskEnvironment(
    name="crash_recovery_trace",
    resources=flyte.Resources(memory="250Mi", cpu=1),
    name="crash_recovery_trace", resources=flyte.Resources(memory="250Mi", cpu=1), image=rs_controller_image
)


@@ -38,6 +45,7 @@ async def main() -> list[int]:
        attempt_number = get_attempt_number()
        # Fail at attempts 0, 1, and 2 for i = 100, 200, 300 respectively, then succeed
        if i == (attempt_number + 1) * 100 and attempt_number < 3:
            print(f"Simulating crash for element {i=} and {attempt_number=}", flush=True)
            raise flyte.errors.RuntimeSystemError(
                "simulated", f"Simulated failure on attempt {get_attempt_number()} at iteration {i}"
            )