forked from agentscope-ai/ReMe
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreme_light.py
More file actions
671 lines (569 loc) · 27.2 KB
/
reme_light.py
File metadata and controls
671 lines (569 loc) · 27.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
"""
ReMe Light Application Module
This module provides the ReMeLight class, a specialized application built on top of
ReMe's core Application framework. It integrates memory management capabilities
including memory compaction, summarization, tool result management, and semantic
memory search functionality.
Key Features:
- Memory compaction and summarization for long conversations
- Tool result compaction with file-based storage for large outputs
- Semantic memory search using vector and full-text search
- Configurable embedding models and vector store backends
- Async task management for background summarization
"""
import asyncio
import logging
import os
import platform
from pathlib import Path
from agentscope.formatter import FormatterBase
from agentscope.message import Msg, TextBlock
from agentscope.model import ChatModelBase, OpenAIChatModel
from agentscope.token import HuggingFaceTokenCounter
from agentscope.tool import Toolkit, ToolResponse
from .config import ReMeConfigParser
from .core import Application
from .memory.file_based import Compactor, Summarizer, ToolResultCompactor, ReMeInMemoryMemory, ReMeChatFormatter
from .memory.file_based.utils import get_token_counter
from .memory.tools import MemorySearch
from .core.utils import load_env
# Module-level logger, named after this module via __name__; used by every
# method below to report warnings and failures.
logger = logging.getLogger(__name__)
class ReMeLight(Application):
    """
    ReMe Light Application Class

    A specialized application class that extends ReMe's core Application framework
    with memory management capabilities: tool-result compaction, memory compaction,
    background summarization, and memory search.

    Attributes:
        working_path (Path): Absolute path to the working directory for storing data
        memory_path (Path): ``working_path / "memory"``, the memory storage directory
        tool_result_path (Path): ``working_path / "tool_result"``, where large tool
            result files are stored
        chat_model (ChatModelBase): Language model passed to the compactor and summarizer
        formatter (FormatterBase): Formatter passed to the compactor, summarizer and
            in-memory memory
        token_counter (HuggingFaceTokenCounter): Token counting utility for length management
        toolkit (Toolkit | None): Optional collection of tools handed to the summarizer
        max_input_length (int): Maximum allowed input length in tokens
        memory_compact_threshold (int): Threshold at which memory compaction triggers,
            computed as ``int(max_input_length * memory_compact_ratio * 0.9)``
        language (str): ``"zh"`` when Chinese was requested, otherwise the empty string
        vector_weight (float): Weight handed to the memory search tool
        candidate_multiplier (float): Candidate multiplier handed to the memory search tool
        tool_result_threshold (int): Size threshold for tool result compaction
        retention_days (int): Number of days to retain tool result files
        summary_tasks (list[asyncio.Task]): List of background summarization tasks
    """
