Skip to content

Commit fcc02ec

Browse files
authored
fix: Store push notification config for newly created tasks (#483)
This was missing in the non-streaming message sends Follows up on #383 and #482 The TCK is not testing this, but it is introduced in a2aproject/a2a-tck#94
1 parent 8ecb546 commit fcc02ec

File tree

2 files changed

+166
-0
lines changed

2 files changed

+166
-0
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
306306
// We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
307307
// during the consumption loop itself.
308308
kind = etai.eventType();
309+
310+
// Store push notification config for newly created tasks (mirrors streaming logic)
311+
// Only for NEW tasks - existing tasks are handled by initMessageSend()
312+
if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) {
313+
LOGGER.debug("Storing push notification config for new task {}", createdTask.getId());
314+
pushConfigStore.setInfo(createdTask.getId(), params.configuration().pushNotificationConfig());
315+
}
316+
309317
if (blocking && interruptedOrNonBlocking) {
310318
// For blocking calls: ensure all events are processed before returning
311319
// Order of operations is critical to avoid circular dependency:

server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
import static org.junit.jupiter.api.Assertions.assertNotNull;
55
import static org.junit.jupiter.api.Assertions.assertTrue;
66

7+
import java.util.ArrayList;
78
import java.util.List;
89
import java.util.Map;
910
import java.util.Set;
11+
import java.util.concurrent.ConcurrentHashMap;
1012
import java.util.concurrent.CountDownLatch;
1113
import java.util.concurrent.Executors;
1214
import java.util.concurrent.TimeUnit;
@@ -18,12 +20,14 @@
1820
import io.a2a.server.auth.UnauthenticatedUser;
1921
import io.a2a.server.events.EventQueue;
2022
import io.a2a.server.events.InMemoryQueueManager;
23+
import io.a2a.server.tasks.InMemoryPushNotificationConfigStore;
2124
import io.a2a.server.tasks.InMemoryTaskStore;
2225
import io.a2a.server.tasks.TaskUpdater;
2326
import io.a2a.spec.JSONRPCError;
2427
import io.a2a.spec.Message;
2528
import io.a2a.spec.MessageSendConfiguration;
2629
import io.a2a.spec.MessageSendParams;
30+
import io.a2a.spec.PushNotificationConfig;
2731
import io.a2a.spec.Task;
2832
import io.a2a.spec.TaskState;
2933
import io.a2a.spec.TaskStatus;
@@ -794,6 +798,160 @@ void testBlockingCallReturnsCompleteTaskWithArtifacts() throws Exception {
794798
returnedTask.getArtifacts().size());
795799
}
796800

