Skip to content

Commit 34b4d0e

Browse files
committed
added backup exitcode value for Threads.
1 parent 7a12265 commit 34b4d0e

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

ezpq/Job.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(self, function, args=None, kwargs=None, name=None, priority=100, la
4747
self._ended = None
4848
self._processed = None
4949
self._output = None
50+
self._exitcode = None
5051
self._exception = None
5152
self._callback = None
5253

@@ -75,6 +76,8 @@ def get_exitcode(self):
7576
'''Returns the exit code of the inner job. Only works for processes, not threads.'''
7677
if self._inner_job is not None and hasattr(self._inner_job, 'exitcode'):
7778
return self._inner_job.exitcode
79+
else:
80+
return self._exitcode
7881
return None
7982

8083
def _terminate(self):

ezpq/Queue.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,28 +43,29 @@ def stop(self):
4343

4444

4545
def __init__(self,
46-
job_runner = mp.Process,
4746
n_workers = mp.cpu_count(),
4847
max_size = 0,
48+
job_runner = mp.Process,
4949
auto_remove = False,
5050
auto_start = True,
5151
auto_stop = False,
5252
callback = None,
5353
log_file = None,
5454
poll = 0.1,
55+
show_progress = False,
5556
qid = None):
5657
"""Implements a parallel queueing system.
5758
5859
Args:
59-
job_runner: the class to use to invoke new jobs.
60-
- Accepts: multiprocessing.Process, threading.Thread
61-
- Default: multiprocessing.Process
6260
n_workers: the max number of concurrent jobs.
6361
- Accepts: int
6462
- Default: cpu_count()
6563
max_size: when > 0, will throw an exception the number of enqueued jobs exceeds this value. Otherwise, no limit.
6664
- Accepts: int
6765
- Default: 0 (unlimited)
66+
job_runner: the class to use to invoke new jobs.
67+
- Accepts: multiprocessing.Process, threading.Thread
68+
- Default: multiprocessing.Process
6869
auto_remove: controls whether jobs are discarded of after completion.
6970
- Accepts: bool
7071
- Default: False
@@ -268,10 +269,12 @@ def _pulse(self):
268269
job._ended = job_data['_ended']
269270
job._output = job_data['_output']
270271
job._exception = job_data['_exception']
272+
job._exitcode = job_data['_exitcode']
271273
except KeyError as ex:
272274
job._ended = time.time()
273275
job._output = None
274276
job._exception = Exception('No data for job; it may have exited unexpectedly.')
277+
job._exitcode = 1
275278

276279
if self._callback is not None:
277280
try:
@@ -471,13 +474,15 @@ def _job_wrap(_job, _output, *args, **kwargs):
471474
'''Used internally to wrap a job, capture output and any exception.'''
472475
out = None
473476
err = None
477+
code = 0
474478

475479
try:
476480
out = _job.function(*args, **kwargs)
477481
except Exception as ex:
478482
err = ex
483+
code = -1
479484

480-
_output.update({ _job._id: {'_ended':time.time(), '_output':out, '_exception': err} })
485+
_output.update({ _job._id: {'_ended':time.time(), '_output':out, '_exception': err, '_exitcode': code} })
481486

482487
if err is not None and not _job._suppress_errors:
483488
raise err

0 commit comments

Comments
 (0)