1 change: 1 addition & 0 deletions src/mcp_as_a_judge/constants.py
@@ -15,3 +15,4 @@
DATABASE_URL = "sqlite://:memory:"
MAX_SESSION_RECORDS = 20 # Maximum records to keep per session (FIFO)
MAX_TOTAL_SESSIONS = 50 # Maximum total sessions to keep (LRU cleanup)
MAX_CONTEXT_TOKENS = 50000 # Maximum tokens per session context (1 token ≈ 4 characters)
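
For scale, the 4-characters-per-token heuristic means the 50,000-token budget corresponds to roughly 200,000 characters of stored history. A minimal sketch of the arithmetic (illustrative only; the real helper is added in token_utils.py later in this diff):

    # Illustrative arithmetic behind MAX_CONTEXT_TOKENS (1 token ≈ 4 characters).
    MAX_CONTEXT_TOKENS = 50000

    def approx_tokens(text: str) -> int:
        # Ceiling division, so any non-empty string counts as at least 1 token.
        return (len(text) + 3) // 4

    # ~200,000 characters fill the entire 50,000-token session budget.
    assert approx_tokens("x" * 200_000) == MAX_CONTEXT_TOKENS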
63 changes: 30 additions & 33 deletions src/mcp_as_a_judge/db/conversation_history_service.py
Expand Up @@ -13,6 +13,7 @@
create_database_provider,
)
from mcp_as_a_judge.db.db_config import Config
from mcp_as_a_judge.db.token_utils import filter_records_by_token_limit
from mcp_as_a_judge.logging_config import get_logger

# Set up logger
@@ -35,34 +36,52 @@ def __init__(
self.config = config
self.db = db_provider or create_database_provider(config)

async def load_context_for_enrichment(
self, session_id: str
async def load_filtered_context_for_enrichment(
self, session_id: str, current_prompt: str = ""
) -> list[ConversationRecord]:
"""
Load recent conversation records for LLM context enrichment.

Two-level filtering approach:
1. Database already enforces storage limits (record count + token limits)
2. Load-time filtering ensures history + current prompt fits within LLM context limits

Args:
session_id: Session identifier
current_prompt: Current prompt that will be sent to LLM (for token calculation)

Returns:
List of conversation records for LLM context
List of conversation records for LLM context (filtered for LLM limits)
"""
logger.info(f"🔍 Loading conversation history for session: {session_id}")

# Load recent conversations for this session
recent_records = await self.db.get_session_conversations(
session_id=session_id,
limit=self.config.database.max_session_records, # load last X records (same as save limit)
)
# Load all conversations for this session - database already contains
# records within storage limits, but we may need to filter further for LLM context
recent_records = await self.db.get_session_conversations(session_id)

logger.info(f"📚 Retrieved {len(recent_records)} conversation records from DB")
return recent_records

async def save_tool_interaction(
# Apply LLM context filtering: ensure history + current prompt will fit within token limit
# This filters the list without modifying the database (only token limit matters for LLM)
filtered_records = filter_records_by_token_limit(
recent_records, current_prompt=current_prompt
)

logger.info(
f"✅ Returning {len(filtered_records)} conversation records for LLM context"
)
return filtered_records

async def save_tool_interaction_and_cleanup(
self, session_id: str, tool_name: str, tool_input: str, tool_output: str
) -> str:
"""
Save a tool interaction as a conversation record.
Save a tool interaction as a conversation record and perform automatic cleanup in the provider layer.

After saving, the database provider automatically performs cleanup to enforce limits:
- Removes the oldest records if the session exceeds MAX_SESSION_RECORDS (20)
- Removes the oldest records if the session's total tokens exceed MAX_CONTEXT_TOKENS (50,000)
- Removes least recently used sessions if the total session count exceeds MAX_TOTAL_SESSIONS (50)

Args:
session_id: Session identifier from AI agent
Expand All @@ -87,28 +106,6 @@ async def save_tool_interaction(
logger.info(f"✅ Saved conversation record with ID: {record_id}")
return record_id

async def get_conversation_history(
self, session_id: str
) -> list[ConversationRecord]:
"""
Get conversation history for a session to be injected into user prompts.

Args:
session_id: Session identifier

Returns:
List of conversation records for the session (most recent first)
"""
logger.info(f"🔄 Loading conversation history for session {session_id}")

context_records = await self.load_context_for_enrichment(session_id)

logger.info(
f"📝 Retrieved {len(context_records)} conversation records for session {session_id}"
)

return context_records

def format_conversation_history_as_json_array(
self, conversation_history: list[ConversationRecord]
) -> list[dict]:
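A hypothetical caller-side sketch of the two-level filtering this service now implements; the service variable and prompt here are illustrative, not part of this diff:

    # Storage limits are already enforced at save time by the provider, so a
    # caller only needs the load-time filter to fit the LLM context window.
    history = await service.load_filtered_context_for_enrichment(
        session_id="abc123",
        current_prompt=prompt,  # its tokens count against MAX_CONTEXT_TOKENS
    )
    context = service.format_conversation_history_as_json_array(history)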
3 changes: 3 additions & 0 deletions src/mcp_as_a_judge/db/interface.py
@@ -21,6 +21,9 @@ class ConversationRecord(SQLModel, table=True):
source: str # tool name
input: str # tool input query
output: str # tool output string
tokens: int = Field(
default=0
) # combined token count for input + output (1 token ≈ 4 characters)
timestamp: datetime = Field(
default_factory=datetime.utcnow, index=True
) # when the record was created
122 changes: 89 additions & 33 deletions src/mcp_as_a_judge/db/providers/sqlite_provider.py
@@ -9,10 +9,12 @@
from datetime import UTC, datetime

from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel, asc, desc, select
from sqlmodel import Session, SQLModel, desc, select

from mcp_as_a_judge.constants import MAX_CONTEXT_TOKENS
from mcp_as_a_judge.db.cleanup_service import ConversationCleanupService
from mcp_as_a_judge.db.interface import ConversationHistoryDB, ConversationRecord
from mcp_as_a_judge.db.token_utils import calculate_record_tokens
from mcp_as_a_judge.logging_config import get_logger

# Set up logger
@@ -32,7 +34,8 @@ class SQLiteProvider(ConversationHistoryDB):
- Two-level cleanup strategy:
1. Session-based LRU cleanup (runs when new sessions are created,
removes least recently used)
2. Per-session FIFO cleanup (max 20 records per session, runs on every save)
2. Per-session hybrid cleanup (respects both record count and token limits, runs on every save)
- Token-aware storage and retrieval
- Session-based conversation retrieval
"""

@@ -93,54 +96,103 @@ def _cleanup_excess_sessions(self) -> int:

def _cleanup_old_messages(self, session_id: str) -> int:
"""
Remove old messages from a session using FIFO strategy.
Keeps only the most recent max_session_records messages per session.
Remove old messages from a session using an efficient hybrid FIFO strategy.

Two-step process:
1. If the record count exceeds max_records, remove the oldest record
2. If the total tokens exceed max_tokens, remove the oldest records until within the limit

Optimization: a single DB query with ORDER BY, then in-memory list operations.
This eliminates two extra database queries compared to a naive implementation.
"""
with Session(self.engine) as session:
# Count current messages in session
count_stmt = select(ConversationRecord).where(
ConversationRecord.session_id == session_id
# Get current records ordered by timestamp DESC (newest first for token calculation)
count_stmt = (
select(ConversationRecord)
.where(ConversationRecord.session_id == session_id)
.order_by(desc(ConversationRecord.timestamp))
)
current_records = session.exec(count_stmt).all()
current_records = list(session.exec(count_stmt).all())
current_count = len(current_records)

logger.info(
f"🧹 FIFO cleanup check for session {session_id}: "
f"{current_count} records (max: {self._max_session_records})"
f"🧹 Cleanup check for session {session_id}: {current_count} records "
f"(max: {self._max_session_records})"
)

if current_count <= self._max_session_records:
logger.info(" No cleanup needed - within limits")
return 0
removed_count = 0

# Get oldest records to remove (FIFO)
records_to_remove = current_count - self._max_session_records
oldest_stmt = (
select(ConversationRecord)
.where(ConversationRecord.session_id == session_id)
.order_by(asc(ConversationRecord.timestamp))
.limit(records_to_remove)
)
old_records = session.exec(oldest_stmt).all()
# STEP 1: Handle record count limit
if current_count > self._max_session_records:
logger.info(" 📊 Record limit exceeded, removing 1 oldest record")

# Take the last record (the oldest), since the list is sorted by timestamp DESC (newest first)
oldest_record = current_records[-1]

logger.info(f"🗑️ Removing {len(old_records)} oldest records:")
for i, record in enumerate(old_records, 1):
logger.info(
f" {i}. ID: {record.id[:8] if record.id else 'None'}... | "
f"Source: {record.source} | Timestamp: {record.timestamp}"
f" 🗑️ Removing oldest record: {oldest_record.source} | {oldest_record.tokens} tokens | {oldest_record.timestamp}"
)
session.delete(oldest_record)
removed_count += 1
session.commit()
logger.info(" ✅ Removed 1 record due to record limit")

# Remove the old messages
for record in old_records:
session.delete(record)
# Update our in-memory list to reflect the deletion
current_records.remove(oldest_record)

session.commit()
# STEP 2: Handle token limit (list is already sorted newest first - perfect for token calculation)
current_tokens = sum(record.tokens for record in current_records)

logger.info(
f"✅ LRU cleanup completed: removed {len(old_records)} records "
f"from session {session_id}"
f" 🔢 {len(current_records)} records, {current_tokens} tokens "
f"(max: {MAX_CONTEXT_TOKENS})"
)
return len(old_records)

if current_tokens > MAX_CONTEXT_TOKENS:
logger.info(
f" 🚨 Token limit exceeded, removing oldest records to fit within {MAX_CONTEXT_TOKENS} tokens"
)

# Calculate which records to keep (newest first, within token limit)
records_to_keep = []
running_tokens = 0

for record in current_records: # Already ordered newest first
if running_tokens + record.tokens <= MAX_CONTEXT_TOKENS:
records_to_keep.append(record)
running_tokens += record.tokens
else:
break

# Remove records that didn't make the cut
records_to_remove_for_tokens = current_records[len(records_to_keep) :]

if records_to_remove_for_tokens:
logger.info(
f" 🗑️ Removing {len(records_to_remove_for_tokens)} records for token limit "
f"(keeping {len(records_to_keep)} records, {running_tokens} tokens)"
)

for record in records_to_remove_for_tokens:
logger.info(
f" - {record.source} | {record.tokens} tokens | {record.timestamp}"
)
session.delete(record)
removed_count += 1

session.commit()
logger.info(
f" ✅ Removed {len(records_to_remove_for_tokens)} additional records due to token limit"
)

if removed_count > 0:
logger.info(
f"✅ Cleanup completed for session {session_id}: removed {removed_count} total records"
)
else:
logger.info(" ✅ No cleanup needed - within both limits")

return removed_count

def _is_new_session(self, session_id: str) -> bool:
"""Check if this is a new session (no existing records)."""
@@ -167,13 +219,17 @@ async def save_conversation(
# Check if this is a new session before saving
is_new_session = self._is_new_session(session_id)

# Calculate token count for input + output
token_count = calculate_record_tokens(input_data, output)

# Create new record
record = ConversationRecord(
id=record_id,
session_id=session_id,
source=source,
input=input_data,
output=output,
tokens=token_count,
timestamp=timestamp,
)

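The token-budget half of the hybrid cleanup reduces to a keep-newest prefix scan. A self-contained sketch of just that policy (the function name is illustrative; the tokens field and the 50,000 limit mirror the diff):

    # Keep the newest records whose cumulative token count fits the budget;
    # everything past the cut-off is what the provider deletes.
    def split_by_token_budget(records_newest_first: list, max_tokens: int = 50000) -> tuple[list, list]:
        kept, running = [], 0
        for record in records_newest_first:
            if running + record.tokens > max_tokens:
                break
            kept.append(record)
            running += record.tokens
        removed = records_newest_first[len(kept):]
        return kept, removed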
104 changes: 104 additions & 0 deletions src/mcp_as_a_judge/db/token_utils.py
Owner comment:

This is less recommended.
I suggest: https://docs.litellm.ai/docs/completion/token_usage#3-token_counter
If the client provides LLM_API_KEY we can get the model name.
Otherwise, if it uses sampling, it's a bit trickier:

    result = await ctx.session.create_message(  
        messages=[SamplingMessage(role="user", content=TextContent(type="text", text=prompt))],  
        max_tokens=100,  
    )  
      
    # Cache the model name for token counting  
    model_name = result.model 

@@ -0,0 +1,104 @@
"""
Token calculation utilities for conversation history.

This module provides utilities for calculating token counts from text
using the approximation that 1 token ≈ 4 characters of English text.
"""

from mcp_as_a_judge.constants import MAX_CONTEXT_TOKENS


def calculate_tokens(text: str) -> int:
"""
Calculate approximate token count from text.

Uses the approximation that 1 token ≈ 4 characters of English text.
This is a simple heuristic that works reasonably well for most text.

Args:
text: Input text to calculate tokens for

Returns:
Approximate token count (rounded up to nearest integer)
"""
if not text:
return 0

# Use ceiling division to round up: (len(text) + 3) // 4
# This ensures we don't underestimate token count
return (len(text) + 3) // 4


def calculate_record_tokens(input_text: str, output_text: str) -> int:
"""
Calculate total token count for input and output text.

Combines the token counts of input and output text.

Args:
input_text: Input text string
output_text: Output text string

Returns:
Combined token count for both input and output
"""
return calculate_tokens(input_text) + calculate_tokens(output_text)


def calculate_total_tokens(records: list) -> int:
"""
Calculate total token count for a list of conversation records.

Args:
records: List of ConversationRecord objects with tokens field

Returns:
Sum of all token counts in the records
"""
return sum(record.tokens for record in records if hasattr(record, "tokens"))


def filter_records_by_token_limit(records: list, current_prompt: str = "") -> list:
"""
Filter conversation records so that history plus the current prompt stays within the LLM token limit.

Removes the oldest records (FIFO) when the token limit is exceeded while
keeping as many recent records as possible.

Args:
records: List of ConversationRecord objects (assumed to be in reverse chronological order)
current_prompt: Current prompt that will be sent to the LLM (for token calculation)

Returns:
Filtered list of records that fit within the limits
"""
if not records:
return []

# Calculate current prompt tokens (calculate_tokens already returns 0 for empty text)
current_prompt_tokens = calculate_tokens(current_prompt)

# Calculate total tokens including current prompt
history_tokens = calculate_total_tokens(records)
total_tokens = history_tokens + current_prompt_tokens

# If total tokens (history + current prompt) are within limit, return all records
if total_tokens <= MAX_CONTEXT_TOKENS:
return records

# Remove oldest records (from the end since records are in reverse chronological order)
# until history + current prompt fit within the token limit
filtered_records = records.copy()
current_history_tokens = history_tokens

while (current_history_tokens + current_prompt_tokens) > MAX_CONTEXT_TOKENS and len(
filtered_records
) > 1:
# Remove the oldest record (last in the list)
removed_record = filtered_records.pop()
current_history_tokens -= getattr(removed_record, "tokens", 0)

return filtered_records
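
The reviewer's suggestion above would swap the character heuristic for model-aware counting. A sketch assuming litellm is installed and a model name has been cached (e.g. from result.model after a sampling call); the fallback mirrors calculate_record_tokens:

    from litellm import token_counter

    def count_record_tokens(model_name: str, input_text: str, output_text: str) -> int:
        if not model_name:
            # No model known yet (e.g. before the first sampling result):
            # fall back to the 4-characters-per-token approximation.
            return (len(input_text) + 3) // 4 + (len(output_text) + 3) // 4
        # Model-aware count via litellm's tokenizer lookup.
        return token_counter(model=model_name, text=input_text + output_text)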