Skip to content

Commit 1d0087a

Browse files
authored
add global make commands, add cluster-tools typechecking (#683)
* add format/typecheck/lint/test commands to makefile
* add wkcuber typing (unclear why CI doesn't complain, got errors locally)
* load .env also for mp start method
* upgrade mypy in wkcuber
* add typechecking for cluster_tools
* update development docs
* undo dotenv loading
* remove cloudpickle from deps, format
1 parent 5d667a7 commit 1d0087a

File tree

25 files changed

+267
-264
lines changed

25 files changed

+267
-264
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ jobs:
8585
pip install poetry
8686
poetry install
8787
88+
- name: Check typing
89+
if: ${{ matrix.executors == 'multiprocessing' }}
90+
run: ./typecheck.sh
91+
8892
- name: Check formatting
8993
if: ${{ matrix.executors == 'multiprocessing' }}
9094
run: ./format.sh check
@@ -150,8 +154,7 @@ jobs:
150154
run: ./lint.sh
151155

152156
- name: Check typing
153-
run: |
154-
./typecheck.sh
157+
run: ./typecheck.sh
155158

156159
- name: Python tests
157160
env:

Makefile

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
packages_by_priority := webknossos wkcuber cluster_tools docs
22
packages_by_dependency := cluster_tools webknossos wkcuber docs
3+
code_packages := cluster_tools webknossos wkcuber
34

45
define in_each_pkg_by_dependency
56
for PKG in $(packages_by_dependency); do echo $$PKG; cd $$PKG; $1; cd ..; done
67
endef
78

8-
.PHONY: list_packages_by_priority update update-internal install
9+
define in_each_code_pkg
10+
for PKG in $(code_packages); do echo $$PKG; cd $$PKG; $1; cd ..; done
11+
endef
12+
13+
.PHONY: list_packages_by_priority update update-internal install format lint typecheck flt test
914

1015
list_packages_by_priority:
1116
@echo $(packages_by_priority)
@@ -18,3 +23,18 @@ update-internal:
1823

1924
install:
2025
$(call in_each_pkg_by_dependency, poetry install)
26+
27+
format:
28+
$(call in_each_code_pkg, ./format.sh)
29+
30+
lint:
31+
$(call in_each_code_pkg, ./lint.sh)
32+
33+
typecheck:
34+
$(call in_each_code_pkg, ./typecheck.sh)
35+
36+
flt:
37+
$(call in_each_code_pkg, ./format.sh && ./lint.sh && ./typecheck.sh)
38+
39+
test:
40+
$(call in_each_code_pkg, ./test.sh)

cluster_tools/cluster_tools/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import logging
21
import multiprocessing
32
import os
43
import tempfile

cluster_tools/cluster_tools/pickling.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,7 @@
11
import os
2+
import pickle
23
import sys
34

4-
use_cloudpickle = "USE_CLOUDPICKLE" in os.environ
5-
6-
if use_cloudpickle:
7-
import cloudpickle
8-
9-
pickle_strategy = cloudpickle
10-
else:
11-
import pickle
12-
13-
pickle_strategy = pickle
145
from .util import warn_after
156

167
WARNING_TIMEOUT = 10 * 60 # seconds
@@ -45,27 +36,23 @@ def get_suitable_pickle_protocol():
4536

4637
@warn_after("pickle.dumps", WARNING_TIMEOUT)
4738
def dumps(*args, **kwargs):
48-
return pickle_strategy.dumps(
49-
*args, protocol=get_suitable_pickle_protocol(), **kwargs
50-
)
39+
return pickle.dumps(*args, protocol=get_suitable_pickle_protocol(), **kwargs)
5140

5241

5342
@warn_after("pickle.dump", WARNING_TIMEOUT)
5443
def dump(*args, **kwargs):
55-
return pickle_strategy.dump(
56-
*args, protocol=get_suitable_pickle_protocol(), **kwargs
57-
)
44+
return pickle.dump(*args, protocol=get_suitable_pickle_protocol(), **kwargs)
5845

5946

6047
@warn_after("pickle.loads", WARNING_TIMEOUT)
6148
def loads(*args, **kwargs):
6249
assert (
6350
"custom_main_path" not in kwargs
6451
), "loads does not implement support for the argument custom_main_path"
65-
return pickle_strategy.loads(*args, **kwargs)
52+
return pickle.loads(*args, **kwargs)
6653

6754

68-
class RenameUnpickler(pickle_strategy.Unpickler):
55+
class RenameUnpickler(pickle.Unpickler):
6956
def find_class(self, module, name):
7057
renamed_module = module
7158
if module == "__main__" and self.custom_main_path is not None:

cluster_tools/cluster_tools/schedulers/cluster_executor.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
from abc import abstractmethod
88
from concurrent import futures
99
from functools import partial
10-
from typing import Union
10+
from typing import List, Optional, Tuple
11+
12+
from typing_extensions import Literal
1113

1214
from cluster_tools import pickling
1315
from cluster_tools.pickling import file_path_to_absolute_module
@@ -111,7 +113,7 @@ def handle_kill(self, _signum, _frame):
111113
@abstractmethod
112114
def check_for_crashed_job(
113115
self, job_id_with_index
114-
) -> Union["failed", "ignore", "completed"]:
116+
) -> Literal["failed", "ignore", "completed"]:
115117
pass
116118

117119
def _start(self, workerid, job_count=None, job_name=None):
@@ -137,7 +139,13 @@ def _start(self, workerid, job_count=None, job_name=None):
137139
return jobids_futures, job_index_ranges
138140

139141
@abstractmethod
140-
def inner_submit(self, *args, **kwargs):
142+
def inner_submit(
143+
self,
144+
cmdline: str,
145+
job_name: Optional[str] = None,
146+
additional_setup_lines: Optional[List[str]] = None,
147+
job_count: Optional[int] = None,
148+
) -> Tuple[List["futures.Future[str]"], List[Tuple[int, int]]]:
141149
pass
142150

143151
def _cleanup(self, jobid):

cluster_tools/cluster_tools/schedulers/kube.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
import re
55
import sys
66
from pathlib import Path
7-
from typing import List, Optional, Union
7+
from typing import List, Optional, Tuple
88
from uuid import uuid4
99

1010
import kubernetes
1111
import kubernetes.client.models as kubernetes_models
12+
from typing_extensions import Literal
1213

1314
from .cluster_executor import ClusterExecutor
1415

@@ -19,9 +20,9 @@ def volume_name_from_path(path: Path) -> str:
1920

2021
def deduplicate_mounts(mounts: List[Path]) -> List[Path]:
2122
output = []
22-
mounts = set(mounts)
23-
for mount in mounts:
24-
if not any(m in mount.parents for m in mounts):
23+
unique_mounts = set(mounts)
24+
for mount in unique_mounts:
25+
if not any(m in mount.parents for m in unique_mounts):
2526
output.append(mount)
2627
return output
2728

@@ -73,7 +74,7 @@ def get_current_job_id() -> Optional[str]:
7374
return os.environ.get("JOB_ID", None)
7475

7576
@classmethod
76-
def get_job_id_string(cls) -> str:
77+
def get_job_id_string(cls) -> Optional[str]:
7778
job_id = cls.get_current_job_id()
7879
job_index = cls.get_job_array_index()
7980
if job_index is None:
@@ -104,21 +105,21 @@ def inner_submit(
104105
self,
105106
cmdline: str,
106107
job_name: Optional[str] = None,
108+
additional_setup_lines: Optional[List[str]] = None,
107109
job_count: Optional[int] = None,
108-
**_,
109-
):
110+
) -> Tuple[List["concurrent.futures.Future[str]"], List[Tuple[int, int]]]:
110111
"""Starts a Kubernetes pod that runs the specified shell command line."""
111112

