Commit 1cf6f50
Also store success boolean to pickle output when using multiprocessing (#686)
* also store success boolean to pickle output when using multiprocessing instead of slurm
* update changelog
* merge print and logging instructions; convert print to logging in other places, too
* add logging import
* fix return value propagation in multiprocessing context with serialization of result
* format
1 parent a081c56 commit 1cf6f50

4 files changed (+17 −10 lines)

cluster_tools/Changelog.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section
 [Commits](https://github.com/scalableminds/webknossos-libs/compare/v0.9.17...HEAD)
 
 ### Breaking Changes
+- The cluster-tools serialize the output of a job in the format `(wasSuccessful, result_value)` to a pickle file if `output_pickle_path` is provided and multiprocessing is used. This is consistent with how it is already done when using a cluster executor (e.g., slurm). [#686](https://github.com/scalableminds/webknossos-libs/pull/686)
 
 ### Added
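
For consumers, this changes what a file written to `output_pickle_path` contains under multiprocessing: unpickling now yields a `(wasSuccessful, result_value)` tuple instead of the bare return value. A minimal reading sketch, using the standard `pickle` module (the library itself uses its internal `pickling` wrapper; the file name below is hypothetical):

```python
import pickle

# Hypothetical file name; in practice this is whatever was passed
# as output_pickle_path.
with open("job_0.pickle", "rb") as f:
    success, value = pickle.load(f)  # (wasSuccessful, result_value)

if success:
    print("Job returned:", value)
else:
    # On failure, value holds the raised exception (multiprocessing)
    # or a formatted remote traceback (cluster executors, e.g. slurm).
    print("Job failed:", value)
```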

cluster_tools/cluster_tools/__init__.py

Lines changed: 10 additions & 2 deletions
@@ -1,3 +1,4 @@
+import logging
 import multiprocessing
 import os
 import tempfile
@@ -161,12 +162,19 @@ def _execute_and_persist_function(output_pickle_path, *args, **kwargs):
         func = args[0]
         args = args[1:]
 
-        result = func(*args, **kwargs)
+        try:
+            result = True, func(*args, **kwargs)
+        except Exception as exc:
+            result = False, exc
+            logging.warning(f"Job computation failed with:\n{exc.__repr__()}")
 
         with open(output_pickle_path, "wb") as file:
             pickling.dump(result, file)
 
-        return result
+        if result[0]:
+            return result[1]
+        else:
+            raise result[1]
 
     def map_unordered(self, func, args):
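
The pattern above (persist first, then re-raise) ensures the pickle file is written even for failing jobs, while the submitting future still surfaces the exception. A self-contained sketch of the same control flow, with plain `pickle` standing in for the internal `pickling` module:

```python
import logging
import pickle


def execute_and_persist(output_pickle_path, func, *args, **kwargs):
    # Record success or failure instead of letting the exception
    # escape before the result reaches disk.
    try:
        result = True, func(*args, **kwargs)
    except Exception as exc:
        result = False, exc
        logging.warning(f"Job computation failed with:\n{exc!r}")

    # The (wasSuccessful, result_value) tuple is always written.
    with open(output_pickle_path, "wb") as file:
        pickle.dump(result, file)

    # Only after persisting does the failure propagate to the caller.
    if result[0]:
        return result[1]
    raise result[1]
```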

cluster_tools/cluster_tools/remote.py

Lines changed: 2 additions & 4 deletions
@@ -47,8 +47,7 @@ def worker(executor, workerid, job_array_index, job_array_index_offset, cfut_dir
 
     try:
         input_file_name = executor.format_infile_name(cfut_dir, workerid_with_idx)
-        print("trying to read: ", input_file_name)
-        print("working dir: ", os.getcwd())
+        logging.debug(f"Trying to read: {input_file_name} (working dir: {os.getcwd()})")
 
         custom_main_path = get_custom_main_path(workerid, executor)
         with open(input_file_name, "rb") as f:
@@ -78,10 +77,9 @@ def worker(executor, workerid, job_array_index, job_array_index_offset, cfut_dir
         out = pickling.dumps(result)
 
     except Exception:
-        print(traceback.format_exc())
 
         result = False, format_remote_exc()
-        logging.warning("Job computation failed.")
+        logging.warning(f"Job computation failed with:\n\n{traceback.format_exc()}")
        out = pickling.dumps(result)
 
     # The .preliminary postfix is added since the output can
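
Since the worker now reports through `logging.debug` and `logging.warning` rather than `print`, these messages only show up if the logging level is configured accordingly, e.g.:

```python
import logging

# Emit debug-level records such as "Trying to read: ..." to stderr.
logging.basicConfig(level=logging.DEBUG)
```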

cluster_tools/cluster_tools/schedulers/cluster_executor.py

Lines changed: 4 additions & 4 deletions
@@ -103,7 +103,7 @@ def executor_key(cls):
     def handle_kill(self, _signum, _frame):
         self.wait_thread.stop()
         job_ids = ",".join(str(id) for id in self.jobs.keys())
-        print(
+        logging.debug(
             "A termination signal was registered. The following jobs are still running on the cluster:\n{}".format(
                 job_ids
             )
@@ -204,7 +204,7 @@ def _completion(self, jobid, failed_early):
         if not self.jobs:
             self.jobs_empty_cond.notify_all()
         if self.debug:
-            print("job completed: {}".format(jobid), file=sys.stderr)
+            logging.debug("Job completed: {}".format(jobid))
 
         preliminary_outfile_name = with_preliminary_postfix(outfile_name)
         if failed_early:
@@ -298,7 +298,7 @@ def submit(self, fun, *args, **kwargs):
         jobid = jobids_futures[0].result()
 
         if self.debug:
-            print(f"job submitted: {jobid}", file=sys.stderr)
+            logging.debug(f"Job submitted: {jobid}")
 
         # Thread will wait for it to finish.
         self.wait_thread.waitFor(preliminary_output_pickle_path, jobid)
@@ -420,7 +420,7 @@ def register_jobs(
         jobid = jobid_future.result()
         if self.debug:
 
-            print(
+            logging.debug(
                 "Submitted array job {} with JobId {} and {} subjobs.".format(
                     batch_description, jobid, len(futs_with_output_paths)
                 ),
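
From the caller's perspective, the net effect of the commit is that a failing job behaves the same under multiprocessing as under a cluster executor: the pickled output records the failure and the future re-raises the exception. A hedged usage sketch, assuming the internal `__cfut_options` kwarg is the way to request a pickled copy of the result:

```python
import cluster_tools


def fails():
    raise ValueError("boom")


if __name__ == "__main__":
    with cluster_tools.get_executor("multiprocessing") as executor:
        # Assumption: __cfut_options carries the output_pickle_path.
        future = executor.submit(
            fails, __cfut_options={"output_pickle_path": "out.pickle"}
        )
        try:
            future.result()
        except ValueError as exc:
            # out.pickle now contains (False, ValueError("boom")).
            print("Propagated from the job:", exc)
```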
