Skip to content

Commit de33596

Browse files
committed
issue #147, pool: do not block when trying to terminate tasks
In order to stop cancelled or timed-out tasks, the Pool Manager loop needs to acquire the channel lock to prevent any worker from accessing either the lock or the pipe during its termination. The previous logic would try to acquire the lock indefinitely. This could lead to deadlocks when the Pool Manager was trying to stop cancelled tasks with the results channel filled up while the pool was being terminated. This was due to the workers holding the channel lock while being unable to push the data, as the Message Manager loop was no longer reading it. The new Pool Manager loop does not try to acquire the lock indefinitely but rather polls it for availability. This allows the loop to continue assessing its state and correctly terminate upon request. The drawback of this implementation is that timed-out or cancelled tasks might be stopped a little later when the pipe is busy transferring lots of data.
1 parent 760ba95 commit de33596

File tree

2 files changed

+58
-38
lines changed

2 files changed

+58
-38
lines changed

pebble/pool/channel.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,16 @@ def windows_send(obj: Any):
114114

115115
return unix_send if os.name != 'nt' else windows_send
116116

117-
@property
118117
@contextmanager
119-
def lock(self):
120-
with self.mutex:
121-
yield self
118+
def lock(self, block: bool = True, timeout: int = None) -> bool:
119+
"""Lock the channel, yields True if channel is locked."""
120+
acquired = self.mutex.acquire(block=block, timeout=timeout)
121+
122+
try:
123+
yield acquired
124+
finally:
125+
if acquired:
126+
self.mutex.release()
122127

123128
def initialize(self):
124129
"""Close unused connections."""
@@ -151,32 +156,45 @@ def __exit__(self, *_):
151156
self.release()
152157

153158
def _make_acquire_method(self) -> Callable:
154-
def unix_acquire() -> bool:
155-
return (
156-
self.reader_mutex.acquire(timeout=CONSTS.channel_lock_timeout)
157-
and
158-
self.writer_mutex.acquire(timeout=CONSTS.channel_lock_timeout)
159-
)
159+
def unix_acquire(
160+
block: bool = True, timeout: int = CONSTS.channel_lock_timeout
161+
) -> bool:
162+
"""Acquire both locks. Returns True if both locks where acquired.
163+
Otherwise, handle the locks state.
164+
165+
"""
166+
if self.reader_mutex.acquire(block=block, timeout=timeout):
167+
if self.writer_mutex.acquire(block=block, timeout=timeout):
168+
return True
169+
170+
self.reader_mutex.release()
171+
172+
return False
160173

161-
def windows_acquire() -> bool:
162-
return self.reader_mutex.acquire(
163-
timeout=CONSTS.channel_lock_timeout)
174+
def windows_acquire(
175+
block: bool = True, timeout: int = CONSTS.channel_lock_timeout
176+
) -> bool:
177+
"""Acquire the reader lock (on NT OS, writes are atomic)."""
178+
return self.reader_mutex.acquire(block=block, timeout=timeout)
164179

165-
return unix_acquire if os.name != 'nt' else windows_acquire
180+
return windows_acquire if os.name == 'nt' else unix_acquire
166181

167182
def _make_release_method(self) -> Callable:
168183
def unix_release():
184+
"""Release both the locks."""
169185
self.reader_mutex.release()
170186
self.writer_mutex.release()
171187

172188
def windows_release():
189+
"""Release the reader lock (on NT OS, writes are atomic)."""
173190
self.reader_mutex.release()
174191

175-
return unix_release if os.name != 'nt' else windows_release
192+
return windows_release if os.name == 'nt' else unix_release
176193

177194
@property
178195
@contextmanager
179196
def reader(self):
197+
"""Reader lock context manager."""
180198
if self.reader_mutex.acquire(timeout=CONSTS.channel_lock_timeout):
181199
try:
182200
yield self
@@ -188,6 +206,7 @@ def reader(self):
188206
@property
189207
@contextmanager
190208
def writer(self):
209+
"""Writer lock context manager."""
191210
if self.writer_mutex.acquire(timeout=CONSTS.channel_lock_timeout):
192211
try:
193212
yield self

pebble/pool/process.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ def start(self):
210210

211211
def stop(self):
212212
self.worker_manager.close_channels()
213-
self.worker_manager.stop_workers()
213+
self.worker_manager.force_stop_workers()
214214

215215
def schedule(self, task: Task):
216216
"""Schedules a new Task in the PoolManager."""
@@ -238,16 +238,16 @@ def update_status(self):
238238
def update_tasks(self):
239239
"""Handles timing out Tasks."""
240240
for task in self.task_manager.timeout_tasks():
241-
self.worker_manager.stop_worker(task.worker_id)
242-
self.task_manager.task_done(
243-
task.id,
244-
Result(ResultStatus.FAILURE,
245-
TimeoutError("Task timeout", task.timeout)))
241+
if self.worker_manager.maybe_stop_worker(task.worker_id):
242+
self.task_manager.task_done(
243+
task.id,
244+
Result(ResultStatus.FAILURE,
245+
TimeoutError("Task timeout", task.timeout)))
246246

247247
for task in self.task_manager.cancelled_tasks():
248-
self.worker_manager.stop_worker(task.worker_id)
249-
self.task_manager.task_done(
250-
task.id, Result(ResultStatus.FAILURE, CancelledError()))
248+
if self.worker_manager.maybe_stop_worker(task.worker_id):
249+
self.task_manager.task_done(
250+
task.id, Result(ResultStatus.FAILURE, CancelledError()))
251251

252252
def update_workers(self):
253253
"""Handles unexpected processes termination."""
@@ -390,9 +390,9 @@ def close_channels(self):
390390
self.pool_channel.close()
391391
self.workers_channel.close()
392392

393-
def stop_workers(self):
393+
def force_stop_workers(self):
394394
for worker_id in tuple(self.workers.keys()):
395-
self.stop_worker(worker_id, force=True)
395+
stop_process(self.workers.pop(worker_id))
396396

397397
def new_worker(self):
398398
try:
@@ -403,17 +403,18 @@ def new_worker(self):
403403
except OSError as error:
404404
raise BrokenProcessPool from error
405405

406-
def stop_worker(self, worker_id: int, force=False):
407-
try:
408-
if force:
409-
stop_process(self.workers.pop(worker_id))
410-
else:
411-
with self.workers_channel.lock:
412-
stop_process(self.workers.pop(worker_id))
413-
except ChannelError as error:
414-
raise BrokenProcessPool from error
415-
except KeyError:
416-
return # worker already expired
406+
def maybe_stop_worker(self, worker_id: int) -> bool:
407+
"""Try to stop the assigned worker.
408+
Returns True if the worker existed and could be stopped.
409+
410+
"""
411+
with self.workers_channel.lock(block=False) as locked:
412+
if locked:
413+
worker = self.workers.pop(worker_id, None)
414+
if worker is not None: # Worker have already ended
415+
stop_process(worker)
416+
417+
return locked
417418

418419

419420
def worker_process(params: Worker, channel: WorkerChannel):
@@ -465,7 +466,7 @@ def fetch_task(channel: WorkerChannel) -> Task:
465466

466467
def task_transaction(channel: WorkerChannel) -> Task:
467468
"""Ensures a task is fetched and acknowledged atomically."""
468-
with channel.lock:
469+
with channel.lock():
469470
if channel.poll(0):
470471
task = channel.recv()
471472
channel.send(Acknowledgement(os.getpid(), task.id))

0 commit comments

Comments
 (0)