Skip to content

Commit 61b06be

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Integrate event compaction in Java ADK runner
This change integrates the event compaction mechanism into the ADK Runner. The Contents class is updated to filter out events that fall within the timestamp range of a compaction event, replacing them with the compacted summary. The Runner now uses a SlidingWindowEventCompactor to periodically summarize events based on the provided EventsCompactionConfig. Tests are added to verify the compaction logic in both Contents and Runner. PiperOrigin-RevId: 851559226
1 parent b48b194 commit 61b06be

File tree

7 files changed

+343
-13
lines changed

7 files changed

+343
-13
lines changed

core/src/main/java/com/google/adk/flows/llmflows/Contents.java

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import com.google.adk.agents.InvocationContext;
2424
import com.google.adk.agents.LlmAgent;
2525
import com.google.adk.events.Event;
26+
import com.google.adk.events.EventCompaction;
2627
import com.google.adk.models.LlmRequest;
2728
import com.google.common.collect.ImmutableList;
2829
import com.google.common.collect.Iterables;
30+
import com.google.common.collect.Lists;
2931
import com.google.genai.types.Content;
3032
import com.google.genai.types.FunctionCall;
3133
import com.google.genai.types.FunctionResponse;
@@ -36,6 +38,7 @@
3638
import java.util.HashMap;
3739
import java.util.HashSet;
3840
import java.util.List;
41+
import java.util.ListIterator;
3942
import java.util.Map;
4043
import java.util.Optional;
4144
import java.util.Set;
@@ -99,24 +102,23 @@ private ImmutableList<Content> getCurrentTurnContents(
99102
private ImmutableList<Content> getContents(
100103
Optional<String> currentBranch, List<Event> events, String agentName, String modelName) {
101104
List<Event> filteredEvents = new ArrayList<>();
105+
boolean hasCompactEvent = false;
102106

103107
// Filter the events, leaving the contents and the function calls and responses from the current
104108
// agent.
105109
for (Event event : events) {
106-
// Skip events without content, or generated neither by user nor by model or has empty text.
107-
// E.g. events purely for mutating session states.
108-
if (event.content().isEmpty()) {
110+
if (event.actions().compaction().isPresent()) {
111+
// Always include the compaction event
112+
hasCompactEvent = true;
113+
filteredEvents.add(event);
109114
continue;
110115
}
111-
var content = event.content().get();
112-
if (content.role().isEmpty()
113-
|| content.role().get().isEmpty()
114-
|| content.parts().isEmpty()
115-
|| content.parts().get().isEmpty()
116-
|| content.parts().get().get(0).text().map(String::isEmpty).orElse(false)) {
116+
117+
// Skip events without content, or generated neither by user nor by model or has empty text.
118+
// E.g. events purely for mutating session states.
119+
if (isEmptyContent(event)) {
117120
continue;
118121
}
119-
120122
if (!isEventBelongsToBranch(currentBranch, event)) {
121123
continue;
122124
}
@@ -133,6 +135,10 @@ private ImmutableList<Content> getContents(
133135
}
134136
}
135137

138+
if (hasCompactEvent) {
139+
filteredEvents = processCompactionEvent(filteredEvents);
140+
}
141+
136142
List<Event> resultEvents = rearrangeEventsForLatestFunctionResponse(filteredEvents);
137143
resultEvents = rearrangeEventsForAsyncFunctionResponsesInHistory(resultEvents, modelName);
138144

@@ -142,6 +148,67 @@ private ImmutableList<Content> getContents(
142148
.collect(toImmutableList());
143149
}
144150

151+
/**
152+
* Check if an event has missing or empty content.
153+
*
154+
* <p>This can happen to the events that only changed session state. When both content and
155+
* transcriptions are empty, the event will be considered as empty. The content is considered
156+
* empty if none of its parts contain text, inline data, file data, function call, or function
157+
* response. Parts with only thoughts are also considered empty.
158+
*
159+
* @param event the event to check.
160+
* @return {@code true} if the event is considered to have empty content, {@code false} otherwise.
161+
*/
162+
private boolean isEmptyContent(Event event) {
163+
if (event.content().isEmpty()) {
164+
return true;
165+
}
166+
var content = event.content().get();
167+
return (content.role().isEmpty()
168+
|| content.role().get().isEmpty()
169+
|| content.parts().isEmpty()
170+
|| content.parts().get().isEmpty()
171+
|| content.parts().get().get(0).text().map(String::isEmpty).orElse(false));
172+
}
173+
174+
/**
175+
* Filters events that are covered by compaction events.
176+
*
177+
* @param events the list of event to filter.
178+
* @return a new list with compaction applied.
179+
*/
180+
private List<Event> processCompactionEvent(List<Event> events) {
181+
List<Event> result = new ArrayList<>();
182+
ListIterator<Event> iter = events.listIterator(events.size());
183+
Long lastCompactionStartTime = null;
184+
185+
while (iter.hasPrevious()) {
186+
Event event = iter.previous();
187+
EventCompaction compaction = event.actions().compaction().orElse(null);
188+
if (compaction == null) {
189+
if (lastCompactionStartTime == null || event.timestamp() < lastCompactionStartTime) {
190+
result.add(event);
191+
}
192+
continue;
193+
}
194+
// Create a new event for the compaction event in the result.
195+
result.add(
196+
Event.builder()
197+
.timestamp(compaction.endTimestamp())
198+
.author("model")
199+
.content(compaction.compactedContent())
200+
.branch(event.branch())
201+
.invocationId(event.invocationId())
202+
.actions(event.actions())
203+
.build());
204+
lastCompactionStartTime =
205+
lastCompactionStartTime == null
206+
? compaction.startTimestamp()
207+
: Long.min(lastCompactionStartTime, compaction.startTimestamp());
208+
}
209+
return Lists.reverse(result);
210+
}
211+
145212
/** Whether the event is a reply from another agent. */
146213
private static boolean isOtherAgentReply(String agentName, Event event) {
147214
return !agentName.isEmpty()

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@
2929
import com.google.adk.events.EventActions;
3030
import com.google.adk.flows.llmflows.ResumabilityConfig;
3131
import com.google.adk.memory.BaseMemoryService;
32+
import com.google.adk.models.Model;
3233
import com.google.adk.plugins.BasePlugin;
3334
import com.google.adk.plugins.PluginManager;
3435
import com.google.adk.sessions.BaseSessionService;
3536
import com.google.adk.sessions.InMemorySessionService;
3637
import com.google.adk.sessions.Session;
38+
import com.google.adk.summarizer.EventsCompactionConfig;
39+
import com.google.adk.summarizer.LlmEventSummarizer;
40+
import com.google.adk.summarizer.SlidingWindowEventCompactor;
3741
import com.google.adk.tools.BaseTool;
3842
import com.google.adk.tools.FunctionTool;
3943
import com.google.adk.utils.CollectionUtils;
@@ -68,6 +72,7 @@ public class Runner {
6872
@Nullable private final BaseMemoryService memoryService;
6973
private final PluginManager pluginManager;
7074
private final ResumabilityConfig resumabilityConfig;
75+
@Nullable private final EventsCompactionConfig eventsCompactionConfig;
7176

7277
/** Builder for {@link Runner}. */
7378
public static class Builder {
@@ -78,6 +83,7 @@ public static class Builder {
7883
@Nullable private BaseMemoryService memoryService = null;
7984
private List<BasePlugin> plugins = ImmutableList.of();
8085
private ResumabilityConfig resumabilityConfig = new ResumabilityConfig();
86+
@Nullable private EventsCompactionConfig eventsCompactionConfig;
8187

8288
@CanIgnoreReturnValue
8389
public Builder agent(BaseAgent agent) {
@@ -121,6 +127,12 @@ public Builder resumabilityConfig(ResumabilityConfig resumabilityConfig) {
121127
return this;
122128
}
123129

130+
@CanIgnoreReturnValue
131+
public Builder eventsCompactionConfig(EventsCompactionConfig eventsCompactionConfig) {
132+
this.eventsCompactionConfig = eventsCompactionConfig;
133+
return this;
134+
}
135+
124136
public Runner build() {
125137
if (agent == null) {
126138
throw new IllegalStateException("Agent must be provided.");
@@ -141,7 +153,8 @@ public Runner build() {
141153
sessionService,
142154
memoryService,
143155
plugins,
144-
resumabilityConfig);
156+
resumabilityConfig,
157+
eventsCompactionConfig);
145158
}
146159
}
147160

@@ -208,13 +221,43 @@ public Runner(
208221
@Nullable BaseMemoryService memoryService,
209222
List<BasePlugin> plugins,
210223
ResumabilityConfig resumabilityConfig) {
224+
this(
225+
agent,
226+
appName,
227+
artifactService,
228+
sessionService,
229+
memoryService,
230+
plugins,
231+
resumabilityConfig,
232+
null);
233+
}
234+
235+
/**
236+
* Creates a new {@code Runner} with a list of plugins and resumability config.
237+
*
238+
* @deprecated Use {@link Runner.Builder} instead.
239+
*/
240+
@Deprecated
241+
protected Runner(
242+
BaseAgent agent,
243+
String appName,
244+
BaseArtifactService artifactService,
245+
BaseSessionService sessionService,
246+
@Nullable BaseMemoryService memoryService,
247+
List<BasePlugin> plugins,
248+
ResumabilityConfig resumabilityConfig,
249+
@Nullable EventsCompactionConfig eventsCompactionConfig) {
211250
this.agent = agent;
212251
this.appName = appName;
213252
this.artifactService = artifactService;
214253
this.sessionService = sessionService;
215254
this.memoryService = memoryService;
216255
this.pluginManager = new PluginManager(plugins);
217256
this.resumabilityConfig = resumabilityConfig;
257+
this.eventsCompactionConfig =
258+
Optional.ofNullable(eventsCompactionConfig)
259+
.map(c -> populateBaseEventSummarizer(agent, c))
260+
.orElse(null);
218261
}
219262

220263
/**
@@ -491,7 +534,10 @@ public Flowable<Event> runAsync(
491534
Completable.defer(
492535
() ->
493536
pluginManager.runAfterRunCallback(
494-
contextWithUpdatedSession)));
537+
contextWithUpdatedSession)))
538+
.concatWith(
539+
Completable.defer(
540+
() -> compactEvents(updatedSession)));
495541
});
496542
}))
497543
.doOnError(
@@ -507,6 +553,28 @@ public Flowable<Event> runAsync(
507553
}
508554
}
509555

556+
private Completable compactEvents(Session session) {
557+
EventsCompactionConfig config = eventsCompactionConfig;
558+
if (config == null) {
559+
return Completable.complete();
560+
}
561+
config =
562+
new EventsCompactionConfig(
563+
config.compactionInterval(),
564+
config.overlapSize(),
565+
config
566+
.summarizer()
567+
.or(
568+
() ->
569+
Optional.of(agent())
570+
.filter(LlmAgent.class::isInstance)
571+
.map(LlmAgent.class::cast)
572+
.map(LlmAgent::resolvedModel)
573+
.flatMap(Model::model)
574+
.map(LlmEventSummarizer::new)));
575+
return new SlidingWindowEventCompactor(config).compact(session, sessionService);
576+
}
577+
510578
private void copySessionStates(Session source, Session target) {
511579
// TODO: remove this hack when deprecating all runAsync with Session.
512580
for (var entry : source.state().entrySet()) {
@@ -761,5 +829,27 @@ private boolean hasLiveRequestQueueParameter(FunctionTool functionTool) {
761829
.anyMatch(parameter -> parameter.getType().equals(LiveRequestQueue.class));
762830
}
763831

832+
/**
833+
* Creates a new {@link EventsCompactionConfig} based on the given configuration. If the {@link
834+
* com.google.adk.summarizer.BaseEventSummarizer} is missing, it will be default to the {@link
835+
* LlmEventSummarizer} using the same model as the LLM base agent.
836+
*/
837+
private static EventsCompactionConfig populateBaseEventSummarizer(
838+
BaseAgent agent, EventsCompactionConfig config) {
839+
return new EventsCompactionConfig(
840+
config.compactionInterval(),
841+
config.overlapSize(),
842+
config
843+
.summarizer()
844+
.or(
845+
() ->
846+
Optional.of(agent)
847+
.filter(LlmAgent.class::isInstance)
848+
.map(LlmAgent.class::cast)
849+
.map(LlmAgent::resolvedModel)
850+
.flatMap(Model::model)
851+
.map(LlmEventSummarizer::new)));
852+
}
853+
764854
// TODO: run statelessly
765855
}

core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@
1212
import java.util.List;
1313
import java.util.ListIterator;
1414
import java.util.Set;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
1517

1618
/**
1719
* This class performs events compaction in a sliding window fashion based on the {@link
1820
* EventsCompactionConfig}.
1921
*/
2022
public final class SlidingWindowEventCompactor implements EventCompactor {
2123

24+
private static final Logger logger = LoggerFactory.getLogger(SlidingWindowEventCompactor.class);
25+
2226
private final EventsCompactionConfig config;
2327
private final BaseEventSummarizer summarizer;
2428

@@ -80,6 +84,8 @@ public SlidingWindowEventCompactor(EventsCompactionConfig config) {
8084
*/
8185
@Override
8286
public Completable compact(Session session, BaseSessionService sessionService) {
87+
logger.debug("Running event compaction for session {}", session.id());
88+
8389
return Completable.fromMaybe(
8490
getCompactionEvents(session)
8591
.flatMap(summarizer::summarizeEvents)

0 commit comments

Comments
 (0)