1717from memos .mem_scheduler .modules .retriever import SchedulerRetriever
1818from memos .mem_scheduler .modules .schemas import (
1919 ACTIVATION_MEMORY_TYPE ,
20- ACTIVATION_MEMORY_VLLM_BACKEND ,
2120 ADD_LABEL ,
2221 DEFAULT_ACT_MEM_DUMP_PATH ,
2322 DEFAULT_CONSUME_INTERVAL_SECONDS ,
3332 TreeTextMemory_SEARCH_METHOD ,
3433)
3534from memos .mem_scheduler .utils import normalize_name
36- from memos .memories .activation .kv import KVCacheItem , KVCacheMemory
35+ from memos .memories .activation .kv import KVCacheMemory
36+ from memos .memories .activation .vllmkv import VLLMKVCacheItem , VLLMKVCacheMemory
3737from memos .memories .textual .tree import TextualMemoryItem , TreeTextMemory
3838from memos .templates .mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE
3939
@@ -88,7 +88,6 @@ def __init__(self, config: BaseSchedulerConfig):
8888 self .auth_config_path : str | Path | None = self .config .get ("auth_config_path" , None )
8989 self .auth_config = None
9090 self .rabbitmq_config = None
91- self .act_mem_backend = ACTIVATION_MEMORY_VLLM_BACKEND
9291
9392 def initialize_modules (self , chat_llm : BaseLLM , process_llm : BaseLLM | None = None ):
9493 if process_llm is None :
@@ -186,8 +185,13 @@ def update_activation_memory(
186185 logger .error ("Not Implemented." )
187186
188187 try :
189- assert isinstance (mem_cube .act_mem , KVCacheMemory )
190- act_mem : KVCacheMemory = mem_cube .act_mem
188+ if isinstance (mem_cube .act_mem , VLLMKVCacheMemory ):
189+ act_mem : VLLMKVCacheMemory = mem_cube .act_mem
190+ elif isinstance (mem_cube .act_mem , KVCacheMemory ):
191+ act_mem : KVCacheMemory = mem_cube .act_mem
192+ else :
193+ logger .error ("Not Implemented." )
194+ return
191195
192196 text_memory = MEMORY_ASSEMBLY_TEMPLATE .format (
193197 memory_text = "" .join (
@@ -200,14 +204,17 @@ def update_activation_memory(
200204 )
201205
202206 # huggingface or vllm kv cache
203- original_cache_items : list [KVCacheItem ] = act_mem .get_all ()
204- pre_cache_item : KVCacheItem = original_cache_items [- 1 ]
205- original_text_memories = pre_cache_item .records .text_memories
206- act_mem .delete_all ()
207- cache_item : KVCacheItem = act_mem .extract (text_memory )
207+ original_cache_items : list [VLLMKVCacheItem ] = act_mem .get_all ()
208+ original_text_memories = []
209+ if len (original_cache_items ) > 0 :
210+ pre_cache_item : VLLMKVCacheItem = original_cache_items [- 1 ]
211+ original_text_memories = pre_cache_item .records .text_memories
212+ act_mem .delete_all ()
213+
214+ cache_item = act_mem .extract (text_memory )
208215 cache_item .records .text_memories = new_text_memories
209216
210- act_mem .add (cache_item )
217+ act_mem .add ([ cache_item ] )
211218 act_mem .dump (self .act_mem_dump_path )
212219
213220 self .log_activation_memory_update (
@@ -219,10 +226,11 @@ def update_activation_memory(
219226 )
220227
221228 except Exception as e :
222- logger .warning (f"MOS-based activation memory update failed: { e } " )
229+ logger .warning (f"MOS-based activation memory update failed: { e } " , exc_info = True )
223230
224231 def update_activation_memory_periodically (
225232 self ,
233+ interval_seconds : int ,
226234 user_id : str ,
227235 mem_cube_id : str ,
228236 mem_cube : GeneralMemCube ,
@@ -231,8 +239,10 @@ def update_activation_memory_periodically(
231239
232240 if self .monitor .timed_trigger (
233241 last_time = self .monitor ._last_activation_mem_update_time ,
234- interval_seconds = self . monitor . act_mem_update_interval ,
242+ interval_seconds = interval_seconds ,
235243 ):
244+ logger .info (f"Updating activation memory for user { user_id } and mem_cube { mem_cube_id } " )
245+
236246 self .monitor .update_memory_monitors (
237247 user_id = user_id , mem_cube_id = mem_cube_id , mem_cube = mem_cube
238248 )
@@ -241,10 +251,24 @@ def update_activation_memory_periodically(
241251 m .memory_text for m in self .monitor .activation_memory_monitors [user_id ][mem_cube_id ]
242252 ]
243253
254+ logger .info (
255+ f"Collected { len (new_activation_memories )} new memory entries for processing"
256+ )
257+
244258 self .update_activation_memory (new_memories = new_activation_memories , mem_cube = mem_cube )
245259
246260 self .monitor ._last_activation_mem_update_time = datetime .now ()
247261
262+ logger .debug (
263+ f"Activation memory update completed at { self .monitor ._last_activation_mem_update_time } "
264+ )
265+ else :
266+ logger .info (
267+ f"Skipping update - { interval_seconds } second interval not yet reached. "
268+ f"Last update time is { self .monitor ._last_activation_mem_update_time } and now is"
269+ f"{ datetime .now ()} "
270+ )
271+
248272 def submit_messages (self , messages : ScheduleMessageItem | list [ScheduleMessageItem ]):
249273 """Submit multiple messages to the message queue."""
250274 if isinstance (messages , ScheduleMessageItem ):
0 commit comments