def __init__(
    self,
    working_dir: str = ".reme",
    llm_api_key: str | None = None,
    llm_base_url: str | None = None,
    embedding_api_key: str | None = None,
    embedding_base_url: str | None = None,
    chat_model: ChatModelBase | None = None,
    formatter: FormatterBase | None = None,
    token_counter: HuggingFaceTokenCounter | None = None,
    toolkit: Toolkit | None = None,
    max_input_length: int = 128000,
    memory_compact_ratio: float = 0.7,
    language: str = "zh",
    vector_weight: float = 0.7,
    candidate_multiplier: float = 3.0,
    tool_result_threshold: int = 1000,
    retention_days: int = 7,
):
    """
    Create the working directories, read configuration, and build the components.

    Args:
        working_dir (str): Root directory for application data; resolved to an
            absolute path and created together with its ``memory`` and
            ``tool_result`` sub-directories
        llm_api_key (str | None): API key for the LLM; forwarded to the parent
            class and used for the default chat model
        llm_base_url (str | None): Base URL for the LLM; forwarded to the parent
            class and used for the default chat model
        embedding_api_key (str | None): API key for the embedding service
        embedding_base_url (str | None): Base URL for the embedding service
        chat_model (ChatModelBase | None): Ready-made chat model; when omitted an
            ``OpenAIChatModel`` is created
        formatter (FormatterBase | None): Ready-made formatter; when omitted a
            ``ReMeChatFormatter`` bound to the token counter is created
        token_counter (HuggingFaceTokenCounter | None): Ready-made token counter;
            when omitted ``get_token_counter()`` supplies one
        toolkit (Toolkit | None): Optional toolkit handed to the summarizer
        max_input_length (int): Maximum allowed input length in tokens
        memory_compact_ratio (float): Fraction of ``max_input_length`` used to
            derive the compaction threshold
        language (str): ``"zh"`` selects Chinese; any other value is stored as ``""``
        vector_weight (float): Weight handed to the memory search tool
        candidate_multiplier (float): Candidate multiplier handed to the memory search tool
        tool_result_threshold (int): Size threshold for tool result compaction
        retention_days (int): Number of days to retain tool result files

    Raises:
        KeyError: If no ``chat_model`` is given and the API key or base URL is
            neither passed as an argument nor present in the environment
            (``LLM_API_KEY`` / ``LLM_BASE_URL``)

    Note:
        Environment variables read here: ``LLM_MODEL_NAME``, ``EMBEDDING_MODEL_NAME``,
        ``EMBEDDING_DIMENSIONS``, ``EMBEDDING_CACHE_ENABLED``, ``EMBEDDING_MAX_CACHE_SIZE``,
        ``EMBEDDING_MAX_INPUT_LENGTH``, ``EMBEDDING_MAX_BATCH_SIZE``, ``FTS_ENABLED``,
        ``MEMORY_STORE_BACKEND``, plus ``LLM_API_KEY`` / ``LLM_BASE_URL`` as fallbacks.
    """
    # Initialize working directory structure
    # All application data will be stored under this path
    self.working_path = Path(working_dir).absolute()
    self.working_path.mkdir(parents=True, exist_ok=True)

    # Create memory storage directory for persistent memory files
    self.memory_path = self.working_path / "memory"
    self.memory_path.mkdir(parents=True, exist_ok=True)

    # Create tool result directory for storing large tool outputs
    self.tool_result_path = self.working_path / "tool_result"
    self.tool_result_path.mkdir(parents=True, exist_ok=True)

    # Initialize runtime parameters (will be updated via update_params)
    self.max_input_length: int = 0
    self.memory_compact_threshold: int = 0
    self.language: str = ""

    # Apply initial parameter configuration
    self.update_params(
        max_input_length=max_input_length,
        memory_compact_ratio=memory_compact_ratio,
        language=language,
    )

    # Store configuration parameters
    self.vector_weight: float = vector_weight
    self.candidate_multiplier: float = candidate_multiplier
    self.tool_result_threshold: int = tool_result_threshold
    self.retention_days: int = retention_days

    # NOTE(review): load_env() presumably populates os.environ from a .env file
    # before the lookups below - confirm against .core.utils.
    load_env()
    llm_model_name = self._safe_str("LLM_MODEL_NAME", "")
    embedding_model_name = self._safe_str("EMBEDDING_MODEL_NAME", "")
    embedding_dimensions = self._safe_int("EMBEDDING_DIMENSIONS", 1024)
    embedding_cache_enabled = self._safe_str("EMBEDDING_CACHE_ENABLED", "true").lower() == "true"
    embedding_max_cache_size = self._safe_int("EMBEDDING_MAX_CACHE_SIZE", 2000)
    embedding_max_input_length = self._safe_int("EMBEDDING_MAX_INPUT_LENGTH", 8192)
    embedding_max_batch_size = self._safe_int("EMBEDDING_MAX_BATCH_SIZE", 10)

    # Determine if vector search should be enabled based on configuration
    # Vector search requires either an API key or a local model name
    vector_enabled = bool(embedding_api_key) or bool(embedding_model_name)
    if vector_enabled:
        logger.info("Vector search enabled.")
    else:
        logger.warning(
            "Vector search disabled. Memory search functionality will be restricted. "
            "To enable, configure: EMBEDDING_API_KEY, EMBEDDING_BASE_URL, EMBEDDING_MODEL_NAME.",
        )

    # Check if full-text search (FTS) is enabled via environment variable
    fts_enabled = os.environ.get("FTS_ENABLED", "true").lower() == "true"

    # Determine the memory store backend to use
    # "auto" selects based on platform (local for Windows, chroma otherwise)
    memory_store_backend = os.environ.get("MEMORY_STORE_BACKEND", "auto")
    if memory_store_backend == "auto":
        memory_backend = "local" if platform.system() == "Windows" else "chroma"
    else:
        memory_backend = memory_store_backend

    # Initialize the parent Application class with comprehensive configuration
    super().__init__(
        llm_api_key=llm_api_key,
        llm_base_url=llm_base_url,
        embedding_api_key=embedding_api_key,
        embedding_base_url=embedding_base_url,
        working_dir=str(self.working_path),
        config_path="light",
        enable_logo=False,
        log_to_console=False,
        parser=ReMeConfigParser,
        default_embedding_model_config={
            "model_name": embedding_model_name,
            "dimensions": embedding_dimensions,
            "enable_cache": embedding_cache_enabled,
            "use_dimensions": False,
            "max_cache_size": embedding_max_cache_size,
            "max_input_length": embedding_max_input_length,
            "max_batch_size": embedding_max_batch_size,
        },
        default_file_store_config={
            "backend": memory_backend,
            "store_name": "copaw",
            "vector_enabled": vector_enabled,
            "fts_enabled": fts_enabled,
        },
        default_file_watcher_config={
            "watch_paths": [
                str(self.working_path / "MEMORY.md"),
                str(self.working_path / "memory.md"),
                str(self.memory_path),
            ],
        },
    )

    if chat_model is not None:
        self.chat_model: ChatModelBase = chat_model
    else:
        # Credentials passed to the constructor take precedence; the environment
        # is only a fallback. Previously the explicit arguments were ignored here,
        # so a caller supplying llm_api_key still hit KeyError without the env var.
        # add more params later
        self.chat_model = OpenAIChatModel(
            api_key=llm_api_key or os.environ["LLM_API_KEY"],
            client_kwargs={"base_url": llm_base_url or os.environ["LLM_BASE_URL"]},
            model_name=llm_model_name,
        )

    if token_counter is not None:
        self.token_counter: HuggingFaceTokenCounter = token_counter
    else:
        self.token_counter = get_token_counter()

    if formatter is not None:
        self.formatter: FormatterBase = formatter
    else:
        self.formatter = ReMeChatFormatter(token_counter=self.token_counter)

    self.toolkit: Toolkit | None = toolkit

    # Initialize list to track background summarization tasks
    self.summary_tasks: list[asyncio.Task] = []
