5
5
import ulid
6
6
from bertopic import BERTopic
7
7
from redis .asyncio .client import Redis
8
- from redisvl .query .filter import Tag
9
- from redisvl .query .query import FilterQuery
10
8
from tenacity .asyncio import AsyncRetrying
11
9
from tenacity .stop import stop_after_attempt
12
10
from transformers import AutoModelForTokenClassification , AutoTokenizer , pipeline
13
11
14
12
from agent_memory_server .config import settings
13
+ from agent_memory_server .filters import DiscreteMemoryExtracted
15
14
from agent_memory_server .llms import (
16
15
AnthropicClientWrapper ,
17
16
OpenAIClientWrapper ,
18
17
get_model_client ,
19
18
)
20
19
from agent_memory_server .logging import get_logger
21
20
from agent_memory_server .models import MemoryRecord
22
- from agent_memory_server .utils .redis import get_redis_conn , get_search_index
21
+ from agent_memory_server .utils .keys import Keys
22
+ from agent_memory_server .utils .redis import get_redis_conn
23
23
24
24
25
25
logger = get_logger (__name__ )
@@ -269,25 +269,32 @@ async def extract_discrete_memories(
269
269
"""
270
270
redis = await get_redis_conn ()
271
271
client = await get_model_client (settings .generation_model )
272
- query = FilterQuery (
273
- filter_expression = (Tag ("discrete_memory_extracted" ) == "f" )
274
- & (Tag ("memory_type" ) == "message" )
275
- )
272
+
273
+ # Use vectorstore adapter to find messages that need discrete memory extraction
274
+ from agent_memory_server .filters import MemoryType
275
+ from agent_memory_server .vectorstore_factory import get_vectorstore_adapter
276
+
277
+ adapter = await get_vectorstore_adapter ()
276
278
offset = 0
277
279
278
280
while True :
279
- query .paging (num = 25 , offset = offset )
280
- search_index = get_search_index (redis = redis )
281
- messages = await search_index .query (query )
281
+ # Search for message-type memories that haven't been processed for discrete extraction
282
+ search_result = await adapter .search_memories (
283
+ query = "" , # Empty query to get all messages
284
+ memory_type = MemoryType (eq = "message" ),
285
+ discrete_memory_extracted = DiscreteMemoryExtracted (ne = "t" ),
286
+ limit = 25 ,
287
+ offset = offset ,
288
+ )
289
+
282
290
discrete_memories = []
283
291
284
- for message in messages :
285
- if not message or not message .get ( " text" ) :
292
+ for message in search_result . memories :
293
+ if not message or not message .text :
286
294
logger .info (f"Deleting memory with no text: { message } " )
287
- await redis . delete ( message [ "id" ])
295
+ await adapter . delete_memories ([ message . id ])
288
296
continue
289
- id_ = message .get ("id_" )
290
- if not id_ :
297
+ if not message .id :
291
298
logger .error (f"Skipping memory with no ID: { message } " )
292
299
continue
293
300
@@ -296,7 +303,7 @@ async def extract_discrete_memories(
296
303
response = await client .create_chat_completion (
297
304
model = settings .generation_model ,
298
305
prompt = DISCRETE_EXTRACTION_PROMPT .format (
299
- message = message [ " text" ] , top_k_topics = settings .top_k_topics
306
+ message = message . text , top_k_topics = settings .top_k_topics
300
307
),
301
308
response_format = {"type" : "json_object" },
302
309
)
@@ -317,13 +324,15 @@ async def extract_discrete_memories(
317
324
raise
318
325
discrete_memories .extend (new_message ["memories" ])
319
326
327
+ # Update the memory to mark it as processed
328
+ # For now, we need to use Redis directly as the adapter doesn't have an update method
320
329
await redis .hset (
321
- name = message [ "id" ],
330
+ name = Keys . memory_key ( message . id ), # Construct the key
322
331
key = "discrete_memory_extracted" ,
323
332
value = "t" ,
324
333
) # type: ignore
325
334
326
- if len (messages ) < 25 :
335
+ if len (search_result . memories ) < 25 :
327
336
break
328
337
offset += 25
329
338
@@ -333,7 +342,7 @@ async def extract_discrete_memories(
333
342
if discrete_memories :
334
343
long_term_memories = [
335
344
MemoryRecord (
336
- id_ = str (ulid .ULID ()),
345
+ id = str (ulid .ULID ()),
337
346
text = new_memory ["text" ],
338
347
memory_type = new_memory .get ("type" , "episodic" ),
339
348
topics = new_memory .get ("topics" , []),
0 commit comments