Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions pkg/runtime/compaction_trigger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package runtime

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/docker-agent/pkg/agent"
"github.com/docker/docker-agent/pkg/chat"
"github.com/docker/docker-agent/pkg/session"
"github.com/docker/docker-agent/pkg/team"
"github.com/docker/docker-agent/pkg/tools"
)

// TestCompactIfNeeded_IgnoresSubSessionTokens is a regression test for
// issue #2871: in a multi-agent run, the tokens accumulated inside a
// transfer_task sub-session were counted by the proactive compaction
// trigger (GetAllMessages recurses into sub-sessions) even though they
// never enter the parent's prompt (GetMessages skips sub-session items).
// The phantom tokens made the parent compact its own tiny conversation;
// with everything fitting the keep budget that meant "compact
// everything, keep nothing" — the agent's next prompt was just the
// summary and it halted with a confused "no conversation history" reply.
func TestCompactIfNeeded_IgnoresSubSessionTokens(t *testing.T) {
prov := &mockProvider{id: "test/model", stream: &mockStream{}}
root := agent.New("root", "agent", agent.WithModel(prov))
tm := team.New(team.WithAgents(root))

rt, err := NewLocalRuntime(tm,
WithSessionCompaction(true),
WithModelStore(mockModelStoreWithLimit{limit: 100_000}))
require.NoError(t, err)

sess := session.New(session.WithUserMessage("build the app"))
messageCountBefore := len(sess.OwnMessages())

// Simulate a completed transfer_task tool call: a sub-session holding
// far more content than the parent's context limit, plus a small
// tool-result message on the parent itself.
sub := session.New(session.WithUserMessage("subtask"))
sub.AddMessage(session.NewAgentMessage("worker", &chat.Message{
Role: chat.MessageRoleAssistant,
Content: strings.Repeat("z", 600_000), // ~150k estimated tokens
}))
sess.AddMessage(session.NewAgentMessage("root", &chat.Message{
Role: chat.MessageRoleAssistant,
ToolCalls: []tools.ToolCall{{ID: "t1", Function: tools.FunctionCall{Name: "transfer_task"}}},
}))
sess.AddSubSession(sub)
sess.AddMessage(session.NewAgentMessage("root", &chat.Message{
Role: chat.MessageRoleTool,
ToolCallID: "t1",
Content: "subtask done",
}))

events := make(chan Event, 16)
rt.compactIfNeeded(t.Context(), sess, root, 100_000, messageCountBefore, NewChannelSink(events))
close(events)

for ev := range events {
_, isCompaction := ev.(*SessionCompactionEvent)
assert.False(t, isCompaction,
"sub-session tokens must not trigger compaction of the parent session")
}
}

// TestCompactIfNeeded_TriggersOnOwnMessages pins the complementary case:
// large tool results recorded directly on the session still trigger the
// proactive compaction.
func TestCompactIfNeeded_TriggersOnOwnMessages(t *testing.T) {
prov := &mockProvider{id: "test/model", stream: &mockStream{}}
root := agent.New("root", "agent", agent.WithModel(prov))
tm := team.New(team.WithAgents(root))

rt, err := NewLocalRuntime(tm,
WithSessionCompaction(true),
WithModelStore(mockModelStoreWithLimit{limit: 100_000}))
require.NoError(t, err)

sess := session.New(session.WithUserMessage("build the app"))
messageCountBefore := len(sess.OwnMessages())

sess.AddMessage(session.NewAgentMessage("root", &chat.Message{
Role: chat.MessageRoleAssistant,
ToolCalls: []tools.ToolCall{{ID: "t1", Function: tools.FunctionCall{Name: "shell"}}},
}))
sess.AddMessage(session.NewAgentMessage("root", &chat.Message{
Role: chat.MessageRoleTool,
ToolCallID: "t1",
Content: strings.Repeat("z", 600_000), // ~150k estimated tokens
}))

events := make(chan Event, 16)
rt.compactIfNeeded(t.Context(), sess, root, 100_000, messageCountBefore, NewChannelSink(events))
close(events)

sawCompaction := false
for ev := range events {
if _, ok := ev.(*SessionCompactionEvent); ok {
sawCompaction = true
}
}
assert.True(t, sawCompaction, "large own tool results must still trigger compaction")
}
63 changes: 52 additions & 11 deletions pkg/runtime/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"time"