801+
/**
802+
* Test that pushNotificationConfig from SendMessageConfiguration is stored for NEW tasks
803+
* in non-streaming (blocking) mode. This reproduces the bug from issue #84.
804+
*
805+
* Expected behavior:
806+
* 1. Client sends message with pushNotificationConfig in SendMessageConfiguration
807+
* 2. Agent creates a new task
808+
* 3. pushNotificationConfig should be stored in PushNotificationConfigStore
809+
* 4. Config should be retrievable via getInfo()
810+
*/
811+
@Test
812+
@Timeout(10)
813+
void testBlockingMessageStoresPushNotificationConfigForNewTask() throws Exception {
814+
String taskId = "push-config-blocking-new-task";
815+
String contextId = "push-config-ctx";
816+
817+
// Create test config store
818+
InMemoryPushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
819+
820+
// Re-create request handler with pushConfigStore
821+
requestHandler = DefaultRequestHandler.create(
822+
agentExecutor,
823+
taskStore,
824+
queueManager,
825+
pushConfigStore, // Add push config store
826+
null, // pushSender
827+
Executors.newCachedThreadPool()
828+
);
829+
830+
// Create push notification config
831+
PushNotificationConfig pushConfig = new PushNotificationConfig.Builder()
832+
.id("config-1")
833+
.url("https://example.com/webhook")
834+
.token("test-token-123")
835+
.build();
836+
837+
// Create message with pushNotificationConfig
838+
Message message = new Message.Builder()
839+
.messageId("msg-push-config")
840+
.role(Message.Role.USER)
841+
.parts(new TextPart("test message"))
842+
.taskId(taskId)
843+
.contextId(contextId)
844+
.build();
845+
846+
MessageSendConfiguration config = new MessageSendConfiguration.Builder()
847+
.blocking(true)
848+
.pushNotificationConfig(pushConfig)
849+
.build();
850+
851+
MessageSendParams params = new MessageSendParams(message, config, null);
852+
853+
// Agent creates a new task
854+
agentExecutor.setExecuteCallback((context, queue) -> {
855+
TaskUpdater updater = new TaskUpdater(context, queue);
856+
updater.submit(); // Creates new task in SUBMITTED state
857+
updater.complete();
858+
});
859+
860+
// Call blocking onMessageSend
861+
Object result = requestHandler.onMessageSend(params, serverCallContext);
862+
863+
// Verify result is a task
864+
assertTrue(result instanceof Task, "Result should be a Task");
865+
Task returnedTask = (Task) result;
866+
assertEquals(taskId, returnedTask.getId());
867+
868+
// THE KEY ASSERTION: Verify pushNotificationConfig was stored
869+
List<PushNotificationConfig> storedConfigs = pushConfigStore.getInfo(taskId);
870+
assertNotNull(storedConfigs, "Push notification config should be stored for new task");
871+
assertEquals(1, storedConfigs.size(),
872+
"Should have exactly 1 push config stored");
873+
assertEquals("config-1", storedConfigs.get(0).id());
874+
assertEquals("https://example.com/webhook", storedConfigs.get(0).url());
875+
}
876+
877+
/**
878+
* Test that pushNotificationConfig is stored for EXISTING tasks.
879+
* This verifies the initMessageSend logic works correctly.
880+
*/
881+
@Test
882+
@Timeout(10)
883+
void testMessageStoresPushNotificationConfigForExistingTask() throws Exception {
884+
String taskId = "push-config-existing-task";
885+
String contextId = "push-config-existing-ctx";
886+
887+
// Create test config store
888+
InMemoryPushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
889+
890+
// Re-create request handler with pushConfigStore
891+
requestHandler = DefaultRequestHandler.create(
892+
agentExecutor,
893+
taskStore,
894+
queueManager,
895+
pushConfigStore, // Add push config store
896+
null, // pushSender
897+
Executors.newCachedThreadPool()
898+
);
899+
900+
// Create EXISTING task in store
901+
Task existingTask = new Task.Builder()
902+
.id(taskId)
903+
.contextId(contextId)
904+
.status(new TaskStatus(TaskState.WORKING))
905+
.build();
906+
taskStore.save(existingTask);
907+
908+
// Create push notification config
909+
PushNotificationConfig pushConfig = new PushNotificationConfig.Builder()
910+
.id("config-existing-1")
911+
.url("https://example.com/existing-webhook")
912+
.token("existing-token-789")
913+
.build();
914+
915+
Message message = new Message.Builder()
916+
.messageId("msg-push-existing")
917+
.role(Message.Role.USER)
918+
.parts(new TextPart("update existing task"))
919+
.taskId(taskId)
920+
.contextId(contextId)
921+
.build();
922+
923+
MessageSendConfiguration config = new MessageSendConfiguration.Builder()
924+
.blocking(true)
925+
.pushNotificationConfig(pushConfig)
926+
.build();
927+
928+
MessageSendParams params = new MessageSendParams(message, config, null);
929+
930+
// Agent updates the existing task
931+
agentExecutor.setExecuteCallback((context, queue) -> {
932+
TaskUpdater updater = new TaskUpdater(context, queue);
933+
updater.addArtifact(
934+
List.of(new TextPart("update artifact", null)),
935+
"artifact-1", "Update", null);
936+
updater.complete();
937+
});
938+
939+
// Call blocking onMessageSend
940+
Object result = requestHandler.onMessageSend(params, serverCallContext);
941+
942+
// Verify result
943+
assertTrue(result instanceof Task, "Result should be a Task");
944+
945+
// Verify pushNotificationConfig was stored (initMessageSend path)
946+
List<PushNotificationConfig> storedConfigs = pushConfigStore.getInfo(taskId);
947+
assertNotNull(storedConfigs,
948+
"Push notification config should be stored for existing task");
949+
assertEquals(1, storedConfigs.size(),
950+
"Should have exactly 1 push config stored");
951+
assertEquals("config-existing-1", storedConfigs.get(0).id());
952+
assertEquals("https://example.com/existing-webhook", storedConfigs.get(0).url());
953+
}
954+
797955
/**
798956
* Simple test agent executor that allows controlling execution timing
799957
*/

0 commit comments

Comments
 (0)