Skip to content

Commit 053f427

Browse files
committed
Fix shutdown
1 parent 360c8fe commit 053f427

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

pulsar/client/manager.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import functools
99
import threading
10-
import time
1110
from logging import getLogger
1211
from os import getenv
1312
from queue import Queue
@@ -277,6 +276,7 @@ def __init__(self, relay_url: str, relay_username: str, relay_password: str, rel
277276
self.callback_lock = threading.Lock()
278277
self.callback_thread = None
279278
self.active = True
279+
self.shutdown_event = threading.Event()
280280

281281
def callback_wrapper(self, callback, message_data):
282282
"""Process status update messages from the relay."""
@@ -315,12 +315,14 @@ def status_consumer(self, callback_wrapper):
315315
if self.active:
316316
log.exception("Exception while polling for status updates from relay, will retry.")
317317
# Brief sleep before retrying to avoid tight loop on persistent errors
318-
time.sleep(5)
318+
# Use wait() instead of sleep() to allow immediate interruption on shutdown
319+
if self.shutdown_event.wait(timeout=5):
320+
break
319321
else:
320322
log.debug("Exception during shutdown, ignoring.")
321323
break
322324

323-
log.debug("Leaving Pulsar client relay status consumer, no additional updates will be processed.")
325+
log.info("Done consuming relay status updates for topic %s", topic)
324326

325327
def ensure_has_status_update_callback(self, callback):
326328
"""Start a thread to poll for status updates if not already running."""
@@ -334,7 +336,10 @@ def ensure_has_status_update_callback(self, callback):
334336
name="pulsar_client_%s_relay_status_consumer" % self.manager_name,
335337
target=run
336338
)
337-
thread.daemon = False # Don't interrupt processing
339+
# Make daemon so Python can exit even if thread is blocked in HTTP request.
340+
# Unlike MessageQueueClientManager which uses AMQP connections that can be
341+
# interrupted cleanly, HTTP long-poll requests block until timeout.
342+
thread.daemon = True
338343
thread.start()
339344
self.callback_thread = thread
340345

@@ -370,6 +375,8 @@ def _make_topic_name(self, base_topic: str, manager_name: str) -> str:
370375
def shutdown(self, ensure_cleanup: bool = False):
371376
"""Shutdown the client manager and cleanup resources."""
372377
self.active = False
378+
# Signal the shutdown event to interrupt any waiting threads
379+
self.shutdown_event.set()
373380
if ensure_cleanup:
374381
if self.callback_thread is not None:
375382
self.callback_thread.join()

0 commit comments

Comments
 (0)