Skip to content

Commit c86f5b5

Browse files
authored
Merge pull request #1 from m3dev/feature/env_init
Env setup
2 parents 04a7e11 + 90d9c14 commit c86f5b5

File tree

11 files changed

+2691
-0
lines changed

11 files changed

+2691
-0
lines changed

.github/workflows/publish.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
name: Publish
2+
3+
on:
4+
push:
5+
tags: '*'
6+
7+
jobs:
8+
deploy:
9+
10+
runs-on: ubuntu-latest
11+
12+
steps:
13+
- uses: actions/checkout@v3
14+
- name: Set up Python
15+
uses: actions/setup-python@v4
16+
with:
17+
python-version: '3.x'
18+
- name: Install dependencies and build
19+
run: |
20+
python -m pip install --upgrade pip
21+
python -m pip install poetry poetry-dynamic-versioning twine
22+
- name: Build and publish
23+
env:
24+
TWINE_USERNAME: __token__
25+
TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
26+
run: |
27+
poetry publish --build --username $TWINE_USERNAME --password $TWINE_PASSWORD

.github/workflows/test.yml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
name: Test
2+
3+
on:
4+
push:
5+
branches: [ master ]
6+
pull_request:
7+
8+
jobs:
9+
tests:
10+
runs-on: ${{ matrix.platform }}
11+
strategy:
12+
max-parallel: 4
13+
matrix:
14+
platform: [ubuntu-latest]
15+
python-version: ["3.8", "3.9", "3.10", "3.11"]
16+
17+
steps:
18+
- uses: actions/checkout@v3
19+
- name: Set up Python ${{ matrix.python-version }}
20+
uses: actions/setup-python@v4
21+
with:
22+
python-version: ${{ matrix.python-version }}
23+
- name: Install dependencies
24+
run: |
25+
python -m pip install --upgrade pip
26+
python -m pip install tox-gh-actions poetry
27+
poetry install
28+
- name: Test with tox
29+
run: poetry run tox

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# kannon
2+
3+
Kannon is a wrapper for the [gokart](https://github.com/m3dev/gokart) library that allows gokart tasks to be easily executed in a distributed and parallel manner on multiple [kubernetes](https://kubernetes.io/) jobs.
4+
5+
# Thanks
6+
7+
Kannon is a wrapper for gokart. Thanks to gokart and dependent projects!
8+
9+
- [gokart](https://github.com/m3dev/gokart)

kannon/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from kannon.task import TaskOnBullet
2+
from kannon.master import Kannon

kannon/kube_util.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import enum
2+
from time import sleep
3+
from kubernetes import client
4+
from datetime import datetime
5+
import random
6+
7+
import logging
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class JobStatus(enum.Enum):
13+
RUNNING = 0
14+
SUCCEEDED = 1
15+
FAILED = 2
16+
17+
18+
def create_job(
19+
api_instance: client.BatchV1Api, job: client.V1Job, namespace: str
20+
) -> None:
21+
api_response = api_instance.create_namespaced_job(
22+
body=job,
23+
namespace=namespace,
24+
)
25+
logger.debug(f"Job created. status={api_response.status}")
26+
27+
def get_job_status(
28+
api_instance: client.BatchV1Api, job_name: str, namespace: str
29+
) -> JobStatus:
30+
api_response = api_instance.read_namespaced_job_status(
31+
name=job_name, namespace=namespace
32+
)
33+
if (
34+
api_response.status.succeeded is not None
35+
or api_response.status.failed is not None
36+
):
37+
final_status = (
38+
JobStatus.SUCCEEDED if api_response.status.succeeded else JobStatus.FAILED
39+
)
40+
return final_status
41+
return JobStatus.RUNNING
42+
43+
44+
def gen_job_name(job_prefix: str) -> str:
45+
job_name = f"{job_prefix}-{str(random.randint(0, 255)).zfill(3)}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
46+
# TODO: validate job_name more precisely
47+
job_name = job_name[:50]
48+
job_name = job_name.replace("_", "-").lower()
49+
return job_name

kannon/master.py

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
from collections import deque
2+
import os
3+
from time import sleep
4+
from typing import Deque, Dict, List, Set
5+
import logging
6+
7+
import gokart
8+
from kubernetes import client
9+
10+
from .task import TaskOnBullet
11+
from .kube_util import create_job, JobStatus, gen_job_name, get_job_status
12+
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class Kannon:
18+
def __init__(
19+
self,
20+
api_instance: client.BatchV1Api,
21+
namespace: str,
22+
image_name: str,
23+
container_name: str,
24+
service_account_name: str,
25+
job_prefix: str,
26+
path_child_script: str,
27+
env_to_inherit: List[str],
28+
backoff_limit: int = 0,
29+
) -> None:
30+
# validation
31+
if not os.path.exists(path_child_script):
32+
raise FileNotFoundError(f"Child script {path_child_script} does not exist.")
33+
if backoff_limit < 0:
34+
raise ValueError(f"backoff_limit should be >= 0")
35+
self.api_instance = api_instance
36+
self.namespace = namespace
37+
self.image_name = image_name
38+
self.container_name = container_name
39+
self.service_account_name = service_account_name
40+
self.job_prefix = job_prefix
41+
self.path_child_script = path_child_script
42+
self.env_to_inherit = env_to_inherit
43+
self.backoff_limit = backoff_limit
44+
45+
self.task_id_to_job_name: Dict[str, str] = dict()
46+
47+
def build(self, root_task: gokart.TaskOnKart):
48+
# push tasks into queue
49+
logger.info("Creating task queue...")
50+
task_queue = self._create_task_queue(root_task)
51+
52+
# consume task queue
53+
launched_task_ids: Set[str] = set()
54+
logger.info("Consuming task queue...")
55+
while task_queue:
56+
task = task_queue.popleft()
57+
if task.complete():
58+
logger.info(f"Task {self._gen_task_info(task)} is already done.")
59+
continue
60+
if task.make_unique_id() in launched_task_ids:
61+
logger.info(f"Task {self._gen_task_info(task)} is already running.")
62+
continue
63+
64+
logger.info(
65+
f"Checking if task {self._gen_task_info(task)} is executable..."
66+
)
67+
# TODO: enable user to specify duration to sleep for each task
68+
sleep(1.0)
69+
if not self._is_executable(task):
70+
task_queue.append(task)
71+
continue
72+
# execute task
73+
if isinstance(task, TaskOnBullet):
74+
logger.info(
75+
f"Trying to run task {self._gen_task_info(task)} on child job..."
76+
)
77+
self._exec_bullet_task(task)
78+
elif isinstance(task, gokart.TaskOnKart):
79+
logger.info(
80+
f"Executing task {self._gen_task_info(task)} on master job..."
81+
)
82+
self._exec_gokart_task(task)
83+
logger.info(
84+
f"Completed task {self._gen_task_info(task)} on master job."
85+
)
86+
else:
87+
raise TypeError(f"Invalid task type: {type(task)}")
88+
launched_task_ids.add(task.make_unique_id())
89+
90+
logger.info(f"All tasks completed!")
91+
92+
def _create_task_queue(
93+
self, root_task: gokart.TaskOnKart
94+
) -> Deque[gokart.TaskOnKart]:
95+
task_queue: Deque[gokart.TaskOnKart] = deque()
96+
97+
def _rec_enqueue_task(task: gokart.TaskOnKart) -> None:
98+
"""Traversal task tree in post-order to push tasks into task queue."""
99+
nonlocal task_queue
100+
# run children
101+
children = task.requires()
102+
if isinstance(children, dict):
103+
children = children.values()
104+
for child in children:
105+
_rec_enqueue_task(child)
106+
107+
task_queue.append(task)
108+
logger.info(f"Task {self._gen_task_info(task)} is pushed to task queue")
109+
110+
_rec_enqueue_task(root_task)
111+
return task_queue
112+
113+
def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None:
114+
# Run on master job
115+
try:
116+
gokart.build(task)
117+
except Exception:
118+
raise RuntimeError(
119+
f"Task {self._gen_task_info(task)} on job master has failed."
120+
)
121+
122+
def _exec_bullet_task(self, task: TaskOnBullet) -> None:
123+
# Run on child job
124+
serialized_task = gokart.TaskInstanceParameter().serialize(task)
125+
job_name = gen_job_name(f"{self.job_prefix}-{task.get_task_family()}")
126+
job = self._create_job_object(
127+
job_name=job_name,
128+
serialized_task=serialized_task,
129+
)
130+
create_job(self.api_instance, job, self.namespace)
131+
logger.info(
132+
f"Created child job {job_name} with task {self._gen_task_info(task)}"
133+
)
134+
task_unique_id = task.make_unique_id()
135+
self.task_id_to_job_name[task_unique_id] = job_name
136+
137+
@staticmethod
138+
def _gen_task_info(task: gokart.TaskOnKart) -> str:
139+
return f"{task.get_task_family()}_{task.make_unique_id()}"
140+
141+
def _create_job_object(self, serialized_task: str, job_name: str) -> client.V1Job:
142+
# TODO: use python -c to avoid dependency to execute_task.py
143+
cmd = [
144+
"python",
145+
self.path_child_script,
146+
"--serialized-task",
147+
f"'{serialized_task}'",
148+
]
149+
child_envs = []
150+
for env_name in self.env_to_inherit:
151+
if env_name not in os.environ:
152+
raise ValueError(f"Envvar {env_name} does not exist.")
153+
child_envs.append({"name": env_name, "value": os.environ.get(env_name)})
154+
container = client.V1Container(
155+
name=self.container_name,
156+
image=self.image_name,
157+
command=cmd,
158+
env=child_envs,
159+
)
160+
template = client.V1PodTemplateSpec(
161+
metadata=client.V1ObjectMeta(labels={"app": "kannon"}),
162+
spec=client.V1PodSpec(
163+
restart_policy="Never",
164+
containers=[container],
165+
service_account_name=self.service_account_name,
166+
),
167+
)
168+
spec = client.V1JobSpec(template=template, backoff_limit=self.backoff_limit)
169+
job = client.V1Job(
170+
api_version="batch/v1",
171+
kind="Job",
172+
metadata=client.V1ObjectMeta(
173+
name=job_name,
174+
namespace=self.namespace,
175+
),
176+
spec=spec,
177+
)
178+
179+
return job
180+
181+
def _is_executable(self, task: gokart.TaskOnKart) -> bool:
182+
children = task.requires()
183+
if isinstance(children, dict):
184+
children = children.values()
185+
186+
for child in children:
187+
if not child.complete():
188+
return False
189+
if child.make_unique_id() not in self.task_id_to_job_name:
190+
continue
191+
job_name = self.task_id_to_job_name[child.make_unique_id()]
192+
job_status = get_job_status(
193+
self.api_instance,
194+
job_name,
195+
self.namespace,
196+
)
197+
if job_status == JobStatus.FAILED:
198+
raise RuntimeError(
199+
f"Task {self._gen_task_info(child)} on job {job_name} has failed."
200+
)
201+
if job_status == JobStatus.RUNNING:
202+
return False
203+
return True

kannon/task.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import gokart
2+
3+
4+
class TaskOnBullet(gokart.TaskOnKart):
5+
pass

0 commit comments

Comments
 (0)