Skip to content

Commit ba66bbb

Browse files
authored
Merge pull request #383 from ExaWorks/free_lock_early
Don't hold the reaper lock while checking processes, since it can…
2 parents 275a84f + bc11359 commit ba66bbb

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

src/psij/executors/local.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -136,20 +136,24 @@ def register(self, entry: _ProcessEntry) -> None:
136136

137137
def run(self) -> None:
    """Poll the registered processes in an endless loop.

    Each pass briefly takes ``self._lock`` to drop the entries that finished
    during the previous pass and to snapshot ``self._jobs``. The snapshot is
    then checked *without* holding the lock, so the lock is not held while
    processes are being polled. Between passes the thread waits on
    ``self._cvar`` for at most ``_REAPER_SLEEP_TIME``; a ``notify_all()`` on
    that condition (see ``_handle_sigchld``) ends the wait early.
    """
    logger.debug('Started {}'.format(self))
    done: List[_ProcessEntry] = []
    while True:
        with self._lock:
            # Remove entries reported as finished by the previous pass, then
            # take a private copy so polling can proceed without the lock.
            for entry in done:
                del self._jobs[entry.job]
            jobs = dict(self._jobs)
        # Forget the entries just removed. If the poll below raises, `done`
        # would otherwise keep its old value and the next pass would try to
        # delete the same keys again, raising KeyError and killing this thread.
        done = []
        try:
            done = self._check_processes(jobs)
        except Exception:
            # logger.exception records the traceback; passing the exception
            # as a positional format argument without a placeholder makes the
            # logging module report a formatting error instead of the message.
            logger.exception('Error polling for process status')
        with self._cvar:
            self._cvar.wait(_REAPER_SLEEP_TIME)
147151

148152
def _handle_sigchld(self) -> None:
149153
with self._cvar:
150154
try:
151155
self._cvar.notify_all()
152-
except RuntimeError as ex:
156+
except RuntimeError:
153157
# In what looks like rare cases, notify_all(), seemingly when combined with
154158
# signal handling, raises `RuntimeError: release unlocked lock`.
155159
# There appears to be an unresolved Python bug about this:
@@ -160,11 +164,12 @@ def _handle_sigchld(self) -> None:
160164
# small delay in processing a completed job. However, since this exception seems
161165
# to be a logical impossibility when looking at the code in threading.Condition,
162166
# there is really no telling what else could go wrong.
163-
logger.warning('Exception in Condition.notify_all()', ex)
167+
logger.debug('Exception in Condition.notify_all()')
164168

165-
def _check_processes(self) -> None:
169+
def _check_processes(self, jobs: Dict[Job, _ProcessEntry]) -> List[_ProcessEntry]:
166170
done: List[_ProcessEntry] = []
167-
for entry in self._jobs.values():
171+
172+
for entry in jobs.values():
168173
if entry.kill_flag:
169174
entry.kill()
170175

@@ -174,10 +179,12 @@ def _check_processes(self) -> None:
174179
entry.done_time = time.time()
175180
entry.out = out
176181
done.append(entry)
182+
177183
for entry in done:
178-
del self._jobs[entry.job]
179184
entry.executor._process_done(entry)
180185

186+
return done
187+
181188
def cancel(self, job: Job) -> None:
182189
with self._lock:
183190
p = self._jobs[job]

0 commit comments

Comments
 (0)