def update_params(
    self,
    max_input_length: int,
    memory_compact_ratio: float,
    language: str,
):
    """
    Refresh the runtime limits that drive memory management.

    Args:
        max_input_length (int): New maximum input length in tokens
        memory_compact_ratio (float): Fraction of the input length at which
            compaction should trigger
        language (str): Language code; only "zh" is kept, anything else
            is stored as the empty string

    Note:
        The compaction threshold is derived as
            int(max_input_length * memory_compact_ratio * 0.9)
        where 0.9 leaves headroom below the hard input limit.
    """
    # Headroom factor so compaction fires before the hard limit is reached
    headroom = 0.9

    self.max_input_length = max_input_length
    self.memory_compact_threshold = int(max_input_length * memory_compact_ratio * headroom)
    self.language = "zh" if language == "zh" else ""
@staticmethod
def _safe_str(key: str, default: str) -> str:
"""
Safely retrieve a string value from an environment variable.
Args:
key (str): The name of the environment variable to retrieve
default (str): The default value to return if the variable is not set
Returns:
str: The value of the environment variable, or the default if not set
"""
return os.environ.get(key, default)
@staticmethod
def _safe_int(key: str, default: int) -> int:
"""
Safely retrieve an integer value from an environment variable.
This method handles cases where the environment variable is not set
or contains a non-integer value by returning the specified default.
Args:
key (str): The name of the environment variable to retrieve
default (int): The default value to return on failure or if not set
Returns:
int: The integer value of the environment variable, or the default
Note:
Logs a warning if the value exists but cannot be parsed as an integer
"""
value = os.environ.get(key)
if value is None:
return default
try:
return int(value)
except ValueError:
logger.warning(f"Invalid int value '{value}' for key '{key}', using default {default}")
return default
def _cleanup_tool_results(self) -> int:
    """
    Remove expired files from the tool result directory.

    A ToolResultCompactor is built from the instance's tool result directory,
    threshold and retention period, and asked to clean up expired files.

    Returns:
        int: Whatever ``cleanup_expired_files()`` reports, or 0 when anything
        in the process raised

    Note:
        Failures are logged with a traceback and never propagated, so callers
        can treat cleanup as best-effort.
    """
    deleted_count = 0
    try:
        expired_file_cleaner = ToolResultCompactor(
            tool_result_dir=self.tool_result_path,
            tool_result_threshold=self.tool_result_threshold,
            retention_days=self.retention_days,
        )
        deleted_count = expired_file_cleaner.cleanup_expired_files()
    except Exception as e:
        # Best-effort: record the failure and report zero deletions
        logger.exception(f"Error cleaning up tool results: {e}")
    return deleted_count
