@@ -384,56 +384,13 @@ public void testGetAgentCard() throws A2AClientException {
384384 }
385385
386386 @ Test
387- public void testSendMessageStreamExistingTaskSuccess () throws Exception {
388- saveTaskInTaskStore (MINIMAL_TASK );
389- try {
390- Message message = new Message .Builder (MESSAGE )
391- .taskId (MINIMAL_TASK .getId ())
392- .contextId (MINIMAL_TASK .getContextId ())
393- .build ();
394-
395- CountDownLatch latch = new CountDownLatch (1 );
396- AtomicReference <Message > receivedMessage = new AtomicReference <>();
397- AtomicBoolean wasUnexpectedEvent = new AtomicBoolean (false );
398- AtomicReference <Throwable > errorRef = new AtomicReference <>();
399-
400- BiConsumer <ClientEvent , AgentCard > consumer = (event , agentCard ) -> {
401- if (event instanceof MessageEvent messageEvent ) {
402- if (latch .getCount () > 0 ) {
403- receivedMessage .set (messageEvent .getMessage ());
404- latch .countDown ();
405- } else {
406- wasUnexpectedEvent .set (true );
407- }
408- } else {
409- wasUnexpectedEvent .set (true );
410- }
411- };
412-
413- Consumer <Throwable > errorHandler = error -> {
414- errorRef .set (error );
415- latch .countDown ();
416- };
417-
418- // testing the streaming send message
419- getClient ().sendMessage (message , List .of (consumer ), errorHandler );
420-
421- assertTrue (latch .await (10 , TimeUnit .SECONDS ));
422- assertFalse (wasUnexpectedEvent .get ());
423- assertNull (errorRef .get ());
387+ public void testSendMessageStreamNewMessageSuccess () throws Exception {
388+ testSendStreamingMessage (false );
389+ }
424390
425- Message messageResponse = receivedMessage .get ();
426- assertNotNull (messageResponse );
427- assertEquals (MESSAGE .getMessageId (), messageResponse .getMessageId ());
428- assertEquals (MESSAGE .getRole (), messageResponse .getRole ());
429- Part <?> part = messageResponse .getParts ().get (0 );
430- assertEquals (Part .Kind .TEXT , part .getKind ());
431- assertEquals ("test message" , ((TextPart ) part ).getText ());
432- } catch (A2AClientException e ) {
433- fail ("Unexpected exception during sendMessage: " + e .getMessage (), e );
434- } finally {
435- deleteTaskInTaskStore (MINIMAL_TASK .getId ());
436- }
391+ @ Test
392+ public void testSendMessageStreamExistingTaskSuccess () throws Exception {
393+ testSendStreamingMessage (true );
437394 }
438395
439396 @ Test
@@ -955,19 +912,19 @@ public void testStreamingMethodWithAcceptHeader() throws Exception {
955912 assumeTrue (TransportProtocol .JSONRPC .asString ().equals (getTransportProtocol ()),
956913 "JSONRPC-specific test" );
957914
958- testSendStreamingMessage (MediaType .SERVER_SENT_EVENTS );
915+ testSendStreamingMessageWithHttpClient (MediaType .SERVER_SENT_EVENTS );
959916 }
960917
961918 @ Test
962- public void testSendMessageStreamNewMessageSuccess () throws Exception {
919+ public void testStreamingMethodWithoutAcceptHeader () throws Exception {
963920 // skip this test for non-JSONRPC transports
964921 assumeTrue (TransportProtocol .JSONRPC .asString ().equals (getTransportProtocol ()),
965922 "JSONRPC-specific test" );
966-
967- testSendStreamingMessage (null );
923+
924+ testSendStreamingMessageWithHttpClient (null );
968925 }
969926
970- private void testSendStreamingMessage (String mediaType ) throws Exception {
927+ private void testSendStreamingMessageWithHttpClient (String mediaType ) throws Exception {
971928 Message message = new Message .Builder (MESSAGE )
972929 .taskId (MINIMAL_TASK .getId ())
973930 .contextId (MINIMAL_TASK .getContextId ())
@@ -1017,6 +974,62 @@ private void testSendStreamingMessage(String mediaType) throws Exception {
1017974
1018975 }
1019976
977+ public void testSendStreamingMessage (boolean createTask ) throws Exception {
978+ if (createTask ) {
979+ saveTaskInTaskStore (MINIMAL_TASK );
980+ }
981+ try {
982+ Message message = new Message .Builder (MESSAGE )
983+ .taskId (MINIMAL_TASK .getId ())
984+ .contextId (MINIMAL_TASK .getContextId ())
985+ .build ();
986+
987+ CountDownLatch latch = new CountDownLatch (1 );
988+ AtomicReference <Message > receivedMessage = new AtomicReference <>();
989+ AtomicBoolean wasUnexpectedEvent = new AtomicBoolean (false );
990+ AtomicReference <Throwable > errorRef = new AtomicReference <>();
991+
992+ BiConsumer <ClientEvent , AgentCard > consumer = (event , agentCard ) -> {
993+ if (event instanceof MessageEvent messageEvent ) {
994+ if (latch .getCount () > 0 ) {
995+ receivedMessage .set (messageEvent .getMessage ());
996+ latch .countDown ();
997+ } else {
998+ wasUnexpectedEvent .set (true );
999+ }
1000+ } else {
1001+ wasUnexpectedEvent .set (true );
1002+ }
1003+ };
1004+
1005+ Consumer <Throwable > errorHandler = error -> {
1006+ errorRef .set (error );
1007+ latch .countDown ();
1008+ };
1009+
1010+ // testing the streaming send message
1011+ getClient ().sendMessage (message , List .of (consumer ), errorHandler );
1012+
1013+ assertTrue (latch .await (10 , TimeUnit .SECONDS ));
1014+ assertFalse (wasUnexpectedEvent .get ());
1015+ assertNull (errorRef .get ());
1016+
1017+ Message messageResponse = receivedMessage .get ();
1018+ assertNotNull (messageResponse );
1019+ assertEquals (MESSAGE .getMessageId (), messageResponse .getMessageId ());
1020+ assertEquals (MESSAGE .getRole (), messageResponse .getRole ());
1021+ Part <?> part = messageResponse .getParts ().get (0 );
1022+ assertEquals (Part .Kind .TEXT , part .getKind ());
1023+ assertEquals ("test message" , ((TextPart ) part ).getText ());
1024+ } catch (A2AClientException e ) {
1025+ fail ("Unexpected exception during sendMessage: " + e .getMessage (), e );
1026+ } finally {
1027+ if (createTask ) {
1028+ deleteTaskInTaskStore (MINIMAL_TASK .getId ());
1029+ }
1030+ }
1031+ }
1032+
10201033 private CompletableFuture <HttpResponse <Stream <String >>> initialiseStreamingRequest (
10211034 StreamingJSONRPCRequest <?> request , String mediaType ) throws Exception {
10221035
0 commit comments