Skip to content

Commit 2639279

Browse files
authored
Fix cloud playground env detection (#707)
* Fix cloud env detection for RabbitMQ * Refactor: Simplify cloud env check and apply formatting --------- Co-authored-by: [email protected] <>
1 parent 2f8d627 commit 2639279

File tree

4 files changed

+53
-16
lines changed

4 files changed

+53
-16
lines changed

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import concurrent.futures
22
import contextlib
33
import json
4-
import os
54
import traceback
65

76
from memos.configs.mem_scheduler import GeneralSchedulerConfig
@@ -30,7 +29,10 @@
3029
is_all_english,
3130
transform_name_to_key,
3231
)
33-
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
32+
from memos.mem_scheduler.utils.misc_utils import (
33+
group_messages_by_user_and_mem_cube,
34+
is_cloud_env,
35+
)
3436
from memos.memories.textual.item import TextualMemoryItem
3537
from memos.memories.textual.preference import PreferenceTextMemory
3638
from memos.memories.textual.tree import TreeTextMemory
@@ -194,9 +196,9 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
194196
f"prepared_add_items: {prepared_add_items};\n prepared_update_items_with_original: {prepared_update_items_with_original}"
195197
)
196198
# Conditional Logging: Knowledge Base (Cloud Service) vs. Playground/Default
197-
is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
199+
cloud_env = is_cloud_env()
198200

199-
if is_cloud_env:
201+
if cloud_env:
200202
self.send_add_log_messages_to_cloud_env(
201203
msg, prepared_add_items, prepared_update_items_with_original
202204
)
@@ -615,8 +617,8 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
615617
f"Successfully processed feedback for user_id={user_id}, mem_cube_id={mem_cube_id}"
616618
)
617619

618-
is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
619-
if is_cloud_env:
620+
cloud_env = is_cloud_env()
621+
if cloud_env:
620622
record = feedback_result.get("record") if isinstance(feedback_result, dict) else {}
621623
add_records = record.get("add") if isinstance(record, dict) else []
622624
update_records = record.get("update") if isinstance(record, dict) else []
@@ -733,7 +735,7 @@ def _extract_fields(mem_item):
733735
else:
734736
logger.info(
735737
"Skipping web log for feedback. Not in a cloud environment (is_cloud_env=%s)",
736-
is_cloud_env,
738+
cloud_env,
737739
)
738740

739741
except Exception as e:
@@ -893,8 +895,8 @@ def _process_memories_with_reader(
893895

894896
# LOGGING BLOCK START
895897
# This block is replicated from _add_message_consumer to ensure consistent logging
896-
is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
897-
if is_cloud_env:
898+
cloud_env = is_cloud_env()
899+
if cloud_env:
898900
# New: Knowledge Base Logging (Cloud Service)
899901
kb_log_content = []
900902
for item in flattened_memories:
@@ -1013,8 +1015,8 @@ def _process_memories_with_reader(
10131015
f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True
10141016
)
10151017
with contextlib.suppress(Exception):
1016-
is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
1017-
if is_cloud_env:
1018+
cloud_env = is_cloud_env()
1019+
if cloud_env:
10181020
if not kb_log_content:
10191021
trigger_source = (
10201022
info.get("trigger_source", "Messages") if info else "Messages"

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import concurrent
2-
import os
32
import threading
43
import time
54

@@ -25,7 +24,7 @@
2524
from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator
2625
from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue
2726
from memos.mem_scheduler.task_schedule_modules.task_queue import ScheduleTaskQueue
28-
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
27+
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube, is_cloud_env
2928
from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso
3029
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
3130

@@ -351,8 +350,8 @@ def _maybe_emit_task_completion(
351350
mem_cube_id = first.mem_cube_id
352351

353352
try:
354-
is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
355-
if not is_cloud_env:
353+
cloud_env = is_cloud_env()
354+
if not cloud_env:
356355
return
357356

358357
for task_id in task_ids:

src/memos/mem_scheduler/utils/misc_utils.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import os
23
import re
34
import traceback
45

@@ -17,6 +18,40 @@
1718
logger = get_logger(__name__)
1819

1920

21+
def _normalize_env_value(value: str | None) -> str:
22+
"""Normalize environment variable values for comparison."""
23+
return value.strip().lower() if isinstance(value, str) else ""
24+
25+
26+
def is_playground_env() -> bool:
27+
"""Return True when ENV_NAME indicates a Playground environment."""
28+
env_name = _normalize_env_value(os.getenv("ENV_NAME"))
29+
return env_name.startswith("playground")
30+
31+
32+
def is_cloud_env() -> bool:
33+
"""
34+
Determine whether the scheduler should treat the runtime as a cloud environment.
35+
36+
Rules:
37+
- Any Playground ENV_NAME is explicitly NOT cloud.
38+
- MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME must be set to enable cloud behavior.
39+
- The default memos-fanout/fanout combination is treated as non-cloud.
40+
"""
41+
if is_playground_env():
42+
return False
43+
44+
exchange_name = _normalize_env_value(os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME"))
45+
exchange_type = _normalize_env_value(os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_TYPE"))
46+
47+
if not exchange_name:
48+
return False
49+
50+
return not (
51+
exchange_name == "memos-fanout" and (not exchange_type or exchange_type == "fanout")
52+
)
53+
54+
2055
def extract_json_obj(text: str):
2156
"""
2257
Safely extracts JSON from LLM response text with robust error handling.

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
1414
from memos.mem_scheduler.general_modules.misc import AutoDroppingQueue
1515
from memos.mem_scheduler.schemas.general_schemas import DIRECT_EXCHANGE_TYPE, FANOUT_EXCHANGE_TYPE
16+
from memos.mem_scheduler.utils.misc_utils import is_cloud_env
1617

1718

1819
logger = get_logger(__name__)
@@ -291,7 +292,7 @@ def rabbitmq_publish_message(self, message: dict):
291292

292293
# Cloud environment override: applies to specific message types if MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME is set
293294
env_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
294-
if env_exchange_name and label in ["taskStatus", "knowledgeBaseUpdate"]:
295+
if is_cloud_env() and env_exchange_name and label in ["taskStatus", "knowledgeBaseUpdate"]:
295296
exchange_name = env_exchange_name
296297
routing_key = "" # Routing key is always empty in cloud environment for these types
297298

0 commit comments

Comments
 (0)