112113
kubernetes_client = KubernetesClient()
113114
self.ensure_kubernetes_namespace()
114115
job_id = str(uuid4())
115116

116-
job_id_future = concurrent.futures.Future()
117+
job_id_future: "concurrent.futures.Future[str]" = concurrent.futures.Future()
117118
job_id_future.set_result(job_id)
118119
job_id_futures = [job_id_future]
119120

120121
is_array_job = job_count is not None
121-
number_of_subjobs = job_count if is_array_job else 1
122+
number_of_subjobs = job_count if job_count is not None else 1
122123
ranges = [(0, number_of_subjobs)]
123124

124125
requested_resources = {
@@ -232,12 +233,12 @@ def inner_submit(
232233

233234
def check_for_crashed_job(
234235
self, job_id_with_index: str
235-
) -> Union["failed", "ignore", "completed"]:
236+
) -> Literal["failed", "ignore", "completed"]:
236237
kubernetes_client = KubernetesClient()
237238
[job_id, job_index] = (
238239
job_id_with_index.split("_")
239240
if "_" in job_id_with_index
240-
else [job_id_with_index, 0]
241+
else [job_id_with_index, "0"]
241242
)
242243
resp = kubernetes_client.core.list_namespaced_pod(
243244
namespace=self.job_resources["namespace"],

cluster_tools/cluster_tools/schedulers/pbs.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
import os
55
import re
66
from concurrent import futures
7-
from typing import Union
7+
from typing import Dict, List, Optional, Tuple
8+
9+
from typing_extensions import Literal
810

911
from cluster_tools.util import call, chcall, random_string
1012

1113
from .cluster_executor import ClusterExecutor
1214

1315
# qstat vs. checkjob
14-
PBS_STATES = {
16+
PBS_STATES: Dict[str, List[str]] = {
1517
"Failure": [],
1618
"Success": [
1719
"C", # Completed
@@ -71,8 +73,12 @@ def submit_text(self, job):
7173
return int(jobid)
7274

7375
def inner_submit(
74-
self, cmdline, job_name=None, additional_setup_lines=None, job_count=None
75-
):
76+
self,
77+
cmdline: str,
78+
job_name: Optional[str] = None,
79+
additional_setup_lines: Optional[List[str]] = None,
80+
job_count: Optional[int] = None,
81+
) -> Tuple[List["futures.Future[str]"], List[Tuple[int, int]]]:
7682
"""Starts a PBS job that runs the specified shell command line."""
7783
if additional_setup_lines is None:
7884
additional_setup_lines = []
@@ -117,14 +123,14 @@ def inner_submit(
117123
]
118124

119125
job_id = self.submit_text("\n".join(script_lines))
120-
job_id_future = futures.Future()
126+
job_id_future: "futures.Future[str]" = futures.Future()
121127
job_id_future.set_result(job_id)
122128

123129
return [job_id_future], [(0, job_count or 1)]
124130

125131
def check_for_crashed_job(
126132
self, job_id_with_index
127-
) -> Union["failed", "ignore", "completed"]:
133+
) -> Literal["failed", "ignore", "completed"]:
128134
if len(str(job_id_with_index).split("_")) >= 2:
129135
a, b = job_id_with_index.split("_")
130136
job_id_with_index = f"{a}[{b}]"

cluster_tools/cluster_tools/schedulers/slurm.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import sys
88
import threading
99
from functools import lru_cache
10-
from typing import Union
10+
from typing import List, Optional, Tuple
11+
12+
from typing_extensions import Literal
1113

1214
from cluster_tools.util import call, chcall, random_string
1315

@@ -190,8 +192,12 @@ def cleanup_submit_threads(self):
190192
]
191193

192194
def inner_submit(
193-
self, cmdline, job_name=None, additional_setup_lines=None, job_count=None
194-
):
195+
self,
196+
cmdline: str,
197+
job_name: Optional[str] = None,
198+
additional_setup_lines: Optional[List[str]] = None,
199+
job_count: Optional[int] = None,
200+
) -> Tuple[List["concurrent.futures.Future[str]"], List[Tuple[int, int]]]:
195201
"""Starts a Slurm job that runs the specified shell command line."""
196202
if additional_setup_lines is None:
197203
additional_setup_lines = []
@@ -213,7 +219,7 @@ def inner_submit(
213219
batch_size = max(min(max_array_size, max_submit_jobs), 1)
214220

215221
scripts = []
216-
job_id_futures = []
222+
job_id_futures: List["concurrent.futures.Future[str]"] = []
217223
ranges = []
218224
number_of_jobs = job_count if job_count is not None else 1
219225
for job_index_start in range(0, number_of_jobs, batch_size):
@@ -256,7 +262,7 @@ def inner_submit(
256262

257263
def check_for_crashed_job(
258264
self, job_id_with_index
259-
) -> Union["failed", "ignore", "completed"]:
265+
) -> Literal["failed", "ignore", "completed"]:
260266

261267
job_states = []
262268

cluster_tools/cluster_tools/tailf.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
import os
66
import sys
77
import time
8+
from typing import Any, Callable
89

910

1011
class Tail(object):
1112
"""Represents a tail command."""
1213

13-
def __init__(self, tailed_file, callback=sys.stdout.write):
14+
def __init__(
15+
self, tailed_file: str, callback: Callable[[str], Any] = sys.stdout.write
16+
) -> None:
1417
"""Initiate a Tail instance.
1518
Check for file validity, assigns callback function to standard out.
1619
@@ -21,7 +24,7 @@ def __init__(self, tailed_file, callback=sys.stdout.write):
2124
self.callback = callback
2225
self.is_cancelled = False
2326

24-
def follow(self, seconds=1):
27+
def follow(self, seconds: int = 1) -> None:
2528
"""Do a tail follow. If a callback function is registered it is called with every new line.
2629
Else printed to standard out.
2730
@@ -44,14 +47,14 @@ def follow(self, seconds=1):
4447
else:
4548
self.callback(line)
4649

47-
def cancel(self):
50+
def cancel(self) -> None:
4851
self.is_cancelled = True
4952

50-
def register_callback(self, func):
53+
def register_callback(self, func: Callable[[str], Any]) -> None:
5154
"""Overrides default callback function to provided function."""
5255
self.callback = func
5356

54-
def check_file_validity(self, file_):
57+
def check_file_validity(self, file_: str) -> None:
5558
"""Check whether the a given file exists, readable and is a file"""
5659
if not os.access(file_, os.F_OK):
5760
raise TailError("File '%s' does not exist" % (file_))

0 commit comments

Comments
 (0)