@@ -486,7 +486,7 @@ public void testNonStreamingMethodWithAcceptHeader() throws Exception {
486486 testGetTask (MediaType .APPLICATION_JSON );
487487 }
488488
489-
489+
490490
491491 @ Test
492492 public void testSendMessageStreamExistingTaskSuccess () throws Exception {
@@ -504,32 +504,32 @@ public void testSendMessageStreamExistingTaskSuccess() throws Exception {
504504
505505 // Replace the native HttpClient with A2AClient's sendStreamingMessage method.
506506 client .sendStreamingMessage (
507- "1" ,
508- messageSendParams ,
509- // eventHandler
510- (streamingEvent ) -> {
511- try {
512- if (streamingEvent instanceof Message ) {
513- messageResponseRef .set ((Message ) streamingEvent );
507+ "1" ,
508+ messageSendParams ,
509+ // eventHandler
510+ (streamingEvent ) -> {
511+ try {
512+ if (streamingEvent instanceof Message ) {
513+ messageResponseRef .set ((Message ) streamingEvent );
514+ latch .countDown ();
515+ }
516+ } catch (Exception e ) {
517+ errorRef .set (e );
514518 latch .countDown ();
515519 }
516- } catch (Exception e ) {
517- errorRef .set (e );
520+ },
521+ // errorHandler
522+ (jsonRpcError ) -> {
523+ errorRef .set (new RuntimeException ("JSON-RPC Error: " + jsonRpcError .getMessage ()));
524+ latch .countDown ();
525+ },
526+ // failureHandler
527+ () -> {
528+ if (errorRef .get () == null ) {
529+ errorRef .set (new RuntimeException ("Stream processing failed" ));
530+ }
518531 latch .countDown ();
519532 }
520- },
521- // errorHandler
522- (jsonRpcError ) -> {
523- errorRef .set (new RuntimeException ("JSON-RPC Error: " + jsonRpcError .getMessage ()));
524- latch .countDown ();
525- },
526- // failureHandler
527- () -> {
528- if (errorRef .get () == null ) {
529- errorRef .set (new RuntimeException ("Stream processing failed" ));
530- }
531- latch .countDown ();
532- }
533533 );
534534
535535 boolean dataRead = latch .await (20 , TimeUnit .SECONDS );
@@ -577,39 +577,39 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
577577
578578 // Use A2AClient-like resubscribeToTask Method
579579 client .resubscribeToTask (
580- "1" , // requestId
581- taskIdParams ,
582- // eventHandler
583- (streamingEvent ) -> {
584- try {
585- if (streamingEvent instanceof TaskArtifactUpdateEvent ) {
586- if (taskResubscriptionResponseReceived .getCount () == 2 ) {
587- firstResponse .set ((TaskArtifactUpdateEvent ) streamingEvent );
588- } else {
589- secondResponse .set ((TaskStatusUpdateEvent ) streamingEvent );
580+ "1" , // requestId
581+ taskIdParams ,
582+ // eventHandler
583+ (streamingEvent ) -> {
584+ try {
585+ if (streamingEvent instanceof TaskArtifactUpdateEvent ) {
586+ if (taskResubscriptionResponseReceived .getCount () == 2 ) {
587+ firstResponse .set ((TaskArtifactUpdateEvent ) streamingEvent );
588+ } else {
589+ secondResponse .set ((TaskStatusUpdateEvent ) streamingEvent );
590+ }
591+ taskResubscriptionResponseReceived .countDown ();
590592 }
593+ } catch (Exception e ) {
594+ errorRef .set (e );
591595 taskResubscriptionResponseReceived .countDown ();
596+ taskResubscriptionResponseReceived .countDown (); // Make sure the counter is zeroed
597+ }
598+ },
599+ // errorHandler
600+ (jsonRpcError ) -> {
601+ errorRef .set (new RuntimeException ("JSON-RPC Error: " + jsonRpcError .getMessage ()));
602+ taskResubscriptionResponseReceived .countDown ();
603+ taskResubscriptionResponseReceived .countDown (); // Make sure the counter is zeroed
604+ },
605+ // failureHandler
606+ () -> {
607+ if (errorRef .get () == null ) {
608+ errorRef .set (new RuntimeException ("Stream processing failed" ));
592609 }
593- } catch (Exception e ) {
594- errorRef .set (e );
595610 taskResubscriptionResponseReceived .countDown ();
596611 taskResubscriptionResponseReceived .countDown (); // Make sure the counter is zeroed
597612 }
598- },
599- // errorHandler
600- (jsonRpcError ) -> {
601- errorRef .set (new RuntimeException ("JSON-RPC Error: " + jsonRpcError .getMessage ()));
602- taskResubscriptionResponseReceived .countDown ();
603- taskResubscriptionResponseReceived .countDown (); // Make sure the counter is zeroed
604- },
605- // failureHandler
606- () -> {
607- if (errorRef .get () == null ) {
608- errorRef .set (new RuntimeException ("Stream processing failed" ));
609- }
610- taskResubscriptionResponseReceived .countDown ();
611- taskResubscriptionResponseReceived .countDown (); // Make sure the counter is zeroed
612- }
613613 );
614614
615615 try {
@@ -675,26 +675,26 @@ public void testResubscribeNoExistingTaskError() throws Exception {
675675
676676 // Use A2AClient-like resubscribeToTask Method
677677 client .resubscribeToTask (
678- "1" , // requestId
679- taskIdParams ,
680- // eventHandler
681- (streamingEvent ) -> {
682- // Do not expect to receive any success events, as the task does not exist
683- errorRef .set (new RuntimeException ("Unexpected event received for non-existent task" ));
684- latch .countDown ();
685- },
686- // errorHandler
687- (jsonRpcError ) -> {
688- jsonRpcErrorRef .set (jsonRpcError );
689- latch .countDown ();
690- },
691- // failureHandler
692- () -> {
693- if (errorRef .get () == null && jsonRpcErrorRef .get () == null ) {
694- errorRef .set (new RuntimeException ("Expected error for non-existent task" ));
678+ "1" , // requestId
679+ taskIdParams ,
680+ // eventHandler
681+ (streamingEvent ) -> {
682+ // Do not expect to receive any success events, as the task does not exist
683+ errorRef .set (new RuntimeException ("Unexpected event received for non-existent task" ));
684+ latch .countDown ();
685+ },
686+ // errorHandler
687+ (jsonRpcError ) -> {
688+ jsonRpcErrorRef .set (jsonRpcError );
689+ latch .countDown ();
690+ },
691+ // failureHandler
692+ () -> {
693+ if (errorRef .get () == null && jsonRpcErrorRef .get () == null ) {
694+ errorRef .set (new RuntimeException ("Expected error for non-existent task" ));
695+ }
696+ latch .countDown ();
695697 }
696- latch .countDown ();
697- }
698698 );
699699
700700 boolean dataRead = latch .await (20 , TimeUnit .SECONDS );
@@ -730,32 +730,32 @@ private void testSendStreamingMessage(String mediaType) throws Exception {
730730
731731 // Using A2AClient's sendStreamingMessage method
732732 client .sendStreamingMessage (
733- "1" , // requestId
734- messageSendParams ,
735- // eventHandler
736- (streamingEvent ) -> {
737- try {
738- if (streamingEvent instanceof Message ) {
739- messageResponseRef .set ((Message ) streamingEvent );
733+ "1" , // requestId
734+ messageSendParams ,
735+ // eventHandler
736+ (streamingEvent ) -> {
737+ try {
738+ if (streamingEvent instanceof Message ) {
739+ messageResponseRef .set ((Message ) streamingEvent );
740+ latch .countDown ();
741+ }
742+ } catch (Exception e ) {
743+ errorRef .set (e );
740744 latch .countDown ();
741745 }
742- } catch (Exception e ) {
743- errorRef .set (e );
746+ },
747+ // errorHandler
748+ (jsonRpcError ) -> {
749+ errorRef .set (new RuntimeException ("JSON-RPC Error: " + jsonRpcError .getMessage ()));
750+ latch .countDown ();
751+ },
752+ // failureHandler
753+ () -> {
754+ if (errorRef .get () == null ) {
755+ errorRef .set (new RuntimeException ("Stream processing failed" ));
756+ }
744757 latch .countDown ();
745758 }
746- },
747- // errorHandler
748- (jsonRpcError ) -> {
749- errorRef .set (new RuntimeException ("JSON-RPC Error: " + jsonRpcError .getMessage ()));
750- latch .countDown ();
751- },
752- // failureHandler
753- () -> {
754- if (errorRef .get () == null ) {
755- errorRef .set (new RuntimeException ("Stream processing failed" ));
756- }
757- latch .countDown ();
758- }
759759 );
760760
761761 boolean dataRead = latch .await (20 , TimeUnit .SECONDS );
0 commit comments