Skip to content

Commit 801a5d7

Browse files
authored
Revert "Fix/rabbitmq publish cache" (#719)
Revert "Fix/rabbitmq publish cache (#713)" This reverts commit 0263c5a.
1 parent 0263c5a commit 801a5d7

File tree

2 files changed

+6
-83
lines changed

2 files changed

+6
-83
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -844,9 +844,6 @@ def _submit_web_logs(
844844
f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}"
845845
)
846846
if self.rabbitmq_config is None:
847-
logger.info(
848-
"[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish."
849-
)
850847
return
851848

852849
if isinstance(messages, ScheduleLogForWebItem):
@@ -862,11 +859,9 @@ def _submit_web_logs(
862859
message_info = message.debug_info()
863860
logger.debug(f"Submitted Scheduling log for web: {message_info}")
864861

865-
# Always call publish; the publisher now caches when offline and flushes after reconnect
866-
logger.info(
867-
f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}"
868-
)
869-
self.rabbitmq_publish_message(message=message.to_dict())
862+
if self.is_rabbitmq_connected():
863+
logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}")
864+
self.rabbitmq_publish_message(message=message.to_dict())
870865
logger.debug(
871866
f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}"
872867
)

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import time
66

77
from pathlib import Path
8-
from queue import Empty
98

109
from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig
1110
from memos.context.context import ContextThread
@@ -45,11 +44,6 @@ def __init__(self):
4544
self.rabbitmq_message_cache = AutoDroppingQueue(
4645
maxsize=self.rabbitmq_message_cache_max_size
4746
)
48-
# Pending outgoing messages to avoid loss when connection is not ready
49-
self.rabbitmq_publish_cache_max_size = 50
50-
self.rabbitmq_publish_cache = AutoDroppingQueue(
51-
maxsize=self.rabbitmq_publish_cache_max_size
52-
)
5347
self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure
5448
self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries
5549
self.rabbitmq_heartbeat = 60 # Heartbeat interval (seconds) for connectio
@@ -60,7 +54,6 @@ def __init__(self):
6054
self._rabbitmq_io_loop_thread = None # For IOLoop execution
6155
self._rabbitmq_stop_flag = False # Graceful shutdown flag
6256
self._rabbitmq_lock = threading.Lock() # Ensure thread safety
63-
self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations
6457

6558
def is_rabbitmq_connected(self) -> bool:
6659
"""Check if RabbitMQ connection is alive"""
@@ -77,22 +70,11 @@ def initialize_rabbitmq(
7770
"""
7871
Establish connection to RabbitMQ using pika.
7972
"""
80-
with self._rabbitmq_lock:
81-
if self._rabbitmq_initializing:
82-
logger.info(
83-
"[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call."
84-
)
85-
return
86-
self._rabbitmq_initializing = True
8773
try:
8874
# Skip remote initialization in CI/pytest unless explicitly enabled
8975
enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true"
9076
in_ci = os.getenv("CI", "").lower() == "true"
9177
in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None
92-
logger.info(
93-
f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, "
94-
f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}"
95-
)
9678
if (in_ci or in_pytest) and not enable_env:
9779
logger.info(
9880
"Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable."
@@ -149,9 +131,6 @@ def initialize_rabbitmq(
149131
logger.info("RabbitMQ connection process started")
150132
except Exception:
151133
logger.error("Fail to initialize auth_config", exc_info=True)
152-
finally:
153-
with self._rabbitmq_lock:
154-
self._rabbitmq_initializing = False
155134

156135
def get_rabbitmq_queue_size(self) -> int:
157136
"""Get the current number of messages in the queue.
@@ -218,7 +197,7 @@ def get_rabbitmq_connection_param(self):
218197
# Connection lifecycle callbacks
219198
def on_rabbitmq_connection_open(self, connection):
220199
"""Called when connection is established."""
221-
logger.info("[DIAGNOSTIC] RabbitMQ connection opened")
200+
logger.debug("Connection opened")
222201
connection.channel(on_open_callback=self.on_rabbitmq_channel_open)
223202

224203
def on_rabbitmq_connection_error(self, connection, error):
@@ -236,7 +215,7 @@ def on_rabbitmq_connection_closed(self, connection, reason):
236215
def on_rabbitmq_channel_open(self, channel):
237216
"""Called when channel is ready."""
238217
self.rabbitmq_channel = channel
239-
logger.info("[DIAGNOSTIC] RabbitMQ channel opened")
218+
logger.debug("Channel opened")
240219

241220
# Setup exchange and queue
242221
channel.exchange_declare(
@@ -264,8 +243,6 @@ def on_rabbitmq_queue_declared(self, frame):
264243
def on_rabbitmq_bind_ok(self, frame):
265244
"""Final setup step when bind is complete."""
266245
logger.info("RabbitMQ setup completed")
267-
# Flush any cached publish messages now that connection is ready
268-
self._flush_cached_publish_messages()
269246

270247
def on_rabbitmq_message(self, channel, method, properties, body):
271248
"""Handle incoming messages. Only for test."""
@@ -334,21 +311,8 @@ def rabbitmq_publish_message(self, message: dict):
334311
logger.info(f" - Message Content: {json.dumps(message, indent=2)}")
335312

336313
with self._rabbitmq_lock:
337-
logger.info(
338-
f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. "
339-
f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, "
340-
f"routing_key='{routing_key}', label={label}"
341-
)
342314
if not self.is_rabbitmq_connected():
343-
logger.error(
344-
"[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. "
345-
f"connection_exists={bool(self.rabbitmq_connection)}, "
346-
f"channel_exists={bool(self.rabbitmq_channel)}, "
347-
f"config_loaded={self.rabbitmq_config is not None}"
348-
)
349-
self.rabbitmq_publish_cache.put(message)
350-
# Best-effort to connect
351-
self.initialize_rabbitmq(config=self.rabbitmq_config)
315+
logger.error("Cannot publish - no active connection")
352316
return False
353317

354318
logger.info(
@@ -368,8 +332,6 @@ def rabbitmq_publish_message(self, message: dict):
368332
return True
369333
except Exception as e:
370334
logger.error(f"Failed to publish message: {e}")
371-
# Cache message for retry on next connection
372-
self.rabbitmq_publish_cache.put(message)
373335
self.rabbit_reconnect()
374336
return False
375337

@@ -417,37 +379,3 @@ def rabbitmq_close(self):
417379
logger.warning("IOLoop thread did not terminate cleanly")
418380

419381
logger.info("RabbitMQ connection closed")
420-
421-
def _flush_cached_publish_messages(self):
422-
"""Flush cached outgoing messages once connection is available."""
423-
if self.rabbitmq_publish_cache.empty():
424-
return
425-
426-
if not self.is_rabbitmq_connected():
427-
logger.info(
428-
"[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; "
429-
f"pending={self.rabbitmq_publish_cache.qsize()}"
430-
)
431-
return
432-
433-
drained: list[dict] = []
434-
while True:
435-
try:
436-
drained.append(self.rabbitmq_publish_cache.get_nowait())
437-
except Empty:
438-
break
439-
440-
if not drained:
441-
return
442-
443-
logger.info(
444-
f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect."
445-
)
446-
for cached_msg in drained:
447-
success = self.rabbitmq_publish_message(cached_msg)
448-
if not success:
449-
# Message already re-cached inside publish; avoid tight loop
450-
logger.error(
451-
"[DIAGNOSTIC] Failed to flush cached message; re-queued for next attempt."
452-
)
453-
break

0 commit comments

Comments
 (0)