Skip to content

Commit 32eed94

Browse files
committed
Remove queues when closed
1 parent 4f0931c commit 32eed94

File tree

2 files changed

+65
-9
lines changed

2 files changed

+65
-9
lines changed

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ static EventQueueBuilder builder() {
4949
public static class EventQueueBuilder {
5050
private int queueSize = DEFAULT_QUEUE_SIZE;
5151
private EventEnqueueHook hook;
52+
private String taskId;
53+
private Runnable onCloseCallback;
5254

5355
public EventQueueBuilder queueSize(int queueSize) {
5456
this.queueSize = queueSize;
@@ -60,9 +62,19 @@ public EventQueueBuilder hook(EventEnqueueHook hook) {
6062
return this;
6163
}
6264

65+
public EventQueueBuilder taskId(String taskId) {
66+
this.taskId = taskId;
67+
return this;
68+
}
69+
70+
public EventQueueBuilder onClose(Runnable onCloseCallback) {
71+
this.onCloseCallback = onCloseCallback;
72+
return this;
73+
}
74+
6375
public EventQueue build() {
64-
if (hook != null) {
65-
return new MainQueue(queueSize, hook);
76+
if (hook != null || onCloseCallback != null) {
77+
return new MainQueue(queueSize, hook, taskId, onCloseCallback);
6678
} else {
6779
return new MainQueue(queueSize);
6880
}
@@ -154,11 +166,11 @@ public boolean isClosed() {
154166
return closed;
155167
}
156168

157-
public void doClose() {
169+
protected void doClose() {
158170
doClose(false);
159171
}
160172

161-
public void doClose(boolean immediate) {
173+
protected void doClose(boolean immediate) {
162174
synchronized (this) {
163175
if (closed) {
164176
return;
@@ -180,25 +192,43 @@ static class MainQueue extends EventQueue {
180192
private final CountDownLatch pollingStartedLatch = new CountDownLatch(1);
181193
private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
182194
private final EventEnqueueHook enqueueHook;
195+
private final String taskId;
196+
private final Runnable onCloseCallback;
183197

184198
MainQueue() {
185199
super();
186200
this.enqueueHook = null;
201+
this.taskId = null;
202+
this.onCloseCallback = null;
187203
}
188204

189205
MainQueue(int queueSize) {
190206
super(queueSize);
191207
this.enqueueHook = null;
208+
this.taskId = null;
209+
this.onCloseCallback = null;
192210
}
193211

194212
MainQueue(EventEnqueueHook hook) {
195213
super();
196214
this.enqueueHook = hook;
215+
this.taskId = null;
216+
this.onCloseCallback = null;
197217
}
198218

199219
MainQueue(int queueSize, EventEnqueueHook hook) {
200220
super(queueSize);
201221
this.enqueueHook = hook;
222+
this.taskId = null;
223+
this.onCloseCallback = null;
224+
}
225+
226+
MainQueue(int queueSize, EventEnqueueHook hook, String taskId, Runnable onCloseCallback) {
227+
super(queueSize);
228+
this.enqueueHook = hook;
229+
this.taskId = taskId;
230+
this.onCloseCallback = onCloseCallback;
231+
LOGGER.debug("Created MainQueue for task {} with onClose callback: {}", taskId, onCloseCallback != null);
202232
}
203233

204234
EventQueue tap() {
@@ -254,6 +284,20 @@ public int getActiveChildCount() {
254284
return children.size();
255285
}
256286

287+
@Override
288+
protected void doClose(boolean immediate) {
289+
super.doClose(immediate);
290+
// Invoke callback after closing to notify QueueManager
291+
if (onCloseCallback != null) {
292+
LOGGER.debug("Invoking onClose callback for task {}", taskId);
293+
try {
294+
onCloseCallback.run();
295+
} catch (Exception e) {
296+
LOGGER.error("Error in onClose callback for task {}", taskId, e);
297+
}
298+
}
299+
}
300+
257301
@Override
258302
public void close() {
259303
close(false);

server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public void close(String taskId) {
5454

5555
@Override
5656
public EventQueue createOrTap(String taskId) {
57+
LOGGER.info("createOrTap called for task {}, current map size: {}", taskId, queues.size());
5758
EventQueue existing = queues.get(taskId);
5859

5960
// Lazy cleanup: remove closed queues from map
@@ -76,8 +77,8 @@ public EventQueue createOrTap(String taskId) {
7677
EventQueue result = main.tap(); // Always return ChildQueue
7778

7879
if (existing == null) {
79-
LOGGER.debug("Created new MainQueue {} for task {}, returning ChildQueue {}",
80-
System.identityHashCode(main), taskId, System.identityHashCode(result));
80+
LOGGER.info("Created new MainQueue {} for task {}, returning ChildQueue {} (map size: {})",
81+
System.identityHashCode(main), taskId, System.identityHashCode(result), queues.size());
8182
} else {
8283
LOGGER.debug("Tapped existing MainQueue {} -> ChildQueue {} for task {}",
8384
System.identityHashCode(main), System.identityHashCode(result), taskId);
@@ -104,11 +105,22 @@ public int getActiveChildQueueCount(String taskId) {
104105
return -1;
105106
}
106107

107-
private static class DefaultEventQueueFactory implements EventQueueFactory {
108+
private class DefaultEventQueueFactory implements EventQueueFactory {
108109
@Override
109110
public EventQueue.EventQueueBuilder builder(String taskId) {
110-
// Default implementation doesn't need task-specific configuration
111-
return EventQueue.builder();
111+
// Return builder with callback that removes queue from map when closed
112+
return EventQueue.builder()
113+
.taskId(taskId)
114+
.onClose(() -> {
115+
LOGGER.debug("Queue close callback invoked for task {}, removing from map", taskId);
116+
EventQueue removed = queues.remove(taskId);
117+
if (removed != null) {
118+
LOGGER.info("Removed closed queue for task {} from QueueManager (map size: {})",
119+
taskId, queues.size());
120+
} else {
121+
LOGGER.debug("Queue for task {} was already removed from map", taskId);
122+
}
123+
});
112124
}
113125
}
114126
}

0 commit comments

Comments
 (0)