
Commit 1077117

Kubernetes executor for cluster_tools (#600)
* wip kubernetes scheduler * proof-of-concept * refactoring * fixes * fixes * fixes * fixes * fixes * fixes * ci * ci * use jobs * Merge branch 'master' into cluster-k8s * fixes * Merge branch 'cluster-k8s' of github.com:scalableminds/webknossos-libs into cluster-k8s * ttl for job * poetry locks * test refactoring * ci * ci * fixes? * ci * ci * fixes * fixes * mounts * fixes for mounts * changelog * readme * readme * pr feedback + wkcuber integration * refactor tests * ci * test fixes * test fixes * ci * ci * ci * fixes? * ci * bool * better job_id, job_index separation * fixes? * fixes * fixes * reactivate tests * deduplicate mounts * fixes * readme * ci * fixes * using objects instead of dict * ci * ci * ci * ci * ci * Apply suggestions from code review Co-authored-by: Philipp Otto <[email protected]> * better mount test * Merge branch 'master' into cluster-k8s * comment
1 parent 6d9e601 commit 1077117

24 files changed: +1784 −478 lines changed

.github/workflows/ci.yml

Lines changed: 47 additions & 6 deletions
@@ -30,12 +30,18 @@ jobs:
     needs: changes
     if: ${{ needs.changes.outputs.cluster_tools == 'true' }}
     runs-on: ubuntu-latest
+    timeout-minutes: 30
+    strategy:
+      max-parallel: 4
+      matrix:
+        executors: [multiprocessing, slurm, kubernetes]
     defaults:
       run:
         working-directory: cluster_tools
     steps:
       - uses: actions/checkout@v2
       - name: Build/pull dockered-slurm image
+        if: ${{ matrix.executors == 'slurm' }}
         run: |
           cd ./dockered-slurm
@@ -50,26 +56,61 @@ jobs:
           done

           # Run setup.py on all three nodes
-          docker exec slurmctld bash -c "cd /cluster_tools && poetry install"
-          docker exec c1 bash -c "cd /cluster_tools && poetry install"
-          docker exec c2 bash -c "cd /cluster_tools && poetry install"
+          docker exec -w /cluster_tools slurmctld bash -c "poetry install" &
+          docker exec -w /cluster_tools c1 bash -c "poetry install" &
+          docker exec -w /cluster_tools c2 bash -c "poetry install" &
+          wait
+
+      - name: Setup Kubernetes-in-Docker
+        if: ${{ matrix.executors == 'kubernetes' }}
+        run: |
+          curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64
+          chmod +x ./kind
+          sed -i "s#__PATH__#$(pwd)#g" tests/cluster-config.yaml
+          ./kind create cluster --config=tests/cluster-config.yaml
+          ./kind export kubeconfig
+
+          docker build -f tests/Dockerfile -t scalableminds/cluster-tools:latest .
+          ./kind load docker-image scalableminds/cluster-tools:latest

       - name: Install dependencies (without docker)
+        if: ${{ matrix.executors == 'multiprocessing' || matrix.executors == 'kubernetes' }}
         run: |
           pip install poetry
           poetry install

       - name: Check formatting
+        if: ${{ matrix.executors == 'multiprocessing' }}
         run: ./format.sh check

       - name: Lint code
+        if: ${{ matrix.executors == 'multiprocessing' }}
         run: ./lint.sh

-      - name: Run tests
+      - name: Run multiprocessing tests
+        if: ${{ matrix.executors == 'multiprocessing' }}
+        run: |
+          cd tests
+          PYTEST_EXECUTORS=multiprocessing,sequential,test_pickling,debug_sequential \
+            poetry run python -m pytest -sv test_all.py test_multiprocessing.py
+
+      - name: Run slurm tests
+        if: ${{ matrix.executors == 'slurm' }}
         run: |
           cd ./dockered-slurm
-          docker exec slurmctld bash -c "cd /cluster_tools/tests && poetry run python -m pytest -s tests.py"
-          docker exec slurmctld bash -c "cd /cluster_tools/tests && poetry run python tests.py"
+          docker exec \
+            -w /cluster_tools/tests \
+            -e PYTEST_EXECUTORS=slurm \
+            slurmctld bash -c "poetry run python -m pytest -sv test_all.py test_slurm.py"
+          docker exec \
+            -w /cluster_tools/tests \
+            slurmctld bash -c "poetry run python test_deref_main.py"
+
+      - name: Run kubernetes tests
+        if: ${{ matrix.executors == 'kubernetes' }}
+        run: |
+          cd tests
+          PYTEST_EXECUTORS=kubernetes poetry run python -m pytest -sv test_all.py test_kubernetes.py

   webknossos_linux:
     needs: changes

cluster_tools/Changelog.md

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section
 ### Breaking Changes

 ### Added
+* Added `KubernetesExecutor` for parallelizing Python scripts on a Kubernetes cluster. [#600](https://github.com/scalableminds/webknossos-libs/pull/600)

 ### Changed

cluster_tools/README.md

Lines changed: 25 additions & 2 deletions
@@ -4,7 +4,6 @@

 This package provides python `Executor` classes for distributing tasks on a slurm cluster or via multi processing.

-
 ## Example

 ```python
@@ -24,11 +23,35 @@ if __name__ == '__main__':

 ### Slurm

-The cluster_tools automatically determine the slurm limit for maximum array job size and split up larger job batches into multiple smaller batches.
+The `cluster_tools` automatically determine the slurm limit for maximum array job size and split up larger job batches into multiple smaller batches.
 Also, the slurm limit for the maximum number of jobs which are allowed to be submitted by a user at the same time is honored by looking up the number of currently submitted jobs and only submitting new batches if they fit within the limit.

 If you would like to configure these limits independently, you can do so by setting the `SLURM_MAX_ARRAY_SIZE` and `SLURM_MAX_SUBMIT_JOBS` environment variables.

+### Kubernetes
+
+#### Resource configuration
+
+| Key                 | Description | Example |
+| ------------------- | ----------- | ------- |
+| `namespace`         | Kubernetes namespace for the resources to be created. Will be created if it does not exist. | `cluster-tools` |
+| `node_selector`     | Which nodes to utilize for the processing. Needs to be a [Kubernetes `nodeSelector` object](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/). | `{"kubernetes.io/hostname": "node001"}` |
+| `image`             | The docker image for the containerized jobs to run in. The image needs to have the same version of `cluster_tools` and the code to run installed and in the `PYTHONPATH`. | `scalableminds/voxelytics:latest` |
+| `mounts`            | Additional mounts for the containerized jobs. The current working directory and the `.cfut` directory are automatically mounted. | `["/srv", "/data"]` |
+| `cpu`               | [CPU requirements](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/) for this job. | `4` |
+| `memory`            | [Memory requirements](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/) for this job. Not required, but highly recommended to avoid congestion. Without resource requirements, all jobs will be run in parallel and RAM will run out soon. | `16G` |
+| `python_executable` | The python executable in the docker image may differ from the one in the current environment. For images based on `FROM python`, it should be `python`. Defaults to `python`. | `python3.8` |
+| `umask`             | `umask` for the jobs. | `0002` |
+
+#### Notes
+
+- The jobs are run with the current `uid:gid`.
+- The jobs are removed 7 days after completion (successful or not).
+- The logs are stored in the `.cfut` directory. This is actually redundant, because Kubernetes also stores them.
+- Pods are not restarted upon error.
+- Requires Kubernetes ≥ 1.23.
+- [Kubernetes cluster configuration](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/) is expected to be the same as for `kubectl`, i.e. in `~/.kube/config` or similar.
+
 ## Dev Setup

 ```
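To make the Kubernetes configuration above concrete, here is a minimal, hypothetical usage sketch. It assumes the resource keys from the table are passed to `cluster_tools.get_executor("kubernetes", ...)` via a `job_resources` dict (the keyword name and the context-manager usage are assumptions, not taken from this diff) and that the returned executor offers the usual `concurrent.futures`-style interface used in the README example.

```python
import cluster_tools

def square(n):
    return n * n

if __name__ == "__main__":
    # Hypothetical sketch: the resource keys below come from the README table;
    # the `job_resources` keyword is an assumption.
    with cluster_tools.get_executor(
        "kubernetes",
        job_resources={
            "namespace": "cluster-tools",                # created if it does not exist
            "image": "scalableminds/voxelytics:latest",  # must contain cluster_tools and your code
            "mounts": ["/srv", "/data"],                 # cwd and .cfut are mounted automatically
            "cpu": 4,
            "memory": "16G",                             # recommended to avoid congestion
            "python_executable": "python3.8",
            "umask": "0002",
        },
    ) as executor:
        # Each job runs as a containerized Kubernetes job with the current uid:gid.
        print(list(executor.map(square, range(10))))
```

Unlike the Slurm backend, no scheduler daemon is required on the worker nodes; the image only needs the same `cluster_tools` version and the user code on its `PYTHONPATH`.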

cluster_tools/cluster_tools/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -10,6 +10,7 @@

 from . import pickling
 from .multiprocessing_logging_handler import get_multiprocessing_logging_setup_fn
+from .schedulers.kube import KubernetesExecutor
 from .schedulers.pbs import PBSExecutor
 from .schedulers.slurm import SlurmExecutor
 from .util import enrich_future_with_uncaught_warning
@@ -326,6 +327,8 @@ def get_executor(environment, **kwargs):
         return SlurmExecutor(**kwargs)
     elif environment == "pbs":
         return PBSExecutor(**kwargs)
+    elif environment == "kubernetes":
+        return KubernetesExecutor(**kwargs)
     elif environment == "multiprocessing":
         global did_start_test_multiprocessing
         if not did_start_test_multiprocessing:

cluster_tools/cluster_tools/remote.py

Lines changed: 19 additions & 15 deletions
@@ -4,17 +4,20 @@
 import sys
 import traceback

+from cluster_tools.schedulers.kube import KubernetesExecutor
 from cluster_tools.schedulers.pbs import PBSExecutor
 from cluster_tools.schedulers.slurm import SlurmExecutor
 from cluster_tools.util import with_preliminary_postfix

 from . import pickling


-def get_executor_class():
-    for executor in [SlurmExecutor, PBSExecutor]:
-        if executor.get_current_job_id() is not None:
-            return executor
+def get_executor_class(executor_key):
+    return {
+        "slurm": SlurmExecutor,
+        "pbs": PBSExecutor,
+        "kubernetes": KubernetesExecutor,
+    }.get(executor_key)


 def format_remote_exc():
@@ -23,16 +26,16 @@ def format_remote_exc():
     return "".join(traceback.format_exception(typ, value, tb))


-def get_custom_main_path(workerid):
+def get_custom_main_path(workerid, executor):
     custom_main_path = None
-    main_meta_path = get_executor_class().get_main_meta_path(cfut_dir, workerid)
+    main_meta_path = executor.get_main_meta_path(cfut_dir, workerid)
     if os.path.exists(main_meta_path):
         with open(main_meta_path, "r") as file:
             custom_main_path = file.read()
     return custom_main_path


-def worker(workerid, job_array_index, job_array_index_offset, cfut_dir):
+def worker(executor, workerid, job_array_index, job_array_index_offset, cfut_dir):
     """Called to execute a job on a remote host."""

     if job_array_index is not None:
@@ -42,13 +45,12 @@ def worker(workerid, job_array_index, job_array_index_offset, cfut_dir):
     else:
         workerid_with_idx = worker_id

-    executor = get_executor_class()
     try:
         input_file_name = executor.format_infile_name(cfut_dir, workerid_with_idx)
         print("trying to read: ", input_file_name)
         print("working dir: ", os.getcwd())

-        custom_main_path = get_custom_main_path(workerid)
+        custom_main_path = get_custom_main_path(workerid, executor)
         with open(input_file_name, "rb") as f:
             unpickled_tuple = pickling.load(f, custom_main_path)
             if len(unpickled_tuple) == 4:
@@ -129,12 +131,14 @@ def setup_logging(meta_data, executor, cfut_dir):


 if __name__ == "__main__":
-    worker_id = sys.argv[1]
-    cfut_dir = sys.argv[2]
-    job_array_index_offset = sys.argv[3] if len(sys.argv) > 3 else "0"
-    job_array_index = get_executor_class().get_job_array_index()
-
-    worker(worker_id, job_array_index, job_array_index_offset, cfut_dir)
+    executor_key = sys.argv[1]
+    executor = get_executor_class(executor_key)
+    worker_id = sys.argv[2]
+    cfut_dir = sys.argv[3]
+    job_array_index_offset = sys.argv[4] if len(sys.argv) > 4 else "0"
+    job_array_index = executor.get_job_array_index()
+
+    worker(executor, worker_id, job_array_index, job_array_index_offset, cfut_dir)
     # This is a workaround for the case that some subprocesses are still hanging around and are waited for.
     # If this point is reached, results were written to disk and we can "safely" shut down everything.
     sys.exit()
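The practical effect of this change is that a worker no longer infers its scheduler from the environment; the executor key is passed explicitly as the first command-line argument by the base class in `cluster_executor.py` (see the next file). A rough, illustrative sketch of the new dispatch:

```python
# Illustrative sketch only (not part of this commit).
from cluster_tools.remote import get_executor_class

# The submit command built in cluster_executor.py now looks roughly like:
#   <python_executable> -m cluster_tools.remote <executor_key> <workerid> <cfut_dir>
# so the worker resolves its scheduler class directly from argv[1]:
executor_class = get_executor_class("kubernetes")  # -> KubernetesExecutor
unknown = get_executor_class("does-not-exist")     # -> None (dict.get fallback)
```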

cluster_tools/cluster_tools/schedulers/cluster_executor.py

Lines changed: 23 additions & 9 deletions
@@ -93,6 +93,11 @@ def __init__(
         if "logging_setup_fn" in kwargs:
             self.meta_data["logging_setup_fn"] = kwargs["logging_setup_fn"]

+    @classmethod
+    @abstractmethod
+    def executor_key(cls):
+        pass
+
     def handle_kill(self, _signum, _frame):
         self.wait_thread.stop()
         job_ids = ",".join(str(id) for id in self.jobs.keys())
@@ -104,17 +109,19 @@ def handle_kill(self, _signum, _frame):
         sys.exit(130)

     @abstractmethod
-    def check_for_crashed_job(self, job_id) -> Union["failed", "ignore", "completed"]:
+    def check_for_crashed_job(
+        self, job_id_with_index
+    ) -> Union["failed", "ignore", "completed"]:
         pass

     def _start(self, workerid, job_count=None, job_name=None):
         """Start job(s) with the given worker ID and return IDs
         identifying the new job(s). The job should run ``python -m
-        cfut.remote <workerid>``.
+        cfut.remote <executorkey> <workerid>``.
         """

         jobids_futures, job_index_ranges = self.inner_submit(
-            f"{sys.executable} -m cluster_tools.remote {workerid} {self.cfut_dir}",
+            f"{self.get_python_executable()} -m cluster_tools.remote {self.executor_key()} {workerid} {self.cfut_dir}",
             job_name=self.job_name if self.job_name is not None else job_name,
             additional_setup_lines=self.additional_setup_lines,
             job_count=job_count,
@@ -145,12 +152,14 @@ def _cleanup(self, jobid):

     @staticmethod
     @abstractmethod
-    def format_log_file_name(jobid, suffix=".stdout"):
+    def format_log_file_name(job_id_with_index, suffix=".stdout"):
         pass

     @classmethod
-    def format_log_file_path(cls, cfut_dir, jobid, suffix=".stdout"):
-        return os.path.join(cfut_dir, cls.format_log_file_name(jobid, suffix))
+    def format_log_file_path(cls, cfut_dir, job_id_with_index, suffix=".stdout"):
+        return os.path.join(
+            cfut_dir, cls.format_log_file_name(job_id_with_index, suffix)
+        )

     @classmethod
     @abstractmethod
@@ -169,6 +178,9 @@ def format_infile_name(cfut_dir, job_id):
     def format_outfile_name(cfut_dir, job_id):
         return os.path.join(cfut_dir, "cfut.out.%s.pickle" % job_id)

+    def get_python_executable(self):
+        return sys.executable
+
     def _completion(self, jobid, failed_early):
         """Called whenever a job finishes."""
         with self.jobs_lock:
@@ -278,7 +290,7 @@ def submit(self, fun, *args, **kwargs):
         jobid = jobids_futures[0].result()

         if self.debug:
-            print("job submitted: %i" % jobid, file=sys.stderr)
+            print(f"job submitted: {jobid}", file=sys.stderr)

         # Thread will wait for it to finish.
         self.wait_thread.waitFor(preliminary_output_pickle_path, jobid)
@@ -289,10 +301,12 @@ def submit(self, fun, *args, **kwargs):
         fut.cluster_jobid = jobid
         return fut

-    def get_workerid_with_index(self, workerid, index):
+    @classmethod
+    def get_workerid_with_index(cls, workerid, index):
         return workerid + "_" + str(index)

-    def get_jobid_with_index(self, jobid, index):
+    @classmethod
+    def get_jobid_with_index(cls, jobid, index):
         return str(jobid) + "_" + str(index)

     def get_function_pickle_path(self, workerid):
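For anyone implementing a new scheduler backend, the diff above tightens the subclass contract: `executor_key()` is now an abstract classmethod used when building the worker command line, the log-file helpers operate on a `job_id_with_index`, and `get_python_executable()` can be overridden (it defaults to `sys.executable`; the Kubernetes executor presumably overrides it based on the `python_executable` resource key). A minimal, hypothetical skeleton, assuming the base class is named `ClusterExecutor` and omitting the remaining abstract methods (e.g. `inner_submit`, `check_for_crashed_job`) for brevity:

```python
# Hypothetical skeleton, not part of this commit; other abstract methods are omitted,
# so this class is for illustration and cannot be instantiated as-is.
from cluster_tools.schedulers.cluster_executor import ClusterExecutor  # base class name assumed

class MySchedulerExecutor(ClusterExecutor):
    @classmethod
    def executor_key(cls):
        # Inserted into the worker command:
        #   <python> -m cluster_tools.remote <executor_key> <workerid> <cfut_dir>
        return "my_scheduler"

    @staticmethod
    def format_log_file_name(job_id_with_index, suffix=".stdout"):
        # One log file per (job id, array index) pair.
        return f"myscheduler.{job_id_with_index}.log{suffix}"

    def get_python_executable(self):
        # Override when the interpreter inside the job environment differs
        # from the submitting environment.
        return "python3"
```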
