Skip to content

Commit ed31a3d

Browse files
glin93 and CaralHsi authored
Fix/rabbitmq publish cache (#720)
* Handle RabbitMQ publish when offline and avoid duplicate init
* Apply ruff check/format
* Fix RabbitMQ publish cache deadlock

---------

Co-authored-by: [email protected] <>
Co-authored-by: CaralHsi <[email protected]>
1 parent 801a5d7 commit ed31a3d

File tree

2 files changed

+85
-7
lines changed

2 files changed

+85
-7
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,9 @@ def _submit_web_logs(
844844
f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}"
845845
)
846846
if self.rabbitmq_config is None:
847+
logger.info(
848+
"[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish."
849+
)
847850
return
848851

849852
if isinstance(messages, ScheduleLogForWebItem):
@@ -859,9 +862,11 @@ def _submit_web_logs(
859862
message_info = message.debug_info()
860863
logger.debug(f"Submitted Scheduling log for web: {message_info}")
861864

862-
if self.is_rabbitmq_connected():
863-
logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}")
864-
self.rabbitmq_publish_message(message=message.to_dict())
865+
# Always call publish; the publisher now caches when offline and flushes after reconnect
866+
logger.info(
867+
f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}"
868+
)
869+
self.rabbitmq_publish_message(message=message.to_dict())
865870
logger.debug(
866871
f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}"
867872
)

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66

77
from pathlib import Path
8+
from queue import Empty
89

910
from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig
1011
from memos.context.context import ContextThread
@@ -44,6 +45,11 @@ def __init__(self):
4445
self.rabbitmq_message_cache = AutoDroppingQueue(
4546
maxsize=self.rabbitmq_message_cache_max_size
4647
)
48+
# Pending outgoing messages to avoid loss when connection is not ready
49+
self.rabbitmq_publish_cache_max_size = 50
50+
self.rabbitmq_publish_cache = AutoDroppingQueue(
51+
maxsize=self.rabbitmq_publish_cache_max_size
52+
)
4753
self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure
4854
self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries
4955
self.rabbitmq_heartbeat = 60 # Heartbeat interval (seconds) for connection
@@ -53,7 +59,9 @@ def __init__(self):
5359
# Thread management
5460
self._rabbitmq_io_loop_thread = None # For IOLoop execution
5561
self._rabbitmq_stop_flag = False # Graceful shutdown flag
56-
self._rabbitmq_lock = threading.Lock() # Ensure thread safety
62+
# Use RLock because publishing may trigger initialization, which also grabs the lock.
63+
self._rabbitmq_lock = threading.RLock()
64+
self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations
5765

