Skip to content

Commit 54c826c

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Integrate event compaction in Java ADK runner
This change integrates the event compaction mechanism into the ADK Runner. The Contents class is updated to filter out events that fall within the timestamp range of a compaction event, replacing them with the compacted summary. The Runner now uses a SlidingWindowEventCompactor to periodically summarize events based on the provided EventsCompactionConfig. Tests are added to verify the compaction logic in both Contents and Runner. PiperOrigin-RevId: 854310265
1 parent dace210 commit 54c826c

File tree

7 files changed

+361
-15
lines changed

7 files changed

+361
-15
lines changed

core/src/main/java/com/google/adk/flows/llmflows/Contents.java

Lines changed: 105 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import com.google.adk.agents.InvocationContext;
2424
import com.google.adk.agents.LlmAgent;
2525
import com.google.adk.events.Event;
26+
import com.google.adk.events.EventCompaction;
2627
import com.google.adk.models.LlmRequest;
2728
import com.google.common.collect.ImmutableList;
2829
import com.google.common.collect.Iterables;
30+
import com.google.common.collect.Lists;
2931
import com.google.genai.types.Content;
3032
import com.google.genai.types.FunctionCall;
3133
import com.google.genai.types.FunctionResponse;
@@ -36,6 +38,7 @@
3638
import java.util.HashMap;
3739
import java.util.HashSet;
3840
import java.util.List;
41+
import java.util.ListIterator;
3942
import java.util.Map;
4043
import java.util.Optional;
4144
import java.util.Set;
@@ -99,24 +102,25 @@ private ImmutableList<Content> getCurrentTurnContents(
99102
private ImmutableList<Content> getContents(
100103
Optional<String> currentBranch, List<Event> events, String agentName, String modelName) {
101104
List<Event> filteredEvents = new ArrayList<>();
105+
boolean hasCompactEvent = false;
102106

103107
// Filter the events, leaving the contents and the function calls and responses from the current
104108
// agent.
105109
for (Event event : events) {
106-
// Skip events without content, or generated neither by user nor by model or has empty text.
107-
// E.g. events purely for mutating session states.
108-
if (event.content().isEmpty()) {
110+
if (event.actions().compaction().isPresent()) {
111+
// Always include the compaction event for the later processCompactionEvent call.
112+
// The compaction event is used to filter out normal events that are covered by the
113+
// compaction event.
114+
hasCompactEvent = true;
115+
filteredEvents.add(event);
109116
continue;
110117
}
111-
var content = event.content().get();
112-
if (content.role().isEmpty()
113-
|| content.role().get().isEmpty()
114-
|| content.parts().isEmpty()
115-
|| content.parts().get().isEmpty()
116-
|| content.parts().get().get(0).text().map(String::isEmpty).orElse(false)) {
118+
119+
// Skip events without content, or generated neither by user nor by model or has empty text.
120+
// E.g. events purely for mutating session states.
121+
if (isEmptyContent(event)) {
117122
continue;
118123
}
119-
120124
if (!isEventBelongsToBranch(currentBranch, event)) {
121125
continue;
122126
}
@@ -133,6 +137,10 @@ private ImmutableList<Content> getContents(
133137
}
134138
}
135139

140+
if (hasCompactEvent) {
141+
filteredEvents = processCompactionEvent(filteredEvents);
142+
}
143+
136144
List<Event> resultEvents = rearrangeEventsForLatestFunctionResponse(filteredEvents);
137145
resultEvents = rearrangeEventsForAsyncFunctionResponsesInHistory(resultEvents, modelName);
138146

@@ -142,6 +150,93 @@ private ImmutableList<Content> getContents(
142150
.collect(toImmutableList());
143151
}
144152

153+
/**
154+
* Check if an event has missing or empty content.
155+
*
156+
* <p>This can happen to the events that only changed session state. When both content and
157+
* transcriptions are empty, the event will be considered as empty. The content is considered
158+
* empty if none of its parts contain text, inline data, file data, function call, or function
159+
* response. Parts with only thoughts are also considered empty.
160+
*
161+
* @param event the event to check.
162+
* @return {@code true} if the event is considered to have empty content, {@code false} otherwise.
163+
*/
164+
private boolean isEmptyContent(Event event) {
165+
if (event.content().isEmpty()) {
166+
return true;
167+
}
168+
var content = event.content().get();
169+
return (content.role().isEmpty()
170+
|| content.role().get().isEmpty()
171+
|| content.parts().isEmpty()
172+
|| content.parts().get().isEmpty()
173+
|| content.parts().get().get(0).text().map(String::isEmpty).orElse(false));
174+
}
175+
176+
/**
177+
* Filters events that are covered by compaction events by identifying compacted ranges and
178+
* filters out events that are covered by compaction summaries
179+
*
180+
* <p>Example of input
181+
*
182+
* <pre>
183+
* [
184+
* event_1(timestamp=1),
185+
* event_2(timestamp=2),
186+
* compaction_1(event_1, event_2, timestamp=3, content=summary_1_2, startTime=1, endTime=2),
187+
* event_3(timestamp=4),
188+
* compaction_2(event_2, event_3, timestamp=5, content=summary_2_3, startTime=2, endTime=3),
189+
* event_4(timestamp=6)
190+
* ]
191+
* </pre>
192+
*
193+
* Will result in the following events output
194+
*
195+
* <pre>
196+
* [
197+
* compaction_1,
198+
* compaction_2
199+
* event_4
200+
* ]
201+
* </pre>
202+
*
203+
* Compaction events are always strictly in order based on event timestamp.
204+
*
205+
* @param events the list of event to filter.
206+
* @return a new list with compaction applied.
207+
*/
208+
private List<Event> processCompactionEvent(List<Event> events) {
209+
List<Event> result = new ArrayList<>();
210+
ListIterator<Event> iter = events.listIterator(events.size());
211+
Long lastCompactionStartTime = null;
212+
213+
while (iter.hasPrevious()) {
214+
Event event = iter.previous();
215+
EventCompaction compaction = event.actions().compaction().orElse(null);
216+
if (compaction == null) {
217+
if (lastCompactionStartTime == null || event.timestamp() < lastCompactionStartTime) {
218+
result.add(event);
219+
}
220+
continue;
221+
}
222+
// Create a new event for the compaction event in the result.
223+
result.add(
224+
Event.builder()
225+
.timestamp(compaction.endTimestamp())
226+
.author("model")
227+
.content(compaction.compactedContent())
228+
.branch(event.branch())
229+
.invocationId(event.invocationId())
230+
.actions(event.actions())
231+
.build());
232+
lastCompactionStartTime =
233+
lastCompactionStartTime == null
234+
? compaction.startTimestamp()
235+
: Long.min(lastCompactionStartTime, compaction.startTimestamp());
236+
}
237+
return Lists.reverse(result);
238+
}
239+
145240
/** Whether the event is a reply from another agent. */
146241
private static boolean isOtherAgentReply(String agentName, Event event) {
147242
return !agentName.isEmpty()

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@
2929
import com.google.adk.events.EventActions;
3030
import com.google.adk.flows.llmflows.ResumabilityConfig;
3131
import com.google.adk.memory.BaseMemoryService;
32+
import com.google.adk.models.Model;
3233
import com.google.adk.plugins.BasePlugin;
3334
import com.google.adk.plugins.PluginManager;
3435
import com.google.adk.sessions.BaseSessionService;
3536
import com.google.adk.sessions.InMemorySessionService;
3637
import com.google.adk.sessions.Session;
38+
import com.google.adk.summarizer.EventsCompactionConfig;
39+
import com.google.adk.summarizer.LlmEventSummarizer;
40+
import com.google.adk.summarizer.SlidingWindowEventCompactor;
3741
import com.google.adk.tools.BaseTool;
3842
import com.google.adk.tools.FunctionTool;
3943
import com.google.adk.utils.CollectionUtils;
@@ -68,6 +72,7 @@ public class Runner {
6872
@Nullable private final BaseMemoryService memoryService;
6973
private final PluginManager pluginManager;
7074
private final ResumabilityConfig resumabilityConfig;
75+
@Nullable private final EventsCompactionConfig eventsCompactionConfig;
7176

7277
/** Builder for {@link Runner}. */
7378
public static class Builder {
@@ -78,6 +83,7 @@ public static class Builder {
7883
@Nullable private BaseMemoryService memoryService = null;
7984
private List<BasePlugin> plugins = ImmutableList.of();
8085
private ResumabilityConfig resumabilityConfig = new ResumabilityConfig();
86+
@Nullable private EventsCompactionConfig eventsCompactionConfig;
8187

8288
@CanIgnoreReturnValue
8389
public Builder agent(BaseAgent agent) {
@@ -121,6 +127,12 @@ public Builder resumabilityConfig(ResumabilityConfig resumabilityConfig) {
121127
return this;
122128
}
123129

130+
@CanIgnoreReturnValue
131+
public Builder eventsCompactionConfig(EventsCompactionConfig eventsCompactionConfig) {
132+
this.eventsCompactionConfig = eventsCompactionConfig;
133+
return this;
134+
}
135+
124136
public Runner build() {
125137
if (agent == null) {
126138
throw new IllegalStateException("Agent must be provided.");
@@ -141,7 +153,8 @@ public Runner build() {
141153
sessionService,
142154
memoryService,
143155
plugins,
144-
resumabilityConfig);
156+
resumabilityConfig,
157+
eventsCompactionConfig);
145158
}
146159
}
147160

@@ -208,13 +221,43 @@ public Runner(
208221
@Nullable BaseMemoryService memoryService,
209222
List<BasePlugin> plugins,
210223
ResumabilityConfig resumabilityConfig) {
224+
this(
225+
agent,
226+
appName,
227+
artifactService,
228+
sessionService,
229+
memoryService,
230+
plugins,
231+
resumabilityConfig,
232+
null);
233+
}
234+
235+
/**
236+
* Creates a new {@code Runner} with a list of plugins and resumability config.
237+
*
238+
* @deprecated Use {@link Runner.Builder} instead.
239+
*/
240+
@Deprecated
241+
protected Runner(
242+
BaseAgent agent,
243+
String appName,
244+
BaseArtifactService artifactService,
245+
BaseSessionService sessionService,
246+
@Nullable BaseMemoryService memoryService,
247+
List<BasePlugin> plugins,
248+
ResumabilityConfig resumabilityConfig,
249+
@Nullable EventsCompactionConfig eventsCompactionConfig) {
211250
this.agent = agent;
212251
this.appName = appName;
213252
this.artifactService = artifactService;
214253
this.sessionService = sessionService;
215254
this.memoryService = memoryService;
216255
this.pluginManager = new PluginManager(plugins);
217256
this.resumabilityConfig = resumabilityConfig;
257+
this.eventsCompactionConfig =
258+
Optional.ofNullable(eventsCompactionConfig)
259+
.map(c -> createEventsCompactionConfig(agent, c))
260+
.orElse(null);
218261
}
219262

220263
/**
@@ -493,7 +536,10 @@ public Flowable<Event> runAsync(
493536
Completable.defer(
494537
() ->
495538
pluginManager.runAfterRunCallback(
496-
contextWithUpdatedSession)));
539+
contextWithUpdatedSession)))
540+
.concatWith(
541+
Completable.defer(
542+
() -> compactEvents(updatedSession)));
497543
});
498544
}))
499545
.doOnError(
@@ -509,6 +555,13 @@ public Flowable<Event> runAsync(
509555
}
510556
}
511557

558+
private Completable compactEvents(Session session) {
559+
return Optional.ofNullable(eventsCompactionConfig)
560+
.map(SlidingWindowEventCompactor::new)
561+
.map(c -> c.compact(session, sessionService))
562+
.orElse(Completable.complete());
563+
}
564+
512565
private void copySessionStates(Session source, Session target) {
513566
// TODO: remove this hack when deprecating all runAsync with Session.
514567
for (var entry : source.state().entrySet()) {
@@ -740,5 +793,27 @@ private boolean hasLiveRequestQueueParameter(FunctionTool functionTool) {
740793
.anyMatch(parameter -> parameter.getType().equals(LiveRequestQueue.class));
741794
}
742795

796+
/**
797+
* Creates a new {@link EventsCompactionConfig} based on the given configuration. If the {@link
798+
* com.google.adk.summarizer.BaseEventSummarizer} is missing, it will be default to the {@link
799+
* LlmEventSummarizer} using the same model as the LLM base agent.
800+
*/
801+
private static EventsCompactionConfig createEventsCompactionConfig(
802+
BaseAgent agent, EventsCompactionConfig config) {
803+
return new EventsCompactionConfig(
804+
config.compactionInterval(),
805+
config.overlapSize(),
806+
config
807+
.summarizer()
808+
.or(
809+
() ->
810+
Optional.of(agent)
811+
.filter(LlmAgent.class::isInstance)
812+
.map(LlmAgent.class::cast)
813+
.map(LlmAgent::resolvedModel)
814+
.flatMap(Model::model)
815+
.map(LlmEventSummarizer::new)));
816+
}
817+
743818
// TODO: run statelessly
744819
}

core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,27 @@
1212
import java.util.List;
1313
import java.util.ListIterator;
1414
import java.util.Set;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
1517

1618
/**
1719
* This class performs events compaction in a sliding window fashion based on the {@link
1820
* EventsCompactionConfig}.
1921
*/
2022
public final class SlidingWindowEventCompactor implements EventCompactor {
2123

24+
private static final Logger logger = LoggerFactory.getLogger(SlidingWindowEventCompactor.class);
25+
2226
private final EventsCompactionConfig config;
2327
private final BaseEventSummarizer summarizer;
2428

2529
public SlidingWindowEventCompactor(EventsCompactionConfig config) {
2630
this.config = config;
27-
// TODO default to LLM summarizer
28-
this.summarizer = config.summarizer().orElseThrow();
31+
this.summarizer =
32+
config
33+
.summarizer()
34+
.orElseThrow(
35+
() -> new IllegalArgumentException("Summarizer is required for event compaction."));
2936
}
3037

3138
/**
@@ -80,6 +87,8 @@ public SlidingWindowEventCompactor(EventsCompactionConfig config) {
8087
*/
8188
@Override
8289
public Completable compact(Session session, BaseSessionService sessionService) {
90+
logger.debug("Running event compaction for session {}", session.id());
91+
8392
return Completable.fromMaybe(
8493
getCompactionEvents(session)
8594
.flatMap(summarizer::summarizeEvents)

0 commit comments

Comments
 (0)