
Commit dcac2ca

Merge pull request #6 from bcdev/yogesh-xxx-add-tests

Add tests and internal refactor of managers

2 parents: b360ae5 + 5f3e661

31 files changed: +3631 -807 lines

CHANGES.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -1,3 +1,9 @@
+## Version 0.0.4 (in development)
+
+* Added tests
+* Internal refactoring
+
+
 ## Version 0.0.3
 
 * **Bug fix** - `ExternalPythonOperator` does not need Airflow in external environment now.
```

docs/index.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -215,7 +215,7 @@ Quick rule of thumb:
 - Use `prod_local` for testing end-to-end workflows on production-like settings.
 - Use `prod` for production pipelines in the real cluster.
 
-## User workflow:
+## User workflow
 A typical user workflow could look like this:
 
 ```mermaid
````

pixi.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,7 +1,7 @@
 [project]
 name = "gaiaflow"
 requires-python = ">= 3.11"
-version = "0.0.3"
+version = "0.0.4dev0"
 description = "Local-first MLOps infrastructure python tool that simplifies the process of building, testing, and deploying ML workflows."
 authors = [{name = "Yogesh Kumar Baljeet Singh", email = "[email protected]"}]
 dependencies = [
```

src/gaiaflow/cli/commands/minikube.py

Lines changed: 13 additions & 10 deletions
```diff
@@ -12,11 +12,12 @@
 
 def load_imports():
     from gaiaflow.constants import BaseAction
-    from gaiaflow.managers.minikube_manager import (ExtendedAction,
-                                                    MinikubeManager)
-    from gaiaflow.managers.utils import (create_gaiaflow_context_path,
-                                         gaiaflow_path_exists_in_state,
-                                         parse_key_value_pairs)
+    from gaiaflow.managers.minikube_manager import ExtendedAction, MinikubeManager
+    from gaiaflow.managers.utils import (
+        create_gaiaflow_context_path,
+        gaiaflow_path_exists_in_state,
+        parse_key_value_pairs,
+    )
 
     return SimpleNamespace(
         BaseAction=BaseAction,
@@ -101,12 +102,14 @@ def restart(
     )
 
 
-@app.command(help="Containerize your package into a docker image inside the "
-                  "minikube cluster.")
+@app.command(
+    help="Containerize your package into a docker image inside the minikube cluster."
+)
 def dockerize(
     project_path: Path = typer.Option(..., "--path", "-p", help="Path to your project"),
-    image_name: str = typer.Option(DEFAULT_IMAGE_NAME, "--image-name", "-i",
-                                   help=("Name of your image.")),
+    image_name: str = typer.Option(
+        DEFAULT_IMAGE_NAME, "--image-name", "-i", help=("Name of your image.")
+    ),
 ):
     imports = load_imports()
     gaiaflow_path, user_project_path = imports.create_gaiaflow_context_path(
@@ -121,7 +124,7 @@ def dockerize(
         user_project_path=user_project_path,
         action=imports.ExtendedAction.DOCKERIZE,
         local=False,
-        image_name=image_name
+        image_name=image_name,
     )
```
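The `load_imports()` helper above defers the heavy manager imports until a command actually runs, so the Typer CLI starts quickly and `--help` works without loading the managers. A self-contained sketch of the pattern, with `json` as a stand-in for a heavy manager module (the real code returns the gaiaflow managers instead):

```python
# Minimal sketch of the deferred-import pattern used by the CLI modules
# in this PR. `json` is a stand-in for a heavy dependency such as a
# manager module.
from types import SimpleNamespace


def load_imports() -> SimpleNamespace:
    # Importing inside the function keeps module import time (and thus
    # CLI startup) cheap; the cost is paid only when a command executes.
    import json

    return SimpleNamespace(dumps=json.dumps)


def command() -> None:
    imports = load_imports()
    print(imports.dumps({"status": "ok"}))


if __name__ == "__main__":
    command()  # prints {"status": "ok"}
```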

src/gaiaflow/cli/commands/mlops.py

Lines changed: 29 additions & 18 deletions
```diff
@@ -5,19 +5,21 @@
 import fsspec
 import typer
 
-from gaiaflow.constants import Service, DEFAULT_IMAGE_NAME
+from gaiaflow.constants import DEFAULT_IMAGE_NAME, Service
 
 app = typer.Typer()
 fs = fsspec.filesystem("file")
 
 
 def load_imports():
     from gaiaflow.constants import BaseAction, ExtendedAction
-    from gaiaflow.managers.mlops_manager import MlopsManager
     from gaiaflow.managers.minikube_manager import MinikubeManager
-    from gaiaflow.managers.utils import (create_gaiaflow_context_path,
-                                         gaiaflow_path_exists_in_state,
-                                         save_project_state)
+    from gaiaflow.managers.mlops_manager import MlopsManager
+    from gaiaflow.managers.utils import (
+        create_gaiaflow_context_path,
+        gaiaflow_path_exists_in_state,
+        save_project_state,
+    )
 
     return SimpleNamespace(
         BaseAction=BaseAction,
@@ -56,11 +58,17 @@ def start(
         False, "--docker-build", "-b", help="Force Docker image build"
     ),
     user_env_name: str = typer.Option(
-        None, "--env", "-e", help="Provide conda/mamba environment name for "
-        "Jupyter Lab to run. If not set, it will use the name from your environment.yml file."
+        None,
+        "--env",
+        "-e",
+        help="Provide conda/mamba environment name for "
+        "Jupyter Lab to run. If not set, it will use the name from your environment.yml file.",
     ),
     env_tool: "str" = typer.Option(
-        "mamba", "--env-tool", "-t", help="Which tool to use for running your Jupyter lab. Options: mamba, conda",
+        "mamba",
+        "--env-tool",
+        "-t",
+        help="Which tool to use for running your Jupyter lab. Options: mamba, conda",
    ),
 ):
     imports = load_imports()
@@ -242,12 +250,12 @@ def cleanup(
     )
 
 
-
 @app.command(help="Containerize your package into a docker image locally.")
 def dockerize(
     project_path: Path = typer.Option(..., "--path", "-p", help="Path to your project"),
-    image_name: str = typer.Option(DEFAULT_IMAGE_NAME, "--image-name", "-i",
-                                   help=("Name of your image.")),
+    image_name: str = typer.Option(
+        DEFAULT_IMAGE_NAME, "--image-name", "-i", help=("Name of your image.")
+    ),
 ):
     imports = load_imports()
     gaiaflow_path, user_project_path = imports.create_gaiaflow_context_path(
@@ -268,15 +276,18 @@ def dockerize(
         user_project_path=user_project_path,
         action=imports.ExtendedAction.DOCKERIZE,
         local=True,
-        image_name=image_name
+        image_name=image_name,
     )
 
-@app.command(help="Update the dependencies for the Airflow tasks. This command "
-             "synchronizes the running container environments with the project's"
-             "`environment.yml`. Make sure you have updated "
-             "`environment.yml` before running"
-             "this, as the container environments are updated based on "
-             "its contents.")
+
+@app.command(
+    help="Update the dependencies for the Airflow tasks. This command "
+    "synchronizes the running container environments with the project's "
+    "`environment.yml`. Make sure you have updated "
+    "`environment.yml` before running "
+    "this, as the container environments are updated based on "
+    "its contents."
+)
 def update_deps(
     project_path: Path = typer.Option(..., "--path", "-p", help="Path to your project"),
 ):
```
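The layout changes above look like black/isort-style formatting: imports sorted alphabetically, trailing commas added, and one argument per line once a call no longer fits on one line. A small, hypothetical Typer command illustrating the resulting option style (names here are illustrative, not part of gaiaflow):

```python
# Hypothetical Typer command mirroring the multi-line typer.Option style
# the reformatting above settles on.
import typer

app = typer.Typer()


@app.command(help="Toy command illustrating multi-line typer.Option calls.")
def start(
    user_env_name: str = typer.Option(
        None,
        "--env",
        "-e",
        help="Conda/mamba environment name for Jupyter Lab to use.",
    ),
    env_tool: str = typer.Option(
        "mamba",
        "--env-tool",
        "-t",
        help="Tool for running your Jupyter Lab. Options: mamba, conda",
    ),
):
    typer.echo(f"env={user_env_name}, tool={env_tool}")


if __name__ == "__main__":
    app()
```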

src/gaiaflow/core/create_task.py

Lines changed: 6 additions & 2 deletions
```diff
@@ -1,7 +1,11 @@
 from enum import Enum
 
-from .operators import (DevTaskOperator, DockerTaskOperator,
-                        ProdLocalTaskOperator, ProdTaskOperator)
+from .operators import (
+    DevTaskOperator,
+    DockerTaskOperator,
+    ProdLocalTaskOperator,
+    ProdTaskOperator,
+)
 
 
 class GaiaflowMode(Enum):
```

src/gaiaflow/core/operators.py

Lines changed: 40 additions & 21 deletions
```diff
@@ -2,15 +2,15 @@
 import platform
 from datetime import datetime
 
-from airflow.providers.cncf.kubernetes.operators.pod import \
-    KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from airflow.providers.docker.operators.docker import DockerOperator
 from airflow.providers.standard.operators.python import ExternalPythonOperator
-from kubernetes.client import V1ResourceRequirements
 
-from gaiaflow.constants import (DEFAULT_MINIO_AWS_ACCESS_KEY_ID,
-                                DEFAULT_MINIO_AWS_SECRET_ACCESS_KEY,
-                                RESOURCE_PROFILES)
+from gaiaflow.constants import (
+    DEFAULT_MINIO_AWS_ACCESS_KEY_ID,
+    DEFAULT_MINIO_AWS_SECRET_ACCESS_KEY,
+    RESOURCE_PROFILES,
+)
 
 from .utils import build_env_from_secrets, inject_params_as_env_vars
 
@@ -122,16 +122,35 @@ def create_func_env_vars(self):
 
 class DevTaskOperator(BaseTaskOperator):
     def create_task(self):
-        from gaiaflow.core.runner import run
+        import os
+
+        current_dir = os.path.dirname(os.path.abspath(__file__))
 
         args, kwargs = self.resolve_args_kwargs()
         kwargs["params"] = dict(self.params)
-        op_kwargs = {"func_path": self.func_path, "args": args, "kwargs": kwargs}
+        op_kwargs = {
+            "func_path": self.func_path,
+            "args": args,
+            "kwargs": kwargs,
+            "current_dir": current_dir,
+        }
+
+        def run_wrapper(**op_kwargs):
+            import sys
+
+            sys.path.append(op_kwargs.get("current_dir", ""))
+            from runner import run
+
+            return run(
+                func_path=op_kwargs.get("func_path"),
+                args=op_kwargs.get("args"),
+                kwargs=op_kwargs.get("kwargs"),
+            )
 
         return ExternalPythonOperator(
             task_id=self.task_id,
             python="/home/airflow/.local/share/mamba/envs/default_user_env/bin/python",
-            python_callable=run,
+            python_callable=run_wrapper,
             op_kwargs=op_kwargs,
             do_xcom_push=True,
             retries=self.retries,
@@ -191,17 +210,17 @@ def create_task(self):
         if profile is None:
             raise ValueError(f"Unknown resource profile: {profile_name}")
 
-        resources = V1ResourceRequirements(
-            requests={
-                "cpu": profile["request_cpu"],
-                "memory": profile["request_memory"],
-            },
-            limits={
-                "cpu": profile["limit_cpu"],
-                "memory": profile["limit_memory"],
-                # "gpu": profile.get["limit_gpu"],
-            },
-        )
+        # resources = V1ResourceRequirements(
+        #     requests={
+        #         "cpu": profile["request_cpu"],
+        #         "memory": profile["request_memory"],
+        #     },
+        #     limits={
+        #         "cpu": profile["limit_cpu"],
+        #         "memory": profile["limit_memory"],
+        #         # "gpu": profile.get["limit_gpu"],
+        #     },
+        # )
 
         return KubernetesPodOperator(
             task_id=self.task_id,
@@ -216,7 +235,7 @@ def create_task(self):
             do_xcom_push=True,
             retries=self.retries,
             params=self.params,
-            container_resources=resources,
+            # container_resources=resources,
         )
```
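The new `run_wrapper` is the key piece of this hunk. `ExternalPythonOperator` executes its callable in a separate interpreter, so everything the callable touches must be importable there. Passing the directory that contains `runner.py` through `op_kwargs` and appending it to `sys.path` inside the wrapper lets the external environment import `runner` as a plain module, without `gaiaflow` (or Airflow) installed, consistent with the 0.0.3 changelog note above. An annotated copy of the wrapper:

```python
def run_wrapper(**op_kwargs):
    # Executes in the external interpreter, not the Airflow worker, so
    # only names importable in that environment may be used here.
    import sys

    # Make the directory that contains runner.py (computed from __file__
    # in create_task and passed in via op_kwargs) importable there.
    sys.path.append(op_kwargs.get("current_dir", ""))
    from runner import run  # plain module import, no gaiaflow package needed

    return run(
        func_path=op_kwargs.get("func_path"),
        args=op_kwargs.get("args"),
        kwargs=op_kwargs.get("kwargs"),
    )
```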

src/gaiaflow/core/runner.py

Lines changed: 34 additions & 32 deletions
```diff
@@ -10,6 +10,29 @@
 from typing import Any
 
 
+def run(
+    func_path: str | None = None,
+    args: list | None = None,
+    kwargs: dict[str, Any] | None = None,
+) -> dict[str, str]:
+    mode = os.environ.get("MODE", "dev")
+    print(f"## Runner running in {mode} mode ##")
+    func_path, args, kwargs = _resolve_inputs(func_path, args, kwargs, mode)
+
+    if not func_path:
+        raise ValueError("func_path must be provided")
+
+    func = _import_function(func_path)
+
+    print(f"Running {func_path} with args: {args} and kwargs: {kwargs}")
+    result = func(*args, **kwargs)
+    print("Function result:", result)
+
+    _write_result(result, mode)
+
+    return result
+
+
 def _extract_params_from_env(prefix="PARAMS_") -> dict[str, str]:
     return {
         k[len(prefix) :].lower(): v
@@ -18,47 +41,32 @@ def _extract_params_from_env(prefix="PARAMS_") -> dict[str, str]:
     }
 
 
-def run(
-    func_path: str | None = None,
-    args: list | None = None,
-    kwargs: dict[str, Any] | None = None,
-) -> dict[str, str]:
-    mode = os.environ.get("MODE", "dev")
-    print(f"## Runner running in {mode} mode ##")
+def _resolve_inputs(func_path: str, args: list[Any], kwargs: dict[Any], mode: str):
     if mode == "dev":
-        print("args", args)
-        print("kwargs", kwargs)
-    else:
-        func_path = os.environ.get("FUNC_PATH", "")
+        return func_path, args or [], kwargs or {}
+    else:  # all other modes (dev_docker, prod_local and prod)
+        func_path = os.environ.get("FUNC_PATH", func_path)
         args = json.loads(os.environ.get("FUNC_ARGS", "[]"))
         kwargs = json.loads(os.environ.get("FUNC_KWARGS", "{}"))
-        params: dict = _extract_params_from_env()
-        kwargs["params"] = params
-        print("args", args)
-        print("kwargs", kwargs)
+        kwargs["params"] = _extract_params_from_env()
+        return func_path, args, kwargs
 
-    if not func_path:
-        raise ValueError("func_path must be provided")
 
-    module_path, func_name = func_path.rsplit(":", 1)
+def _import_function(func_path: str):
     import importlib
 
+    module_path, func_name = func_path.rsplit(":", 1)
     module = importlib.import_module(module_path)
-    func = getattr(module, func_name)
+    return getattr(module, func_name)
 
-    print(f"Running {func_path} with args: {args} and kwargs :{kwargs}")
-    result = func(*args, **kwargs)
-    print("Function result:", result)
+
+def _write_result(result, mode):
     if mode == "prod" or mode == "prod_local":
-        # This is needed when we use KubernetesPodOperator and want to
-        # share information via XCOM.
         _write_xcom_result(result)
     if mode == "dev_docker":
         with open("/tmp/script.out", "wb+") as tmp:
             pickle.dump(result, tmp)
 
-    return result
-
 
 def _write_xcom_result(result: Any) -> None:
     try:
@@ -68,12 +76,6 @@ def _write_xcom_result(result: Any) -> None:
         with open(f"{xcom_dir}/return.json", "w") as f:
             json.dump(result, f)
 
-        path = "/airflow/xcom/return.json"
-        print("[DEBUG] File exists:", os.path.exists(path))
-        print("[DEBUG] File size:", os.path.getsize(path))
-        with open(path, "r") as f:
-            print("[DEBUG] File contents:", f.read())
-
         print("Result written to XCom successfully")
     except Exception as e:
         print(f"Failed to write XCom result: {e}")
```
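The refactor splits the old monolithic `run` into helpers with one job each: `_resolve_inputs` picks inputs either from direct arguments (dev mode) or from JSON-encoded environment variables (all container modes), `_import_function` resolves the `module:function` path, and `_write_result` persists the result per mode. A runnable sketch of that flow, with `json:dumps` standing in for a user task function and the `PARAMS_` injection omitted:

```python
# Runnable sketch of the refactored runner flow above. The FUNC_* env var
# names match the diff; `json:dumps` stands in for a user task function,
# and PARAMS_ injection is omitted for brevity.
import importlib
import json
import os


def resolve_inputs(func_path, args, kwargs, mode):
    if mode == "dev":
        # dev mode: inputs arrive as direct Python arguments
        return func_path, args or [], kwargs or {}
    # dev_docker / prod_local / prod: inputs arrive as JSON in env vars
    func_path = os.environ.get("FUNC_PATH", func_path)
    args = json.loads(os.environ.get("FUNC_ARGS", "[]"))
    kwargs = json.loads(os.environ.get("FUNC_KWARGS", "{}"))
    return func_path, args, kwargs


def import_function(func_path):
    # func_path follows the "module:function" convention, e.g. "json:dumps"
    module_path, func_name = func_path.rsplit(":", 1)
    return getattr(importlib.import_module(module_path), func_name)


if __name__ == "__main__":
    os.environ["FUNC_PATH"] = "json:dumps"
    os.environ["FUNC_ARGS"] = '[{"answer": 42}]'
    func_path, args, kwargs = resolve_inputs(None, None, None, "prod_local")
    print(import_function(func_path)(*args, **kwargs))  # {"answer": 42}
```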
