Skip to content

Commit 053f80b

Browse files
maronuu and yutaro-oguri authored
Feature/inherit config support (#39)
* Add additional waiting
* fix mock of kube api
* add util method to check child task
* fix exception
* Add owner reference from child job to parent pod
* Support dynamically generated config
* add TODO
* fix typing
* remove unused import
* fix mock kannon
* remove unnecessary import
* Use root_task.workspace_directory instead of ENVVAR
* Validate env_to_inherit is not None when iterating
* update
* dynamic config path as list
* fix type error

---------

Co-authored-by: yutaro-oguri <yutaro-oguri@m3.com>
1 parent 67c041b commit 053f80b

File tree

3 files changed

+53
-102
lines changed

3 files changed

+53
-102
lines changed

kannon/master.py

Lines changed: 44 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@
55
from collections import deque
66
from copy import deepcopy
77
from time import sleep
8-
from typing import List, Optional
98

109
import gokart
1110
from gokart.target import make_target
@@ -28,9 +27,10 @@ def __init__(
2827
# kannon resources
2928
job_prefix: str,
3029
path_child_script: str = "./run_child.py",
31-
env_to_inherit: Optional[List[str]] = None,
32-
master_pod_name: Optional[str] = None,
33-
master_pod_uid: Optional[str] = None,
30+
env_to_inherit: list[str] | None = None,
31+
master_pod_name: str | None = None,
32+
master_pod_uid: str | None = None,
33+
dynamic_config_paths: list[str] | None = None,
3434
max_child_jobs: int | None = None,
3535
) -> None:
3636
# validation
@@ -42,12 +42,11 @@ def __init__(
4242
self.namespace = template_job.metadata.namespace
4343
self.job_prefix = job_prefix
4444
self.path_child_script = path_child_script
45-
if env_to_inherit is None:
46-
env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"]
4745
self.env_to_inherit = env_to_inherit
4846

4947
self.master_pod_name = master_pod_name
5048
self.master_pod_uid = master_pod_uid
49+
self.dynamic_config_paths = dynamic_config_paths
5150

5251
if max_child_jobs is not None and max_child_jobs <= 0:
5352
raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}")
@@ -56,6 +55,28 @@ def __init__(
5655
self.task_id_to_job_name: dict[str, str] = dict()
5756

5857
def build(self, root_task: gokart.TaskOnKart) -> None:
58+
# TODO: support multiple dynamic config files
59+
# use workspace directory of root task as the root directory for remote cache
60+
workspace_dir = root_task.workspace_directory
61+
remote_config_path = None
62+
if self.dynamic_config_paths:
63+
assert len(self.dynamic_config_paths) == 1, "Currently kannon doesn't support multiple dynamic config files."
64+
dynamic_config_path = self.dynamic_config_paths[0]
65+
logger.info("Handling dynamic config files...")
66+
logger.info(f"Handling given config file {dynamic_config_path}")
67+
# save configs to remote cache
68+
remote_config_dir = os.path.join(workspace_dir, "kannon", "conf")
69+
# TODO: support other format than .ini
70+
if not dynamic_config_path.endswith(".ini"):
71+
raise ValueError(f"Format {dynamic_config_path} is not supported.")
72+
# load local config and save it to remote cache
73+
local_conf_content = make_target(dynamic_config_path).load()
74+
remote_config_path = os.path.join(remote_config_dir, os.path.basename(dynamic_config_path))
75+
make_target(remote_config_path).dump(local_conf_content)
76+
logger.info(f"local config file {dynamic_config_path} is saved at remote {remote_config_path}.")
77+
else:
78+
logger.info("No dynamic config files are given.")
79+
5980
# push tasks into queue
6081
logger.info("Creating task queue...")
6182
task_queue = self._create_task_queue(root_task)
@@ -91,7 +112,7 @@ def build(self, root_task: gokart.TaskOnKart) -> None:
91112
logger.info(f"Reach max_child_jobs, waiting to run task {self._gen_task_info(task)} on child job...")
92113
continue
93114
logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...")
94-
self._exec_bullet_task(task)
115+
self._exec_bullet_task(task, remote_config_path)
95116
running_task_ids.add(task.make_unique_id()) # mark as already launched task
96117
task_queue.append(task) # re-enqueue task to check if it is done
97118
elif isinstance(task, gokart.TaskOnKart):
@@ -133,7 +154,7 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None:
133154
except Exception:
134155
raise RuntimeError(f"Task {self._gen_task_info(task)} on job master has failed.")
135156

136-
def _exec_bullet_task(self, task: TaskOnBullet) -> None:
157+
def _exec_bullet_task(self, task: TaskOnBullet, remote_config_path: str | None) -> None:
137158
# Save task instance as pickle object
138159
pkl_path = self._gen_pkl_path(task)
139160
make_target(pkl_path).dump(task)
@@ -142,19 +163,28 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None:
142163
job = self._create_child_job_object(
143164
job_name=job_name,
144165
task_pkl_path=pkl_path,
166+
remote_config_path=remote_config_path,
145167
)
146168
create_job(self.api_instance, job, self.namespace)
147169
logger.info(f"Created child job {job_name} with task {self._gen_task_info(task)}")
148170
self.task_id_to_job_name[task.make_unique_id()] = job_name
149171

150-
def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.V1Job:
172+
def _create_child_job_object(
173+
self,
174+
job_name: str,
175+
task_pkl_path: str,
176+
remote_config_path: str | None = None,
177+
) -> client.V1Job:
151178
# TODO: use python -c to avoid dependency to execute_task.py
152179
cmd = [
153180
"python",
154181
self.path_child_script,
155182
"--task-pkl-path",
156183
f"'{task_pkl_path}'",
157184
]
185+
if remote_config_path:
186+
cmd.append("--remote-config-path")
187+
cmd.append(remote_config_path)
158188
job = deepcopy(self.template_job)
159189
# replace command
160190
assert job.spec.template.spec.containers[0].command is None, \
@@ -164,10 +194,11 @@ def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.
164194
child_envs = job.spec.template.spec.containers[0].env
165195
if not child_envs:
166196
child_envs = []
167-
for env_name in self.env_to_inherit:
168-
if env_name not in os.environ:
169-
raise ValueError(f"Envvar {env_name} does not exist.")
170-
child_envs.append({"name": env_name, "value": os.environ.get(env_name)})
197+
if self.env_to_inherit:
198+
for env_name in self.env_to_inherit:
199+
if env_name not in os.environ:
200+
raise ValueError(f"Envvar {env_name} does not exist.")
201+
child_envs.append({"name": env_name, "value": os.environ.get(env_name)})
171202
job.spec.template.spec.containers[0].env = child_envs
172203
# replace job name
173204
job.metadata.name = job_name

0 commit comments

Comments
 (0)