async def start(self):
    """
    Start the application and purge expired tool result files.

    Returns:
        The value returned by the parent class's ``start``

    Note:
        The cleanup runs only after the parent's ``start`` has completed.
    """
    parent_result = await super().start()
    # Housekeeping once the parent has finished starting
    self._cleanup_tool_results()
    return parent_result
async def close(self) -> bool:
    """
    Purge expired tool result files, then close the application.

    Returns:
        bool: The value returned by the parent class's ``close``

    Note:
        The cleanup runs before the parent's ``close`` is awaited.
    """
    # Housekeeping first, while the application is still open
    self._cleanup_tool_results()
    closed = await super().close()
    return closed
async def compact_tool_result(
    self,
    messages: list[Msg],
) -> list[Msg]:
    """
    Compact tool results in a list of messages.

    A ToolResultCompactor configured with the instance's tool result directory,
    size threshold and retention period processes the messages; afterwards
    expired tool result files are cleaned up.

    Args:
        messages (list[Msg]): List of messages to process for tool result compaction

    Returns:
        list[Msg]: The compactor's output, or the ``messages`` argument itself
        when compaction raised

    Note:
        - If compaction fails, the error is logged and the input list is returned
        - Cleanup of expired files is best-effort: a cleanup failure is logged
          but does not discard a compaction result that was already produced
    """
    try:
        # Create compactor with instance configuration
        compactor = ToolResultCompactor(
            tool_result_dir=self.tool_result_path,
            tool_result_threshold=self.tool_result_threshold,
            retention_days=self.retention_days,
        )
        # Execute compaction and get processed messages
        result = await compactor.call(messages=messages, service_context=self.service_context)
    except Exception as e:
        # Log the error and return original messages to maintain functionality
        logger.exception(f"Error compacting tool results: {e}")
        return messages

    # Cleanup is housekeeping only. It used to share the try block above, so a
    # failure here threw away the successfully compacted messages.
    try:
        compactor.cleanup_expired_files()
    except Exception as e:
        logger.exception(f"Error cleaning up tool results: {e}")
    return result
async def compact_memory(self, messages: list[Msg], previous_summary: str = "") -> str:
    """
    Condense a list of messages into a summary string.

    A Compactor is built from the instance's compaction threshold, chat model,
    formatter, token counter and language, then invoked on the messages.

    Args:
        messages (list[Msg]): The messages to compact
        previous_summary (str): Earlier summary forwarded to the compactor;
            defaults to the empty string

    Returns:
        str: The compactor's output, or the empty string when anything raised

    Note:
        Failures are logged with a traceback and never propagated.
    """
    try:
        memory_compactor = Compactor(
            memory_compact_threshold=self.memory_compact_threshold,
            chat_model=self.chat_model,
            formatter=self.formatter,
            token_counter=self.token_counter,
            language=self.language,
        )
        compacted = await memory_compactor.call(
            messages=messages,
            previous_summary=previous_summary,
            service_context=self.service_context,
        )
    except Exception as e:
        # An empty string signals failure to the caller
        logger.exception(f"Error compacting memory: {e}")
        return ""
    else:
        return compacted
async def summary_memory(self, messages: list[Msg]) -> str:
    """
    Produce a summary of the given messages.

    A Summarizer is built from the working and memory directories, the
    compaction threshold, chat model, formatter, token counter, toolkit and
    language, then invoked on the messages.

    Args:
        messages (list[Msg]): The messages to summarize

    Returns:
        str: The summarizer's output, or the empty string when anything raised

    Note:
        Failures are logged with a traceback and never propagated.
    """
    try:
        summarizer = Summarizer(
            working_dir=str(self.working_path),
            memory_dir=str(self.memory_path),
            memory_compact_threshold=self.memory_compact_threshold,
            chat_model=self.chat_model,
            formatter=self.formatter,
            token_counter=self.token_counter,
            toolkit=self.toolkit,
            language=self.language,
        )
        summary = await summarizer.call(messages=messages, service_context=self.service_context)
    except Exception as e:
        # An empty string signals failure to the caller
        logger.exception(f"Error summarizing memory: {e}")
        return ""
    else:
        return summary
