Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion go/adk/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,18 @@ func main() {
logger.Info("Memory service enabled", "appName", appName)
}

runnerConfig, subagentSessionIDs, err := runnerpkg.CreateRunnerConfig(ctx, agentConfig, sessionService, appName, memoryService)
runnerConfig, subagentSessionIDs, compactionCfg, err := runnerpkg.CreateRunnerConfig(ctx, agentConfig, sessionService, appName, memoryService)
if err != nil {
logger.Error(err, "Failed to create Google ADK Runner config")
os.Exit(1)
}
if compactionCfg != nil {
logger.Info("Compaction enabled",
"interval", compactionCfg.CompactionInterval,
"overlap", compactionCfg.OverlapSize,
"tokenThreshold", compactionCfg.TokenThreshold,
)
}

stream := agentConfig.GetStream()
executor := a2a.NewKAgentExecutor(a2a.KAgentExecutorConfig{
Expand All @@ -192,6 +199,7 @@ func main() {
Stream: stream,
AppName: appName,
Logger: logger,
CompactionConfig: compactionCfg,
})

// Build the agent card.
Expand Down
18 changes: 17 additions & 1 deletion go/adk/pkg/a2a/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/a2aproject/a2a-go/a2asrv/eventqueue"
"github.com/go-logr/logr"
"github.com/kagent-dev/kagent/go/adk/pkg/auth"
"github.com/kagent-dev/kagent/go/adk/pkg/compaction"
"github.com/kagent-dev/kagent/go/adk/pkg/models"
"github.com/kagent-dev/kagent/go/adk/pkg/session"
"github.com/kagent-dev/kagent/go/adk/pkg/skills"
Expand All @@ -37,6 +38,7 @@ type KAgentExecutorConfig struct {
AppName string
SkillsDirectory string
Logger logr.Logger
CompactionConfig *compaction.Config
}

// KAgentExecutor implements a2asrv.AgentExecutor
Expand All @@ -48,6 +50,7 @@ type KAgentExecutor struct {
appName string
skillsDirectory string
logger logr.Logger
compactor *compaction.Compactor
}

var _ a2asrv.AgentExecutor = (*KAgentExecutor)(nil)
Expand All @@ -69,6 +72,7 @@ func NewKAgentExecutor(cfg KAgentExecutorConfig) *KAgentExecutor {
appName: cfg.AppName,
skillsDirectory: skillsDir,
logger: cfg.Logger.WithName("kagent-executor"),
compactor: compaction.New(cfg.CompactionConfig, cfg.Logger),
}
}

Expand Down Expand Up @@ -355,7 +359,19 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
}
}

// 11. Emit final event.
// 11. Post-invocation compaction (best-effort; never fails the request).
if e.compactor != nil && e.sessionService != nil && runErr == nil {
liveSess, sessErr := e.sessionService.GetSession(ctx, e.appName, userID, sessionID)
if sessErr != nil {
e.logger.V(1).Info("Compaction skipped: could not fetch session", "error", sessErr)
} else if liveSess != nil {
if compactErr := e.compactor.MaybeCompact(ctx, liveSess, e.sessionService, 0); compactErr != nil {
e.logger.Error(compactErr, "compaction failed (continuing)")
}
}
}

// 12. Emit final event.
finalMeta := maps.Clone(baseMeta)
if invocationID != "" {
finalMeta[adka2a.ToA2AMetaKey("invocation_id")] = invocationID
Expand Down
Loading
Loading