Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 79 additions & 18 deletions apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -987,18 +987,19 @@ export class AgentBlockHandler implements BlockHandler {
try {
const executionData = JSON.parse(executionDataHeader)

// If execution data contains full content, persist to memory
if (ctx && inputs && executionData.output?.content) {
const assistantMessage: Message = {
role: 'assistant',
content: executionData.output.content,
}
// Fire and forget - don't await
memoryService
.persistMemoryMessage(ctx, inputs, assistantMessage, block.id)
.catch((error) =>
logger.error('Failed to persist streaming response to memory:', error)
// If execution data contains content or tool calls, persist to memory
if (ctx && inputs && (executionData.output?.content || executionData.output?.toolCalls?.list?.length)) {
const toolCalls = executionData.output?.toolCalls?.list
const messages = this.buildMessagesForMemory(executionData.output.content, toolCalls)

// Fire and forget - don't await, persist all messages
Promise.all(
messages.map((message) =>
memoryService.persistMemoryMessage(ctx, inputs, message, block.id)
)
).catch((error) =>
logger.error('Failed to persist streaming response to memory:', error)
)
}

return {
Expand Down Expand Up @@ -1117,32 +1118,92 @@ export class AgentBlockHandler implements BlockHandler {
return
}

// Extract content from regular response
// Extract content and tool calls from regular response
const blockOutput = result as any
const content = blockOutput?.content
const toolCalls = blockOutput?.toolCalls?.list

if (!content || typeof content !== 'string') {
// Build messages to persist
const messages = this.buildMessagesForMemory(content, toolCalls)

if (messages.length === 0) {
return
}

const assistantMessage: Message = {
role: 'assistant',
content,
// Persist all messages
for (const message of messages) {
await memoryService.persistMemoryMessage(ctx, inputs, message, blockId)
}

await memoryService.persistMemoryMessage(ctx, inputs, assistantMessage, blockId)

logger.debug('Persisted assistant response to memory', {
workflowId: ctx.workflowId,
memoryType: inputs.memoryType,
conversationId: inputs.conversationId,
messageCount: messages.length,
})
} catch (error) {
logger.error('Failed to persist response to memory:', error)
// Don't throw - memory persistence failure shouldn't break workflow execution
}
}

/**
 * Builds the OpenAI-compatible message sequence to persist for one agent turn.
 *
 * Output order:
 * 1. An assistant message carrying a `tool_calls` array (only when tools were used)
 * 2. One `tool` role message per call, carrying that call's result
 * 3. A trailing assistant message with the text content (only when present)
 *
 * @param content - Final assistant text, if any
 * @param toolCalls - Provider tool-call records, if any
 * @returns Messages ready for memory persistence (empty when there is nothing to store)
 */
private buildMessagesForMemory(content: string | undefined, toolCalls: any[] | undefined): Message[] {
  const out: Message[] = []
  const calls = toolCalls ?? []

  if (calls.length > 0) {
    // Assign every call an id up front so the assistant `tool_calls` entry and its
    // paired tool-result message reference the same identifier. Provider-supplied
    // ids win; otherwise synthesize one (index guarantees uniqueness even for the
    // same tool name generated in the same millisecond).
    const identified = calls.map((call: any, idx: number) => ({
      call,
      id: call.id || `call_${call.name}_${Date.now()}_${idx}_${Math.random().toString(36).slice(2, 7)}`,
    }))

    out.push({
      role: 'assistant',
      content: null,
      tool_calls: identified.map(({ call, id }) => ({
        id,
        type: 'function' as const,
        function: {
          name: call.name,
          // Prefer the provider's raw argument string when available; fall back
          // to re-serializing the parsed arguments.
          arguments: call.rawArguments || JSON.stringify(call.arguments || {}),
        },
      })),
    })

    for (const { call, id } of identified) {
      out.push({
        role: 'tool',
        content: typeof call.result === 'string' ? call.result : JSON.stringify(call.result || {}),
        tool_call_id: id,
        name: call.name, // Store tool name for providers that need it (e.g. Google/Gemini)
      })
    }
  }

  // Trailing assistant text, when the model produced any.
  if (typeof content === 'string' && content) {
    out.push({
      role: 'assistant',
      content,
    })
  }

  return out
}

private processProviderResponse(
response: any,
block: SerializedBlock,
Expand Down
155 changes: 125 additions & 30 deletions apps/sim/executor/handlers/agent/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,20 +202,99 @@ export class Memory {
const systemMessages = messages.filter((msg) => msg.role === 'system')
const conversationMessages = messages.filter((msg) => msg.role !== 'system')

const recentMessages = conversationMessages.slice(-limit)
// Group messages into conversation turns
// A turn = user message + any tool calls/results + assistant response
const turns = this.groupMessagesIntoTurns(conversationMessages)

// Take the last N turns
const recentTurns = turns.slice(-limit)

// Flatten back to messages
const recentMessages = recentTurns.flat()

const firstSystemMessage = systemMessages.length > 0 ? [systemMessages[0]] : []

return [...firstSystemMessage, ...recentMessages]
}

/**
 * Groups messages into conversation turns.
 * A turn starts with a user message and includes all subsequent messages
 * until the next user message (tool calls, tool results, assistant response).
 * Any messages preceding the first user message form their own leading turn.
 */
private groupMessagesIntoTurns(messages: Message[]): Message[][] {
  const grouped: Message[][] = []
  let pending: Message[] = []

  // Push the in-progress turn (if non-empty) and reset the accumulator.
  const flush = () => {
    if (pending.length > 0) {
      grouped.push(pending)
      pending = []
    }
  }

  for (const message of messages) {
    if (message.role === 'user') {
      // A user message closes the previous turn and opens a new one.
      flush()
      pending = [message]
    } else {
      // Assistant / tool / other messages belong to the current turn.
      pending.push(message)
    }
  }

  // The trailing turn never sees a following user message — flush it explicitly.
  flush()

  return grouped
}

/**
 * Remove orphaned tool messages that don't have a corresponding tool_calls message
 * This prevents errors like "tool_result without corresponding tool_use"
 *
 * NOTE(review): this method appears to be unused within this file — a reviewer
 * flagged it as dead code whose functionality is now covered by
 * sanitizeMessagesForProvider() in utils.ts. Confirm there are no callers and
 * consider deleting it.
 *
 * @param messages - Full message list, possibly containing stray `tool` messages
 * @returns The same messages minus any `tool` message whose tool_call_id has no
 *          matching entry in a preceding assistant `tool_calls` array
 */
private removeOrphanedToolMessages(messages: Message[]): Message[] {
  const result: Message[] = []
  const seenToolCallIds = new Set<string>()

  // First pass: collect all tool_call IDs from assistant messages with tool_calls
  for (const msg of messages) {
    if (msg.role === 'assistant' && msg.tool_calls && Array.isArray(msg.tool_calls)) {
      for (const tc of msg.tool_calls) {
        if (tc.id) {
          seenToolCallIds.add(tc.id)
        }
      }
    }
  }

  // Second pass: only include tool messages that have a matching tool_calls message
  for (const msg of messages) {
    if (msg.role === 'tool') {
      // tool_call_id is not part of the declared Message shape here, hence the cast.
      const toolCallId = (msg as any).tool_call_id
      if (toolCallId && seenToolCallIds.has(toolCallId)) {
        result.push(msg)
      } else {
        // Dropped silently from the result; only logged at debug level.
        logger.debug('Removing orphaned tool message', { toolCallId })
      }
    } else {
      // Non-tool messages always pass through unchanged.
      result.push(msg)
    }
  }

  return result
}

/**
* Apply token-based sliding window to limit conversation by token count
*
* System message handling:
* - For consistency with message-based sliding window, the first system message is preserved
* - System messages are excluded from the token count
* - This ensures system prompts are always available while limiting conversation history
*
* Turn handling:
* - Messages are grouped into turns (user + tool calls/results + assistant response)
* - Complete turns are added to stay within token limit
* - This prevents breaking tool call/result pairs
*/
private applySlidingWindowByTokens(
messages: Message[],
Expand All @@ -233,43 +312,52 @@ export class Memory {
const systemMessages = messages.filter((msg) => msg.role === 'system')
const conversationMessages = messages.filter((msg) => msg.role !== 'system')

// Group into turns to keep tool call/result pairs together
const turns = this.groupMessagesIntoTurns(conversationMessages)

const result: Message[] = []
let currentTokenCount = 0

// Add conversation messages from most recent backwards
for (let i = conversationMessages.length - 1; i >= 0; i--) {
const message = conversationMessages[i]
const messageTokens = getAccurateTokenCount(message.content, model)
// Add turns from most recent backwards
for (let i = turns.length - 1; i >= 0; i--) {
const turn = turns[i]
const turnTokens = turn.reduce(
(sum, msg) => sum + getAccurateTokenCount(msg.content || '', model),
0
)

if (currentTokenCount + messageTokens <= tokenLimit) {
result.unshift(message)
currentTokenCount += messageTokens
if (currentTokenCount + turnTokens <= tokenLimit) {
result.unshift(...turn)
currentTokenCount += turnTokens
} else if (result.length === 0) {
logger.warn('Single message exceeds token limit, including anyway', {
messageTokens,
logger.warn('Single turn exceeds token limit, including anyway', {
turnTokens,
tokenLimit,
messageRole: message.role,
turnMessages: turn.length,
})
result.unshift(message)
currentTokenCount += messageTokens
result.unshift(...turn)
currentTokenCount += turnTokens
break
} else {
// Token limit reached, stop processing
break
}
}

// No need to remove orphaned messages - turns are already complete
const cleanedResult = result

logger.debug('Applied token-based sliding window', {
totalMessages: messages.length,
conversationMessages: conversationMessages.length,
includedMessages: result.length,
includedMessages: cleanedResult.length,
totalTokens: currentTokenCount,
tokenLimit,
})

// Preserve first system message and prepend to results (consistent with message-based window)
const firstSystemMessage = systemMessages.length > 0 ? [systemMessages[0]] : []
return [...firstSystemMessage, ...result]
return [...firstSystemMessage, ...cleanedResult]
}

/**
Expand Down Expand Up @@ -324,7 +412,7 @@ export class Memory {
// Count tokens used by system messages first
let systemTokenCount = 0
for (const msg of systemMessages) {
systemTokenCount += getAccurateTokenCount(msg.content, model)
systemTokenCount += getAccurateTokenCount(msg.content || '', model)
}

// Calculate remaining tokens available for conversation messages
Expand All @@ -339,30 +427,36 @@ export class Memory {
return systemMessages
}

// Group into turns to keep tool call/result pairs together
const turns = this.groupMessagesIntoTurns(conversationMessages)

const result: Message[] = []
let currentTokenCount = 0

for (let i = conversationMessages.length - 1; i >= 0; i--) {
const message = conversationMessages[i]
const messageTokens = getAccurateTokenCount(message.content, model)
for (let i = turns.length - 1; i >= 0; i--) {
const turn = turns[i]
const turnTokens = turn.reduce(
(sum, msg) => sum + getAccurateTokenCount(msg.content || '', model),
0
)

if (currentTokenCount + messageTokens <= remainingTokens) {
result.unshift(message)
currentTokenCount += messageTokens
if (currentTokenCount + turnTokens <= remainingTokens) {
result.unshift(...turn)
currentTokenCount += turnTokens
} else if (result.length === 0) {
logger.warn('Single message exceeds remaining context window, including anyway', {
messageTokens,
logger.warn('Single turn exceeds remaining context window, including anyway', {
turnTokens,
remainingTokens,
systemTokenCount,
messageRole: message.role,
turnMessages: turn.length,
})
result.unshift(message)
currentTokenCount += messageTokens
result.unshift(...turn)
currentTokenCount += turnTokens
break
} else {
logger.info('Auto-trimmed conversation history to fit context window', {
originalMessages: conversationMessages.length,
trimmedMessages: result.length,
originalTurns: turns.length,
trimmedTurns: turns.length - i - 1,
conversationTokens: currentTokenCount,
systemTokens: systemTokenCount,
totalTokens: currentTokenCount + systemTokenCount,
Expand All @@ -372,6 +466,7 @@ export class Memory {
}
}

// No need to remove orphaned messages - turns are already complete
return [...systemMessages, ...result]
}

Expand Down Expand Up @@ -638,7 +733,7 @@ export class Memory {
/**
* Validate inputs to prevent malicious data or performance issues
*/
private validateInputs(conversationId?: string, content?: string): void {
private validateInputs(conversationId?: string, content?: string | null): void {
if (conversationId) {
if (conversationId.length > 255) {
throw new Error('Conversation ID too long (max 255 characters)')
Expand Down
Loading
Loading