async def await_summary_tasks(self) -> str:
    """
    Wait for all tracked summary tasks to complete and collect their outcomes.

    Returns:
        str: One line per task, each terminated by a newline, reading
        "Summary task completed: <result>", "Summary task failed: <error>"
        or "Summary task was cancelled."

    Note:
        - Tasks are awaited one after another, with no timeout
        - Cancelled and failed tasks are logged and reported, never re-raised
        - The tracked list is detached before waiting, so tasks registered
          while this coroutine is suspended stay tracked for the next call
    """
    # Swap in a fresh list before the first await. Clearing the list afterwards
    # would also drop tasks added while we were suspended, and an untracked task
    # may be garbage-collected before it finishes.
    tasks, self.summary_tasks = self.summary_tasks, []

    report: list[str] = []
    for task in tasks:
        finished_before_wait = task.done()
        try:
            # Awaiting a finished task returns or raises its outcome; awaiting a
            # running one waits for it to finish.
            task_result = await task
        except asyncio.CancelledError:
            # NOTE(review): cancellation of the caller while waiting is reported
            # the same way as a cancelled summary task - confirm this is intended.
            if finished_before_wait:
                logger.warning("Summary task was cancelled.")
            else:
                logger.warning("Summary task was cancelled while waiting.")
            report.append("Summary task was cancelled.\n")
        except Exception as e:
            # Inside a handler, so logger.exception records the real traceback
            logger.exception(f"Summary task failed: {e}")
            report.append(f"Summary task failed: {e}\n")
        else:
            logger.info(f"Summary task completed: {task_result}")
            report.append(f"Summary task completed: {task_result}\n")
    return "".join(report)
def add_async_summary_task(self, messages: list[Msg]):
    """
    Schedule a background task that summarizes the given messages.

    Finished tasks are first dropped from the tracking list (their outcome is
    logged), then a new task running ``summary_memory`` is created and tracked.

    Args:
        messages (list[Msg]): The messages to be summarized in the background

    Note:
        - Must be called while an event loop is running, because the task is
          created with ``asyncio.create_task``
        - Failed or cancelled tasks are only logged; they never prevent the
          new task from being scheduled
    """
    # Keep only tasks that are still running; log the outcome of the rest
    still_running = []
    for task in self.summary_tasks:
        if not task.done():
            still_running.append(task)
            continue
        if task.cancelled():
            logger.warning("Summary task was cancelled.")
            continue
        exc = task.exception()
        if exc is not None:
            # No exception is being handled here, so logger.exception() would
            # attach an empty traceback. Pass the task's exception explicitly.
            logger.error(f"Summary task failed: {exc}", exc_info=exc)
        else:
            logger.info(f"Summary task completed: {task.result()}")
    self.summary_tasks = still_running

    # Create and track the new background summarization task
    task = asyncio.create_task(self.summary_memory(messages=messages))
    self.summary_tasks.append(task)
async def memory_search(self, query: str, max_results: int = 5, min_score: float = 0.1) -> ToolResponse:
    """
    Search stored memory for content matching a query.

    The query is delegated to a MemorySearch tool configured with the
    instance's ``vector_weight`` and ``candidate_multiplier``; its text
    output is wrapped in a ToolResponse.

    Args:
        query (str): The search query string. Must not be empty.
        max_results (int): Maximum number of results; clamped to 1-100, and
            replaced by 5 when not an int
        min_score (float): Minimum relevance score; clamped to 0.001-0.999,
            and replaced by 0.1 when not a number

    Returns:
        ToolResponse: A single text block holding the search output, or
        "Error: No query provided." when the query is empty

    Note:
        Exceptions raised by the search tool are not caught here.
    """
    # Guard clause: nothing to search for
    if not query:
        return ToolResponse(
            content=[TextBlock(type="text", text="Error: No query provided.")],
        )

    # Clamp into the supported ranges; fall back to defaults on a wrong type
    max_results = min(max(max_results, 1), 100) if isinstance(max_results, int) else 5
    min_score = min(max(min_score, 0.001), 0.999) if isinstance(min_score, (int, float)) else 0.1

    searcher = MemorySearch(
        vector_weight=self.vector_weight,
        candidate_multiplier=self.candidate_multiplier,
    )
    found_text = await searcher.call(
        query=query,
        max_results=max_results,
        min_score=min_score,
        service_context=self.service_context,
    )

    return ToolResponse(
        content=[TextBlock(type="text", text=found_text)],
    )
def get_in_memory_memory(self):
    """
    Build a new in-memory memory object.

    Returns:
        ReMeInMemoryMemory: A fresh instance constructed with this
        application's token counter, formatter and maximum input length

    Note:
        Every call creates a separate instance; nothing is cached on the
        application.
    """
    memory_options = {
        "token_counter": self.token_counter,
        "formatter": self.formatter,
        "max_input_length": self.max_input_length,
    }
    return ReMeInMemoryMemory(**memory_options)