Skip to content

Commit 57a9e27

Browse files
committed
feat: Rework Event Queues to use central event bus
1 parent cb084ec commit 57a9e27

File tree

22 files changed

+891
-257
lines changed

22 files changed

+891
-257
lines changed

extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.a2a.server.events.EventQueueFactory;
1818
import io.a2a.server.events.EventQueueItem;
1919
import io.a2a.server.events.InMemoryQueueManager;
20+
import io.a2a.server.events.MainEventBus;
2021
import io.a2a.server.events.QueueManager;
2122

2223
@ApplicationScoped
@@ -32,10 +33,12 @@ public class ReplicatedQueueManager implements QueueManager {
3233
private TaskStateProvider taskStateProvider;
3334

3435
@Inject
35-
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
36+
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy,
37+
TaskStateProvider taskStateProvider,
38+
MainEventBus mainEventBus) {
3639
this.replicationStrategy = replicationStrategy;
3740
this.taskStateProvider = taskStateProvider;
38-
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
41+
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus);
3942
}
4043

4144

@@ -139,12 +142,11 @@ public EventQueue.EventQueueBuilder builder(String taskId) {
139142
// which sends the QueueClosedEvent after the database transaction commits.
140143
// This ensures proper ordering and transactional guarantees.
141144

142-
// Return the builder with callbacks
143-
return delegate.getEventQueueBuilder(taskId)
144-
.taskId(taskId)
145-
.hook(new ReplicationHook(taskId))
146-
.addOnCloseCallback(delegate.getCleanupCallback(taskId))
147-
.taskStateProvider(taskStateProvider);
145+
// Call createBaseEventQueueBuilder() directly to avoid infinite recursion
146+
// (getEventQueueBuilder() would delegate back to this factory, creating a loop)
147+
// The base builder already includes: taskId, cleanup callback, taskStateProvider, mainEventBus
148+
return delegate.createBaseEventQueueBuilder(taskId)
149+
.hook(new ReplicationHook(taskId));
148150
}
149151
}
150152

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@
2121
import io.a2a.server.events.EventQueueClosedException;
2222
import io.a2a.server.events.EventQueueItem;
2323
import io.a2a.server.events.EventQueueTestHelper;
24+
import io.a2a.server.events.EventQueueUtil;
25+
import io.a2a.server.events.MainEventBus;
26+
import io.a2a.server.events.MainEventBusProcessor;
2427
import io.a2a.server.events.QueueClosedEvent;
28+
import io.a2a.server.tasks.InMemoryTaskStore;
29+
import io.a2a.server.tasks.PushNotificationSender;
2530
import io.a2a.spec.Event;
2631
import io.a2a.spec.StreamingEventKind;
2732
import io.a2a.spec.TaskState;
@@ -35,10 +40,24 @@ class ReplicatedQueueManagerTest {
3540

3641
private ReplicatedQueueManager queueManager;
3742
private StreamingEventKind testEvent;
43+
private MainEventBus mainEventBus;
44+
private MainEventBusProcessor mainEventBusProcessor;
45+
private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
3846

3947
@BeforeEach
4048
void setUp() {
41-
queueManager = new ReplicatedQueueManager(new NoOpReplicationStrategy(), new MockTaskStateProvider(true));
49+
// Create MainEventBus and MainEventBusProcessor for tests
50+
InMemoryTaskStore taskStore = new InMemoryTaskStore();
51+
mainEventBus = new MainEventBus();
52+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
53+
EventQueueUtil.start(mainEventBusProcessor);
54+
55+
queueManager = new ReplicatedQueueManager(
56+
new NoOpReplicationStrategy(),
57+
new MockTaskStateProvider(true),
58+
mainEventBus
59+
);
60+
4261
testEvent = new TaskStatusUpdateEvent.Builder()
4362
.taskId("test-task")
4463
.contextId("test-context")
@@ -50,7 +69,7 @@ void setUp() {
5069
@Test
5170
void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedException {
5271
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
53-
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
72+
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
5473

5574
String taskId = "test-task-1";
5675
EventQueue queue = queueManager.createOrTap(taskId);
@@ -65,7 +84,7 @@ void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedExcepti
6584
@Test
6685
void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedException {
6786
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
68-
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
87+
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
6988

7089
String taskId = "test-task-2";
7190
EventQueue queue = queueManager.createOrTap(taskId);
@@ -79,7 +98,7 @@ void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedEx
7998
@Test
8099
void testReplicationStrategyWithCountingImplementation() throws InterruptedException {
81100
CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy();
82-
queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true));
101+
queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus);
83102

84103
String taskId = "test-task-3";
85104
EventQueue queue = queueManager.createOrTap(taskId);
@@ -170,7 +189,7 @@ void testBasicQueueManagerFunctionality() throws InterruptedException {
170189
void testQueueToTaskIdMappingMaintained() throws InterruptedException {
171190
String taskId = "test-task-6";
172191
CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy();
173-
queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true));
192+
queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus);
174193

175194
EventQueue queue = queueManager.createOrTap(taskId);
176195
queue.enqueueEvent(testEvent);
@@ -217,7 +236,7 @@ void testReplicatedEventJsonSerialization() throws Exception {
217236
@Test
218237
void testParallelReplicationBehavior() throws InterruptedException {
219238
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
220-
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
239+
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
221240

222241
String taskId = "parallel-test-task";
223242
EventQueue queue = queueManager.createOrTap(taskId);
@@ -297,7 +316,7 @@ void testParallelReplicationBehavior() throws InterruptedException {
297316
void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException {
298317
// Create a task state provider that returns false (task is inactive)
299318
MockTaskStateProvider stateProvider = new MockTaskStateProvider(false);
300-
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
319+
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
301320

302321
String taskId = "inactive-task";
303322

@@ -316,7 +335,7 @@ void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException {
316335
void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
317336
// Create a task state provider that returns true (task is active)
318337
MockTaskStateProvider stateProvider = new MockTaskStateProvider(true);
319-
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
338+
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
320339

321340
String taskId = "active-task";
322341

@@ -347,7 +366,7 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
347366
void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws InterruptedException {
348367
// Create a task state provider that returns true initially
349368
MockTaskStateProvider stateProvider = new MockTaskStateProvider(true);
350-
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
369+
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
351370

352371
String taskId = "task-becomes-inactive";
353372

@@ -387,7 +406,7 @@ void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws Interrup
387406
@Test
388407
void testPoisonPillSentViaTransactionAwareEvent() throws InterruptedException {
389408
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
390-
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
409+
queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
391410

392411
String taskId = "poison-pill-test";
393412
EventQueue queue = queueManager.createOrTap(taskId);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.a2a.server.events;
2+
3+
public class EventQueueUtil {
4+
// Since EventQueue.builder() is package protected, add a method to expose it
5+
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
6+
return EventQueue.builder();
7+
}
8+
9+
public static void start(MainEventBusProcessor processor) {
10+
processor.start();
11+
}
12+
13+
public static void stop(MainEventBusProcessor processor) {
14+
processor.stop();
15+
}
16+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
11
quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient
2+
3+
# Debug logging for event processing and request handling
4+
quarkus.log.category."io.a2a.server.events".level=DEBUG
5+
quarkus.log.category."io.a2a.server.requesthandlers".level=DEBUG
6+
quarkus.log.category."io.a2a.server.tasks".level=DEBUG

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public static class EventQueueBuilder {
5353
private String taskId;
5454
private List<Runnable> onCloseCallbacks = new java.util.ArrayList<>();
5555
private TaskStateProvider taskStateProvider;
56+
private MainEventBus mainEventBus;
5657

5758
public EventQueueBuilder queueSize(int queueSize) {
5859
this.queueSize = queueSize;
@@ -81,9 +82,14 @@ public EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider)
8182
return this;
8283
}
8384

85+
public EventQueueBuilder mainEventBus(MainEventBus mainEventBus) {
86+
this.mainEventBus = mainEventBus;
87+
return this;
88+
}
89+
8490
public EventQueue build() {
85-
if (hook != null || !onCloseCallbacks.isEmpty() || taskStateProvider != null) {
86-
return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider);
91+
if (hook != null || !onCloseCallbacks.isEmpty() || taskStateProvider != null || mainEventBus != null) {
92+
return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider, mainEventBus);
8793
} else {
8894
return new MainQueue(queueSize);
8995
}
@@ -222,13 +228,15 @@ static class MainQueue extends EventQueue {
222228
private final String taskId;
223229
private final List<Runnable> onCloseCallbacks;
224230
private final TaskStateProvider taskStateProvider;
231+
private final MainEventBus mainEventBus;
225232

226233
MainQueue() {
227234
super();
228235
this.enqueueHook = null;
229236
this.taskId = null;
230237
this.onCloseCallbacks = List.of();
231238
this.taskStateProvider = null;
239+
this.mainEventBus = null;
232240
}
233241

234242
MainQueue(int queueSize) {
@@ -237,6 +245,7 @@ static class MainQueue extends EventQueue {
237245
this.taskId = null;
238246
this.onCloseCallbacks = List.of();
239247
this.taskStateProvider = null;
248+
this.mainEventBus = null;
240249
}
241250

242251
MainQueue(EventEnqueueHook hook) {
@@ -245,6 +254,7 @@ static class MainQueue extends EventQueue {
245254
this.taskId = null;
246255
this.onCloseCallbacks = List.of();
247256
this.taskStateProvider = null;
257+
this.mainEventBus = null;
248258
}
249259

250260
MainQueue(int queueSize, EventEnqueueHook hook) {
@@ -253,6 +263,7 @@ static class MainQueue extends EventQueue {
253263
this.taskId = null;
254264
this.onCloseCallbacks = List.of();
255265
this.taskStateProvider = null;
266+
this.mainEventBus = null;
256267
}
257268

258269
MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List<Runnable> onCloseCallbacks, TaskStateProvider taskStateProvider) {
@@ -261,16 +272,36 @@ static class MainQueue extends EventQueue {
261272
this.taskId = taskId;
262273
this.onCloseCallbacks = List.copyOf(onCloseCallbacks); // Defensive copy
263274
this.taskStateProvider = taskStateProvider;
275+
this.mainEventBus = null;
264276
LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks and TaskStateProvider: {}",
265277
taskId, onCloseCallbacks.size(), taskStateProvider != null);
266278
}
267279

280+
MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List<Runnable> onCloseCallbacks, TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
281+
super(queueSize);
282+
this.enqueueHook = hook;
283+
this.taskId = taskId;
284+
this.onCloseCallbacks = List.copyOf(onCloseCallbacks); // Defensive copy
285+
this.taskStateProvider = taskStateProvider;
286+
this.mainEventBus = mainEventBus;
287+
LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks, TaskStateProvider: {}, MainEventBus: {}",
288+
taskId, onCloseCallbacks.size(), taskStateProvider != null, mainEventBus != null);
289+
}
290+
268291
public EventQueue tap() {
269292
ChildQueue child = new ChildQueue(this);
270293
children.add(child);
271294
return child;
272295
}
273296

297+
/**
298+
* Returns the current number of child queues.
299+
* Useful for debugging and logging event distribution.
300+
*/
301+
public int getChildCount() {
302+
return children.size();
303+
}
304+
274305
@Override
275306
public void enqueueItem(EventQueueItem item) {
276307
// MainQueue must accept events even when closed to support:
@@ -293,10 +324,15 @@ public void enqueueItem(EventQueueItem item) {
293324
queue.add(item);
294325
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
295326

296-
// Distribute to all ChildQueues (they will receive the event even if MainQueue is closed)
297-
children.forEach(eq -> eq.internalEnqueueItem(item));
327+
// Submit to MainEventBus for centralized persistence + distribution
328+
if (mainEventBus != null && taskId != null) {
329+
mainEventBus.submit(taskId, this, item);
330+
} else {
331+
// This should not happen in properly configured systems
332+
LOGGER.error("MainEventBus not configured for task {} - events will NOT be distributed to children!", taskId);
333+
}
298334

299-
// Trigger replication hook if configured
335+
// Trigger replication hook if configured (KEEP for inter-process replication)
300336
if (enqueueHook != null) {
301337
enqueueHook.onEnqueue(item);
302338
}
@@ -350,6 +386,29 @@ void childClosing(ChildQueue child, boolean immediate) {
350386
this.doClose(immediate);
351387
}
352388

389+
/**
390+
* Distribute event to all ChildQueues.
391+
* Called by MainEventBusProcessor after TaskStore persistence.
392+
*/
393+
void distributeToChildren(EventQueueItem item) {
394+
synchronized (children) {
395+
int childCount = children.size();
396+
if (LOGGER.isDebugEnabled()) {
397+
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
398+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
399+
}
400+
children.forEach(child -> {
401+
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
402+
taskId, item.getEvent().getClass().getSimpleName());
403+
child.internalEnqueueItem(item);
404+
});
405+
if (LOGGER.isDebugEnabled()) {
406+
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
407+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
408+
}
409+
}
410+
}
411+
353412
/**
354413
* Get the count of active child queues.
355414
* Used for testing to verify reference counting mechanism.

0 commit comments

Comments
 (0)