Skip to content

Commit b189788

Browse files
committed
Fix shutdown
1 parent 360c8fe commit b189788

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

pulsar/client/manager.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ def __init__(self, relay_url: str, relay_username: str, relay_password: str, rel
277277
self.callback_lock = threading.Lock()
278278
self.callback_thread = None
279279
self.active = True
280+
self.shutdown_event = threading.Event()
280281

281282
def callback_wrapper(self, callback, message_data):
282283
"""Process status update messages from the relay."""
@@ -315,12 +316,14 @@ def status_consumer(self, callback_wrapper):
315316
if self.active:
316317
log.exception("Exception while polling for status updates from relay, will retry.")
317318
# Brief sleep before retrying to avoid tight loop on persistent errors
318-
time.sleep(5)
319+
# Use wait() instead of sleep() to allow immediate interruption on shutdown
320+
if self.shutdown_event.wait(timeout=5):
321+
break
319322
else:
320323
log.debug("Exception during shutdown, ignoring.")
321324
break
322325

323-
log.debug("Leaving Pulsar client relay status consumer, no additional updates will be processed.")
326+
log.info("Done consuming relay status updates for topic %s", topic)
324327

325328
def ensure_has_status_update_callback(self, callback):
326329
"""Start a thread to poll for status updates if not already running."""
@@ -334,7 +337,10 @@ def ensure_has_status_update_callback(self, callback):
334337
name="pulsar_client_%s_relay_status_consumer" % self.manager_name,
335338
target=run
336339
)
337-
thread.daemon = False # Don't interrupt processing
340+
# Make daemon so Python can exit even if thread is blocked in HTTP request.
341+
# Unlike MessageQueueClientManager which uses AMQP connections that can be
342+
# interrupted cleanly, HTTP long-poll requests block until timeout.
343+
thread.daemon = True
338344
thread.start()
339345
self.callback_thread = thread
340346

@@ -370,6 +376,8 @@ def _make_topic_name(self, base_topic: str, manager_name: str) -> str:
370376
def shutdown(self, ensure_cleanup: bool = False):
371377
"""Shutdown the client manager and cleanup resources."""
372378
self.active = False
379+
# Signal the shutdown event to interrupt any waiting threads
380+
self.shutdown_event.set()
373381
if ensure_cleanup:
374382
if self.callback_thread is not None:
375383
self.callback_thread.join()

0 commit comments

Comments
 (0)