Skip to content

Commit 20b2001

Browse files
committed
Don't hold the reaper lock while checking processes, since it can
stall new jobs from being submitted. Instead, make a copy of the jobs dict and operate on that without holding the lock.
1 parent 275a84f commit 20b2001

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

src/psij/executors/local.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,12 @@ def run(self) -> None:
138138
logger.debug('Started {}'.format(self))
139139
while True:
140140
with self._lock:
141-
try:
142-
self._check_processes()
143-
except Exception as ex:
144-
logger.error('Error polling for process status', ex)
141+
jobs = dict(self._jobs)
142+
143+
try:
144+
self._check_processes(jobs)
145+
except Exception as ex:
146+
logger.error('Error polling for process status', ex)
145147
with self._cvar:
146148
self._cvar.wait(_REAPER_SLEEP_TIME)
147149

@@ -162,9 +164,10 @@ def _handle_sigchld(self) -> None:
162164
# there is really no telling what else could go wrong.
163165
logger.warning('Exception in Condition.notify_all()', ex)
164166

165-
def _check_processes(self) -> None:
167+
def _check_processes(self, jobs: Dict[Job, _ProcessEntry]) -> None:
166168
done: List[_ProcessEntry] = []
167-
for entry in self._jobs.values():
169+
170+
for entry in jobs.values():
168171
if entry.kill_flag:
169172
entry.kill()
170173

@@ -174,9 +177,14 @@ def _check_processes(self) -> None:
174177
entry.done_time = time.time()
175178
entry.out = out
176179
done.append(entry)
177-
for entry in done:
178-
del self._jobs[entry.job]
179-
entry.executor._process_done(entry)
180+
181+
if len(done) > 0:
182+
with self._lock:
183+
for entry in done:
184+
del self._jobs[entry.job]
185+
186+
for entry in done:
187+
entry.executor._process_done(entry)
180188

181189
def cancel(self, job: Job) -> None:
182190
with self._lock:

0 commit comments

Comments
 (0)