"github.com/docker/docker-agent/pkg/agent"
Expand All @@ -35,15 +36,43 @@ import (
// MaxSummaryTokens caps the summary's output length when using the
// default LLM strategy. Exposed because the runtime subtracts it from
// the model's context budget when deciding whether the model lookup
// produced a workable limit.
// produced a workable limit. For small context windows the effective
// cap is scaled down via [summaryTokenBudget] so the summary call
// never consumes more than a quarter of the window.
const MaxSummaryTokens = 16_000

// maxKeepTokens is the runtime's policy for how much recent
// conversation to preserve verbatim across a compaction. Messages
// fitting in this window are kept aside; the rest are the candidates
// to summarize.
// to summarize. For small context windows the effective budget is
// scaled down via [keepTokenBudget] so the kept tail never occupies
// more than a fifth of the window.
const maxKeepTokens = 20_000

// summaryTokenBudget returns the output-token cap for the summary
// call, scaled to the context window. The fixed [MaxSummaryTokens]
// cap works for large windows but exceeds small ones entirely (e.g. a
// local model with provider_opts.context_size of 8k), which used to
// leave no room for the conversation being summarized — the
// summarizer then received an empty conversation and produced a
// confused non-summary that wiped the session history.
func summaryTokenBudget(contextLimit int64) int64 {
return min(MaxSummaryTokens, contextLimit/4)
}

// keepTokenBudget returns the verbatim-keep budget for a compaction,
// scaled to the context window so that the kept tail plus the summary
// always leave the post-compaction session well under the compaction
// threshold. A non-positive contextLimit (hook-supplied summaries may
// run without a resolvable model definition) falls back to the
// unscaled policy.
func keepTokenBudget(contextLimit int64) int64 {
if contextLimit <= 0 {
return maxKeepTokens
}
return min(maxKeepTokens, contextLimit/5)
}

// Result is the structural outcome of running a compaction strategy.
// The runtime applies it to the parent session by appending a
// session.Item with FirstKeptEntry set, resetting the running
Expand Down Expand Up @@ -120,12 +149,24 @@ func RunLLM(ctx context.Context, args LLMArgs) (*Result, error) {

summaryModel := provider.CloneWithOptions(ctx, args.Agent.Model(ctx),
options.WithStructuredOutput(nil),
options.WithMaxTokens(MaxSummaryTokens),
options.WithMaxTokens(summaryTokenBudget(args.ContextLimit)),
)
compactionAgent := agent.New("root", "", agent.WithModel(summaryModel))

messages, firstKeptEntry := extractMessages(args.Session, compactionAgent, args.ContextLimit, args.AdditionalPrompt)

// The first and last entries are the synthesized compaction
// system/user prompts; anything between them is the conversation to
// summarize. Running the summarizer without a conversation would
// make it fabricate a "there is no history" reply that then
// REPLACES the real session history, so treat this as a no-op
// instead (the session is left untouched).
if len(messages) <= 2 {
slog.WarnContext(ctx, "Compaction skipped: no conversation messages fit the summarization budget",
"session_id", args.Session.ID, "context_limit", args.ContextLimit)
return nil, nil
}

compactionSession := session.New(
session.WithTitle("Generating summary"),
session.WithMessages(toItems(messages)),
Expand All @@ -150,12 +191,12 @@ func RunLLM(ctx context.Context, args LLMArgs) (*Result, error) {

// ComputeFirstKeptEntry returns the index in sess.Messages of the
// first message preserved verbatim after compaction, given the
// [maxKeepTokens] window. Used by the runtime when a hook supplies
// its own summary so the kept-tail policy stays consistent across
// the two strategies.
func ComputeFirstKeptEntry(sess *session.Session) int {
// [keepTokenBudget] window for contextLimit. Used by the runtime when
// a hook supplies its own summary so the kept-tail policy stays
// consistent across the two strategies.
func ComputeFirstKeptEntry(sess *session.Session, contextLimit int64) int {
messages, sessIndices := gatherCompactionInput(sess)
return firstKeptSessionIndex(sess, sessIndices, compaction.SplitIndexForKeep(messages, maxKeepTokens))
return firstKeptSessionIndex(sess, sessIndices, compaction.SplitIndexForKeep(messages, keepTokenBudget(contextLimit)))
}

// gatherCompactionInput is a thin wrapper around
Expand Down Expand Up @@ -197,12 +238,12 @@ func gatherCompactionInput(sess *session.Session) ([]chat.Message, []int) {
// a cache checkpoint or accrue duplicate cost.
//
// If the conversation tail itself doesn't fit in
// (contextLimit − MaxSummaryTokens − prompt-overhead), older messages
// (contextLimit − summary budget − prompt-overhead), older messages
// are dropped from the front of the to-compact list to make room.
func extractMessages(sess *session.Session, _ *agent.Agent, contextLimit int64, additionalPrompt string) ([]chat.Message, int) {
messages, sessIndices := gatherCompactionInput(sess)

splitIdx := compaction.SplitIndexForKeep(messages, maxKeepTokens)
splitIdx := compaction.SplitIndexForKeep(messages, keepTokenBudget(contextLimit))
firstKeptEntry := firstKeptSessionIndex(sess, sessIndices, splitIdx)
messages = messages[:splitIdx]

Expand All @@ -222,7 +263,7 @@ func extractMessages(sess *session.Session, _ *agent.Agent, contextLimit int64,
}

contextAvailable := max(int64(0),
contextLimit-MaxSummaryTokens-
contextLimit-summaryTokenBudget(contextLimit)-
compaction.EstimateMessageTokens(&systemPromptMessage)-
compaction.EstimateMessageTokens(&userPromptMessage))
firstIndex := compaction.FirstIndexInBudget(messages, contextAvailable)
Expand Down
103 changes: 96 additions & 7 deletions pkg/runtime/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package compactor
import (
"context"
"errors"
"fmt"
"strings"
"testing"

Expand Down Expand Up @@ -76,15 +77,19 @@ func TestExtractMessages(t *testing.T) {
wantConversationMsgCount: 4,
},
{
name: "truncation when context limit is very small",
name: "older messages dropped when they exceed the summarization budget",
messages: []session.Item{
newMsg(chat.MessageRoleUser, "first message with lots of content that takes tokens"),
newMsg(chat.MessageRoleAssistant, "first response with lots of content that takes tokens"),
newMsg(chat.MessageRoleUser, strings.Repeat("a", 80_000)), // ~20k tokens
newMsg(chat.MessageRoleAssistant, strings.Repeat("b", 80_000)), // ~20k tokens
newMsg(chat.MessageRoleUser, "second message"),
newMsg(chat.MessageRoleAssistant, "second response"),
},
contextLimit: MaxSummaryTokens + 50,
wantConversationMsgCount: 0,
// The two small messages form the kept tail (keep budget
// 32k/5). Of the two ~20k-token compact candidates only the
// newest fits contextAvailable ≈ 0.75×32k − prompts ≈ 23.8k;
// the older one is dropped from the summarizer's input.
contextLimit: 32_000,
wantConversationMsgCount: 1,
},
{
name: "additional prompt is appended",
Expand Down Expand Up @@ -183,7 +188,7 @@ func TestComputeFirstKeptEntry(t *testing.T) {
t.Run("empty session returns 0", func(t *testing.T) {
t.Parallel()
sess := session.New()
assert.Equal(t, 0, ComputeFirstKeptEntry(sess))
assert.Equal(t, 0, ComputeFirstKeptEntry(sess, 100_000))
})

t.Run("short conversation: split at end (compact everything)", func(t *testing.T) {
Expand All @@ -193,7 +198,7 @@ func TestComputeFirstKeptEntry(t *testing.T) {
session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleUser, Content: "hi"}}),
session.NewMessageItem(&session.Message{Message: chat.Message{Role: chat.MessageRoleAssistant, Content: "hello"}}),
}))
assert.Equal(t, len(sess.Messages), ComputeFirstKeptEntry(sess))
assert.Equal(t, len(sess.Messages), ComputeFirstKeptEntry(sess, 100_000))
})
}

Expand Down Expand Up @@ -345,6 +350,90 @@ func TestGatherCompactionInput_PriorSummaryWithoutFirstKeptEntry(t *testing.T) {
assert.Equal(t, []int{2, 3, 4}, sessIndices)
}

// TestRunLLM_SmallContextWindow is a regression test for issue #2871:
// with a small context window (e.g. a local model whose size comes from
// provider_opts.context_size), the fixed MaxSummaryTokens budget used to
// consume the whole window, so the summarizer received zero conversation
// messages, fabricated an "I see no conversation history" reply, and that
// text then replaced the entire session history. The budgets must scale
// with the window so the summarizer always sees real conversation.
func TestRunLLM_SmallContextWindow(t *testing.T) {
t.Parallel()

big := strings.Repeat("x", 4_000) // ~1k estimated tokens per tool result
sess := session.New(session.WithUserMessage("please do the big task"))
for i := range 8 {
id := fmt.Sprintf("tc%d", i)
sess.AddMessage(session.NewAgentMessage("root", &chat.Message{
Role: chat.MessageRoleAssistant,
ToolCalls: []tools.ToolCall{{ID: id, Function: tools.FunctionCall{Name: "shell", Arguments: `{"cmd":"ls"}`}}},
}))
sess.AddMessage(session.NewAgentMessage("root", &chat.Message{
Role: chat.MessageRoleTool,
ToolCallID: id,
Content: big,
}))
}
a := agent.New("root", "instr", agent.WithModel(fakeProvider{id: modelsdev.NewID("fake", "model")}))

var conversationCount int
result, err := RunLLM(t.Context(), LLMArgs{
Session: sess,
Agent: a,
ContextLimit: 8_192,
RunAgent: func(_ context.Context, _ *agent.Agent, cs *session.Session) error {
msgs := cs.GetAllMessages()
// All non-system messages minus the trailing compaction user
// prompt are the conversation handed to the summarizer.
conversationCount = len(msgs) - 1
cs.AddMessage(session.NewAgentMessage("root", &chat.Message{
Role: chat.MessageRoleAssistant,
Content: "the summary",
}))
return nil
},
})

require.NoError(t, err)
require.NotNil(t, result)
assert.Positive(t, conversationCount, "summarizer must receive conversation messages even on small context windows")
assert.Equal(t, "the summary", result.Summary)
assert.Less(t, result.FirstKeptEntry, len(sess.Messages), "a recent tail must be kept verbatim")
assert.Positive(t, result.FirstKeptEntry)
}

// TestRunLLM_NoConversationFits_NoOps pins the safety net behind the
// scaled budgets: when not a single conversation message fits the
// summarization budget (e.g. one giant tool result), RunLLM must no-op
// instead of running the summarizer on an empty conversation — the
// resulting non-summary would otherwise wipe the session history.
func TestRunLLM_NoConversationFits_NoOps(t *testing.T) {
t.Parallel()

sess := session.New(session.WithMessages([]session.Item{
session.NewMessageItem(&session.Message{Message: chat.Message{
Role: chat.MessageRoleUser,
Content: strings.Repeat("x", 200_000), // ~50k tokens, exceeds the whole window
}}),
}))
a := agent.New("root", "instr", agent.WithModel(fakeProvider{id: modelsdev.NewID("fake", "model")}))

runAgentCalled := false
result, err := RunLLM(t.Context(), LLMArgs{
Session: sess,
Agent: a,
ContextLimit: 8_192,
RunAgent: func(context.Context, *agent.Agent, *session.Session) error {
runAgentCalled = true
return nil
},
})

require.NoError(t, err)
assert.Nil(t, result, "compaction must be a no-op when nothing fits the budget")
assert.False(t, runAgentCalled, "the summarizer must not run on an empty conversation")
}

func TestRunLLM_DoesNotDuplicateSystemPrompt(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading