Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Version 0.0.4 (in development)

* Added tests
* Internal refactoring


## Version 0.0.3

* **Bug fix** - `ExternalPythonOperator` does not need Airflow in external environment now.
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ Quick rule of thumb:
- Use `prod_local` for testing end-to-end workflows on production-like settings.
- Use `prod` for production pipelines in the real cluster.

## User workflow:
## User workflow
A typical user workflow could look like this:

```mermaid
Expand Down
4 changes: 2 additions & 2 deletions pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "gaiaflow"
requires-python = ">= 3.11"
version = "0.0.3"
version = "0.0.4dev0"
description = "Local-first MLOps infrastructure python tool that simplifies the process of building, testing, and deploying ML workflows."
authors = [{name = "Yogesh Kumar Baljeet Singh", email = "[email protected]"}]
dependencies = [
Expand Down
23 changes: 13 additions & 10 deletions src/gaiaflow/cli/commands/minikube.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

def load_imports():
from gaiaflow.constants import BaseAction
from gaiaflow.managers.minikube_manager import (ExtendedAction,
MinikubeManager)
from gaiaflow.managers.utils import (create_gaiaflow_context_path,
gaiaflow_path_exists_in_state,
parse_key_value_pairs)
from gaiaflow.managers.minikube_manager import ExtendedAction, MinikubeManager
from gaiaflow.managers.utils import (
create_gaiaflow_context_path,
gaiaflow_path_exists_in_state,
parse_key_value_pairs,
)

return SimpleNamespace(
BaseAction=BaseAction,
Expand Down Expand Up @@ -101,12 +102,14 @@ def restart(
)


@app.command(help="Containerize your package into a docker image inside the "
"minikube cluster.")
@app.command(
help="Containerize your package into a docker image inside the minikube cluster."
)
def dockerize(
project_path: Path = typer.Option(..., "--path", "-p", help="Path to your project"),
image_name: str = typer.Option(DEFAULT_IMAGE_NAME, "--image-name", "-i",
help=("Name of your image.")),
image_name: str = typer.Option(
DEFAULT_IMAGE_NAME, "--image-name", "-i", help=("Name of your image.")
),
):
imports = load_imports()
gaiaflow_path, user_project_path = imports.create_gaiaflow_context_path(
Expand All @@ -121,7 +124,7 @@ def dockerize(
user_project_path=user_project_path,
action=imports.ExtendedAction.DOCKERIZE,
local=False,
image_name=image_name
image_name=image_name,
)


Expand Down
47 changes: 29 additions & 18 deletions src/gaiaflow/cli/commands/mlops.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@
import fsspec
import typer

from gaiaflow.constants import Service, DEFAULT_IMAGE_NAME
from gaiaflow.constants import DEFAULT_IMAGE_NAME, Service

app = typer.Typer()
fs = fsspec.filesystem("file")


def load_imports():
from gaiaflow.constants import BaseAction, ExtendedAction
from gaiaflow.managers.mlops_manager import MlopsManager
from gaiaflow.managers.minikube_manager import MinikubeManager
from gaiaflow.managers.utils import (create_gaiaflow_context_path,
gaiaflow_path_exists_in_state,
save_project_state)
from gaiaflow.managers.mlops_manager import MlopsManager
from gaiaflow.managers.utils import (
create_gaiaflow_context_path,
gaiaflow_path_exists_in_state,
save_project_state,
)

return SimpleNamespace(
BaseAction=BaseAction,
Expand Down Expand Up @@ -56,11 +58,17 @@ def start(
False, "--docker-build", "-b", help="Force Docker image build"
),
user_env_name: str = typer.Option(
None, "--env", "-e", help="Provide conda/mamba environment name for "
"Jupyter Lab to run. If not set, it will use the name from your environment.yml file."
None,
"--env",
"-e",
help="Provide conda/mamba environment name for "
"Jupyter Lab to run. If not set, it will use the name from your environment.yml file.",
),
env_tool: "str" = typer.Option(
"mamba", "--env-tool", "-t", help="Which tool to use for running your Jupyter lab. Options: mamba, conda",
"mamba",
"--env-tool",
"-t",
help="Which tool to use for running your Jupyter lab. Options: mamba, conda",
),
):
imports = load_imports()
Expand Down Expand Up @@ -242,12 +250,12 @@ def cleanup(
)



@app.command(help="Containerize your package into a docker image locally.")
def dockerize(
project_path: Path = typer.Option(..., "--path", "-p", help="Path to your project"),
image_name: str = typer.Option(DEFAULT_IMAGE_NAME, "--image-name", "-i",
help=("Name of your image.")),
image_name: str = typer.Option(
DEFAULT_IMAGE_NAME, "--image-name", "-i", help=("Name of your image.")
),
):
imports = load_imports()
gaiaflow_path, user_project_path = imports.create_gaiaflow_context_path(
Expand All @@ -268,15 +276,18 @@ def dockerize(
user_project_path=user_project_path,
action=imports.ExtendedAction.DOCKERIZE,
local=True,
image_name=image_name
image_name=image_name,
)

@app.command(help="Update the dependencies for the Airflow tasks. This command "
"synchronizes the running container environments with the project's"
"`environment.yml`. Make sure you have updated "
"`environment.yml` before running"
"this, as the container environments are updated based on "
"its contents.")

@app.command(
help="Update the dependencies for the Airflow tasks. This command "
"synchronizes the running container environments with the project's"
"`environment.yml`. Make sure you have updated "
"`environment.yml` before running"
"this, as the container environments are updated based on "
"its contents."
)
def update_deps(
project_path: Path = typer.Option(..., "--path", "-p", help="Path to your project"),
):
Expand Down
8 changes: 6 additions & 2 deletions src/gaiaflow/core/create_task.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from enum import Enum

from .operators import (DevTaskOperator, DockerTaskOperator,
ProdLocalTaskOperator, ProdTaskOperator)
from .operators import (
DevTaskOperator,
DockerTaskOperator,
ProdLocalTaskOperator,
ProdTaskOperator,
)


class GaiaflowMode(Enum):
Expand Down
61 changes: 40 additions & 21 deletions src/gaiaflow/core/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import platform
from datetime import datetime

from airflow.providers.cncf.kubernetes.operators.pod import \
KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.standard.operators.python import ExternalPythonOperator
from kubernetes.client import V1ResourceRequirements

from gaiaflow.constants import (DEFAULT_MINIO_AWS_ACCESS_KEY_ID,
DEFAULT_MINIO_AWS_SECRET_ACCESS_KEY,
RESOURCE_PROFILES)
from gaiaflow.constants import (
DEFAULT_MINIO_AWS_ACCESS_KEY_ID,
DEFAULT_MINIO_AWS_SECRET_ACCESS_KEY,
RESOURCE_PROFILES,
)

from .utils import build_env_from_secrets, inject_params_as_env_vars

Expand Down Expand Up @@ -122,16 +122,35 @@ def create_func_env_vars(self):

class DevTaskOperator(BaseTaskOperator):
def create_task(self):
from gaiaflow.core.runner import run
import os

current_dir = os.path.dirname(os.path.abspath(__file__))

args, kwargs = self.resolve_args_kwargs()
kwargs["params"] = dict(self.params)
op_kwargs = {"func_path": self.func_path, "args": args, "kwargs": kwargs}
op_kwargs = {
"func_path": self.func_path,
"args": args,
"kwargs": kwargs,
"current_dir": current_dir,
}

def run_wrapper(**op_kwargs):
import sys

sys.path.append(op_kwargs.get("current_dir", ""))
from runner import run

return run(
func_path=op_kwargs.get("func_path"),
args=op_kwargs.get("args"),
kwargs=op_kwargs.get("kwargs"),
)

return ExternalPythonOperator(
task_id=self.task_id,
python="/home/airflow/.local/share/mamba/envs/default_user_env/bin/python",
python_callable=run,
python_callable=run_wrapper,
op_kwargs=op_kwargs,
do_xcom_push=True,
retries=self.retries,
Expand Down Expand Up @@ -191,17 +210,17 @@ def create_task(self):
if profile is None:
raise ValueError(f"Unknown resource profile: {profile_name}")

resources = V1ResourceRequirements(
requests={
"cpu": profile["request_cpu"],
"memory": profile["request_memory"],
},
limits={
"cpu": profile["limit_cpu"],
"memory": profile["limit_memory"],
# "gpu": profile.get["limit_gpu"],
},
)
# resources = V1ResourceRequirements(
# requests={
# "cpu": profile["request_cpu"],
# "memory": profile["request_memory"],
# },
# limits={
# "cpu": profile["limit_cpu"],
# "memory": profile["limit_memory"],
# # "gpu": profile.get["limit_gpu"],
# },
# )

return KubernetesPodOperator(
task_id=self.task_id,
Expand All @@ -216,7 +235,7 @@ def create_task(self):
do_xcom_push=True,
retries=self.retries,
params=self.params,
container_resources=resources,
# container_resources=resources,
)


Expand Down
66 changes: 34 additions & 32 deletions src/gaiaflow/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@
from typing import Any


def run(
    func_path: str | None = None,
    args: list | None = None,
    kwargs: dict[str, Any] | None = None,
) -> Any:
    """Resolve, import and execute the task function for the current mode.

    The execution mode is read from the ``MODE`` environment variable
    (defaults to ``"dev"``). In non-dev modes the function path, args and
    kwargs are resolved from environment variables instead of the
    parameters (see ``_resolve_inputs``).

    :param func_path: Target callable as ``"module.path:func_name"``.
    :param args: Positional arguments for the target function.
    :param kwargs: Keyword arguments for the target function.
    :returns: Whatever the target function returns.  The result is also
        persisted for XCom / docker sharing depending on the mode
        (see ``_write_result``).
    :raises ValueError: If no function path could be resolved.
    """
    mode = os.environ.get("MODE", "dev")
    print(f"## Runner running in {mode} mode ##")
    func_path, args, kwargs = _resolve_inputs(func_path, args, kwargs, mode)

    if not func_path:
        raise ValueError("func_path must be provided")

    func = _import_function(func_path)

    print(f"Running {func_path} with args: {args} and kwargs :{kwargs}")
    result = func(*args, **kwargs)
    print("Function result:", result)

    _write_result(result, mode)

    return result


def _extract_params_from_env(prefix="PARAMS_") -> dict[str, str]:
return {
k[len(prefix) :].lower(): v
Expand All @@ -18,47 +41,32 @@ def _extract_params_from_env(prefix="PARAMS_") -> dict[str, str]:
}


def run(
func_path: str | None = None,
args: list | None = None,
kwargs: dict[str, Any] | None = None,
) -> dict[str, str]:
mode = os.environ.get("MODE", "dev")
print(f"## Runner running in {mode} mode ##")
def _resolve_inputs(func_path: str, args: list[Any], kwargs: dict[Any], mode: str):
if mode == "dev":
print("args", args)
print("kwargs", kwargs)
else:
func_path = os.environ.get("FUNC_PATH", "")
return func_path, args or [], kwargs or {}
else: # all other modes (dev_docker, prod_local and prod)
func_path = os.environ.get("FUNC_PATH", func_path)
args = json.loads(os.environ.get("FUNC_ARGS", "[]"))
kwargs = json.loads(os.environ.get("FUNC_KWARGS", "{}"))
params: dict = _extract_params_from_env()
kwargs["params"] = params
print("args", args)
print("kwargs", kwargs)
kwargs["params"] = _extract_params_from_env()
return func_path, args, kwargs

if not func_path:
raise ValueError("func_path must be provided")

module_path, func_name = func_path.rsplit(":", 1)
def _import_function(func_path: str):
import importlib

module_path, func_name = func_path.rsplit(":", 1)
module = importlib.import_module(module_path)
func = getattr(module, func_name)
return getattr(module, func_name)

print(f"Running {func_path} with args: {args} and kwargs :{kwargs}")
result = func(*args, **kwargs)
print("Function result:", result)

def _write_result(result, mode):
if mode == "prod" or mode == "prod_local":
# This is needed when we use KubernetesPodOperator and want to
# share information via XCOM.
_write_xcom_result(result)
if mode == "dev_docker":
with open("/tmp/script.out", "wb+") as tmp:
pickle.dump(result, tmp)

return result


def _write_xcom_result(result: Any) -> None:
try:
Expand All @@ -68,12 +76,6 @@ def _write_xcom_result(result: Any) -> None:
with open(f"{xcom_dir}/return.json", "w") as f:
json.dump(result, f)

path = "/airflow/xcom/return.json"
print("[DEBUG] File exists:", os.path.exists(path))
print("[DEBUG] File size:", os.path.getsize(path))
with open(path, "r") as f:
print("[DEBUG] File contents:", f.read())

print("Result written to XCom successfully")
except Exception as e:
print(f"Failed to write XCom result: {e}")
Expand Down
Loading