5866
def is_rabbitmq_connected(self) -> bool:
5967
"""Check if RabbitMQ connection is alive"""
@@ -70,11 +78,22 @@ def initialize_rabbitmq(
7078
"""
7179
Establish connection to RabbitMQ using pika.
7280
"""
81+
with self._rabbitmq_lock:
82+
if self._rabbitmq_initializing:
83+
logger.info(
84+
"[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call."
85+
)
86+
return
87+
self._rabbitmq_initializing = True
7388
try:
7489
# Skip remote initialization in CI/pytest unless explicitly enabled
7590
enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true"
7691
in_ci = os.getenv("CI", "").lower() == "true"
7792
in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None
93+
logger.info(
94+
f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, "
95+
f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}"
96+
)
7897
if (in_ci or in_pytest) and not enable_env:
7998
logger.info(
8099
"Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable."
@@ -131,6 +150,9 @@ def initialize_rabbitmq(
131150
logger.info("RabbitMQ connection process started")
132151
except Exception:
133152
logger.error("Fail to initialize auth_config", exc_info=True)
153+
finally:
154+
with self._rabbitmq_lock:
155+
self._rabbitmq_initializing = False
134156

135157
def get_rabbitmq_queue_size(self) -> int:
136158
"""Get the current number of messages in the queue.
@@ -197,7 +219,7 @@ def get_rabbitmq_connection_param(self):
197219
# Connection lifecycle callbacks
198220
def on_rabbitmq_connection_open(self, connection):
199221
"""Called when connection is established."""
200-
logger.debug("Connection opened")
222+
logger.info("[DIAGNOSTIC] RabbitMQ connection opened")
201223
connection.channel(on_open_callback=self.on_rabbitmq_channel_open)
202224

203225
def on_rabbitmq_connection_error(self, connection, error):
@@ -215,7 +237,7 @@ def on_rabbitmq_connection_closed(self, connection, reason):
215237
def on_rabbitmq_channel_open(self, channel):
216238
"""Called when channel is ready."""
217239
self.rabbitmq_channel = channel
218-
logger.debug("Channel opened")
240+
logger.info("[DIAGNOSTIC] RabbitMQ channel opened")
219241

220242
# Setup exchange and queue
221243
channel.exchange_declare(
@@ -243,6 +265,8 @@ def on_rabbitmq_queue_declared(self, frame):
243265
def on_rabbitmq_bind_ok(self, frame):
244266
"""Final setup step when bind is complete."""
245267
logger.info("RabbitMQ setup completed")
268+
# Flush any cached publish messages now that connection is ready
269+
self._flush_cached_publish_messages()
246270

247271
def on_rabbitmq_message(self, channel, method, properties, body):
248272
"""Handle incoming messages. Only for test."""
@@ -311,8 +335,21 @@ def rabbitmq_publish_message(self, message: dict):
311335
logger.info(f" - Message Content: {json.dumps(message, indent=2)}")
312336

313337
with self._rabbitmq_lock:
338+
logger.info(
339+
f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. "
340+
f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, "
341+
f"routing_key='{routing_key}', label={label}"
342+
)
314343
if not self.is_rabbitmq_connected():
315-
logger.error("Cannot publish - no active connection")
344+
logger.error(
345+
"[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. "
346+
f"connection_exists={bool(self.rabbitmq_connection)}, "
347+
f"channel_exists={bool(self.rabbitmq_channel)}, "
348+
f"config_loaded={self.rabbitmq_config is not None}"
349+
)
350+
self.rabbitmq_publish_cache.put(message)
351+
# Best-effort to connect
352+
self.initialize_rabbitmq(config=self.rabbitmq_config)
316353
return False
317354

318355
logger.info(
@@ -332,6 +369,8 @@ def rabbitmq_publish_message(self, message: dict):
332369
return True
333370
except Exception as e:
334371
logger.error(f"Failed to publish message: {e}")
372+
# Cache message for retry on next connection
373+
self.rabbitmq_publish_cache.put(message)
335374
self.rabbit_reconnect()
336375
return False
337376

@@ -379,3 +418,37 @@ def rabbitmq_close(self):
379418
logger.warning("IOLoop thread did not terminate cleanly")
380419

381420
logger.info("RabbitMQ connection closed")
421+
422+
def _flush_cached_publish_messages(self):
423+
"""Flush cached outgoing messages once connection is available."""
424+
if self.rabbitmq_publish_cache.empty():
425+
return
426+
427+
if not self.is_rabbitmq_connected():
428+
logger.info(
429+
"[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; "
430+
f"pending={self.rabbitmq_publish_cache.qsize()}"
431+
)
432+
return
433+
434+
drained: list[dict] = []
435+
while True:
436+
try:
437+
drained.append(self.rabbitmq_publish_cache.get_nowait())
438+
except Empty:
439+
break
440+
441+
if not drained:
442+
return
443+
444+
logger.info(
445+
f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect."
446+
)
447+
for cached_msg in drained:
448+
success = self.rabbitmq_publish_message(cached_msg)
449+
if not success:
450+
# Message already re-cached inside publish; avoid tight loop
451+
logger.error(
452+
"[DIAGNOSTIC] Failed to flush cached message; re-queued for next attempt."
453+
)
454+
break

0 commit comments

Comments
 (0)