diff --git a/internal/flow/processor/content.go b/internal/flow/processor/content.go index d8a83eb4d..1369e2ef8 100644 --- a/internal/flow/processor/content.go +++ b/internal/flow/processor/content.go @@ -183,6 +183,10 @@ func (p *ContentRequestProcessor) ProcessRequest( } } messages = p.getIncrementMessages(invocation, summaryUpdatedAt) + // When summary is present, ensure messages start with user role to comply with API requirements. + if !summaryUpdatedAt.IsZero() { + messages = p.trimLeadingNonUserMessages(messages) + } req.Messages = append(req.Messages, messages...) needToAddInvocationMessage = summaryUpdatedAt.IsZero() && len(messages) == 0 } @@ -328,13 +332,33 @@ func (p *ContentRequestProcessor) mergeUserMessages( return merged } +// trimLeadingNonUserMessages removes leading non-user messages from the slice. +// This ensures that when summary is present, the incremental messages start with +// a user message to comply with API requirements (system messages must be followed by user). +// +// Note: Under normal operation, summary is only triggered after final assistant responses +// (not user messages, tool calls, or tool results), so the incremental messages should +// already start with a user message. This function serves as a defensive measure for +// edge cases or potential future changes. +func (p *ContentRequestProcessor) trimLeadingNonUserMessages(messages []model.Message) []model.Message { + for i, msg := range messages { + if msg.Role == model.RoleUser { + return messages[i:] + } + } + // No user message found, return empty slice. + return nil +} + func (p *ContentRequestProcessor) shouldIncludeEvent(evt event.Event, inv *agent.Invocation, filter string, isZeroTime bool, since time.Time) bool { if evt.Response == nil || evt.IsPartial || !evt.IsValidContent() { return false } - if !isZeroTime && evt.Timestamp.Before(since) { + // Use !After instead of Before to exclude events at exactly the summary boundary. + // This prevents the last summarized event from appearing in both the summary and incremental messages. + if !isZeroTime && !evt.Timestamp.After(since) { return false } diff --git a/runner/runner.go b/runner/runner.go index 27afdd43a..0d19da37a 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -316,7 +316,13 @@ func (r *runner) handleEventPersistence( return } - // Trigger summarization immediately after appending a qualifying event. + // Trigger summarization only after final assistant responses. + // Skip user messages, tool calls, and tool results to ensure summary + // always contains complete Q&A pairs (including tool call round-trips). + if agentEvent.IsUserMessage() || agentEvent.IsToolCallResponse() || agentEvent.IsToolResultResponse() { + return + } + // Use EnqueueSummaryJob for true asynchronous processing. // Prefer filter-specific summarization to avoid scanning all filters. if err := r.sessionService.EnqueueSummaryJob(