Skip to content

Commit 0263c5a

Browse files
glin93 and CaralHsi authored
Fix/rabbitmq publish cache (#713)
* Handle RabbitMQ publish when offline and avoid duplicate init
* Apply ruff check/format

---------

Co-authored-by: [email protected] <>
Co-authored-by: CaralHsi <[email protected]>
1 parent 1a2ef2f commit 0263c5a

File tree

2 files changed

+83
-6
lines changed

2 files changed

+83
-6
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,9 @@ def _submit_web_logs(
844844
f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}"
845845
)
846846
if self.rabbitmq_config is None:
847+
logger.info(
848+
"[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish."
849+
)
847850
return
848851

849852
if isinstance(messages, ScheduleLogForWebItem):
@@ -859,9 +862,11 @@ def _submit_web_logs(
859862
message_info = message.debug_info()
860863
logger.debug(f"Submitted Scheduling log for web: {message_info}")
861864

862-
if self.is_rabbitmq_connected():
863-
logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}")
864-
self.rabbitmq_publish_message(message=message.to_dict())
865+
# Always call publish; the publisher now caches when offline and flushes after reconnect
866+
logger.info(
867+
f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}"
868+
)
869+
self.rabbitmq_publish_message(message=message.to_dict())
865870
logger.debug(
866871
f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}"
867872
)

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66

77
from pathlib import Path
8+
from queue import Empty
89

910
from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig
1011
from memos.context.context import ContextThread
@@ -44,6 +45,11 @@ def __init__(self):
4445
self.rabbitmq_message_cache = AutoDroppingQueue(
4546
maxsize=self.rabbitmq_message_cache_max_size
4647
)
48+
# Pending outgoing messages to avoid loss when connection is not ready
49+
self.rabbitmq_publish_cache_max_size = 50
50+
self.rabbitmq_publish_cache = AutoDroppingQueue(
51+
maxsize=self.rabbitmq_publish_cache_max_size
52+
)
4753
self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure
4854
self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries
4955
self.rabbitmq_heartbeat = 60 # Heartbeat interval (seconds) for connection
@@ -54,6 +60,7 @@ def __init__(self):
5460
self._rabbitmq_io_loop_thread = None # For IOLoop execution
5561
self._rabbitmq_stop_flag = False # Graceful shutdown flag
5662
self._rabbitmq_lock = threading.Lock() # Ensure thread safety
63+
self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations
5764

5865
def is_rabbitmq_connected(self) -> bool:
5966
"""Check if RabbitMQ connection is alive"""
@@ -70,11 +77,22 @@ def initialize_rabbitmq(
7077
"""
7178
Establish connection to RabbitMQ using pika.
7279
"""
80+
with self._rabbitmq_lock:
81+
if self._rabbitmq_initializing:
82+
logger.info(
83+
"[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call."
84+
)
85+
return
86+
self._rabbitmq_initializing = True
7387
try:
7488
# Skip remote initialization in CI/pytest unless explicitly enabled
7589
enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true"
7690
in_ci = os.getenv("CI", "").lower() == "true"
7791
in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None
92+
logger.info(
93+
f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, "
94+
f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}"
95+
)
7896
if (in_ci or in_pytest) and not enable_env:
7997
logger.info(
8098
"Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable."
@@ -131,6 +149,9 @@ def initialize_rabbitmq(
131149
logger.info("RabbitMQ connection process started")
132150
except Exception:
133151
logger.error("Fail to initialize auth_config", exc_info=True)
152+
finally:
153+
with self._rabbitmq_lock:
154+
self._rabbitmq_initializing = False
134155

135156
def get_rabbitmq_queue_size(self) -> int:
136157
"""Get the current number of messages in the queue.
@@ -197,7 +218,7 @@ def get_rabbitmq_connection_param(self):
197218
# Connection lifecycle callbacks
198219
def on_rabbitmq_connection_open(self, connection):
199220
"""Called when connection is established."""
200-
logger.debug("Connection opened")
221+
logger.info("[DIAGNOSTIC] RabbitMQ connection opened")
201222
connection.channel(on_open_callback=self.on_rabbitmq_channel_open)
202223

203224
def on_rabbitmq_connection_error(self, connection, error):
@@ -215,7 +236,7 @@ def on_rabbitmq_connection_closed(self, connection, reason):
215236
def on_rabbitmq_channel_open(self, channel):
216237
"""Called when channel is ready."""
217238
self.rabbitmq_channel = channel
218-
logger.debug("Channel opened")
239+
logger.info("[DIAGNOSTIC] RabbitMQ channel opened")
219240

220241
# Setup exchange and queue
221242
channel.exchange_declare(
@@ -243,6 +264,8 @@ def on_rabbitmq_queue_declared(self, frame):
243264
def on_rabbitmq_bind_ok(self, frame):
244265
"""Final setup step when bind is complete."""
245266
logger.info("RabbitMQ setup completed")
267+
# Flush any cached publish messages now that connection is ready
268+
self._flush_cached_publish_messages()
246269

247270
def on_rabbitmq_message(self, channel, method, properties, body):
248271
"""Handle incoming messages. Only for test."""
@@ -311,8 +334,21 @@ def rabbitmq_publish_message(self, message: dict):
311334
logger.info(f" - Message Content: {json.dumps(message, indent=2)}")
312335

313336
with self._rabbitmq_lock:
337+
logger.info(
338+
f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. "
339+
f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, "
340+
f"routing_key='{routing_key}', label={label}"
341+
)
314342
if not self.is_rabbitmq_connected():
315-
logger.error("Cannot publish - no active connection")
343+
logger.error(
344+
"[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. "
345+
f"connection_exists={bool(self.rabbitmq_connection)}, "
346+
f"channel_exists={bool(self.rabbitmq_channel)}, "
347+
f"config_loaded={self.rabbitmq_config is not None}"
348+
)
349+
self.rabbitmq_publish_cache.put(message)
350+
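# Best-effort to connect. NOTE(review): initialize_rabbitmq() acquires self._rabbitmq_lock, a non-reentrant threading.Lock; if this call runs inside the `with self._rabbitmq_lock:` block above it will deadlock - confirm and move the call outside the lock.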
351+
self.initialize_rabbitmq(config=self.rabbitmq_config)
316352
return False
317353

318354
logger.info(
@@ -332,6 +368,8 @@ def rabbitmq_publish_message(self, message: dict):
332368
return True
333369
except Exception as e:
334370
logger.error(f"Failed to publish message: {e}")
371+
# Cache message for retry on next connection
372+
self.rabbitmq_publish_cache.put(message)
335373
self.rabbit_reconnect()
336374
return False
337375

@@ -379,3 +417,37 @@ def rabbitmq_close(self):
379417
logger.warning("IOLoop thread did not terminate cleanly")
380418

381419
logger.info("RabbitMQ connection closed")
420+
421+
def _flush_cached_publish_messages(self):
    """Publish the messages held in ``rabbitmq_publish_cache``, oldest first.

    Does nothing when the cache is empty or when ``is_rabbitmq_connected()``
    reports no connection. Otherwise the cache is drained into a local list
    and each message is sent through ``rabbitmq_publish_message``.

    Flushing stops at the first message whose publish does not succeed.
    Every drained message that has not been attempted yet is put back into
    the cache, so a mid-flush failure does not lose the rest of the backlog.
    """
    if self.rabbitmq_publish_cache.empty():
        return

    if not self.is_rabbitmq_connected():
        logger.info(
            "[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; "
            f"pending={self.rabbitmq_publish_cache.qsize()}"
        )
        return

    # Drain first so that messages re-cached by a failing publish are not
    # picked up again within this same flush.
    drained: list[dict] = []
    while True:
        try:
            drained.append(self.rabbitmq_publish_cache.get_nowait())
        except Empty:
            break

    if not drained:
        return

    logger.info(
        f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect."
    )
    for index, cached_msg in enumerate(drained):
        if self.rabbitmq_publish_message(cached_msg):
            continue

        # The failed message itself is expected to have been re-cached inside
        # rabbitmq_publish_message; stop here to avoid a tight retry loop.
        logger.error(
            "[DIAGNOSTIC] Failed to flush cached message; re-queued for next attempt."
        )
        # The messages after the failed one exist only in `drained`; return
        # them to the cache (in order) so the next flush can retry them.
        for pending_msg in drained[index + 1 :]:
            self.rabbitmq_publish_cache.put(pending_msg)
        break

0 commit comments

Comments
 (0)