|
6 | 6 | import java.util.List; |
7 | 7 | import java.util.concurrent.CountDownLatch; |
8 | 8 | import java.util.concurrent.TimeUnit; |
| 9 | +import java.util.concurrent.atomic.AtomicBoolean; |
9 | 10 |
|
10 | 11 | import com.google.protobuf.Empty; |
11 | 12 | import com.google.protobuf.Struct; |
@@ -744,6 +745,72 @@ public void testOnGetAuthenticatedExtendedAgentCard() throws Exception { |
744 | 745 | // TODO - getting the authenticated extended agent card isn't support for gRPC right now |
745 | 746 | } |
746 | 747 |
|
| 748 | + @Test |
| 749 | + public void testStreamingDoesNotBlockMainThread() throws Exception { |
| 750 | + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); |
| 751 | + |
| 752 | + // Track if the main thread gets blocked during streaming |
| 753 | + AtomicBoolean eventReceived = new AtomicBoolean(false); |
| 754 | + CountDownLatch streamStarted = new CountDownLatch(1); |
| 755 | + CountDownLatch eventProcessed = new CountDownLatch(1); |
| 756 | + |
| 757 | + agentExecutorExecute = (context, eventQueue) -> { |
| 758 | + // Wait a bit to ensure the main thread continues |
| 759 | + try { |
| 760 | + Thread.sleep(100); |
| 761 | + } catch (InterruptedException e) { |
| 762 | + Thread.currentThread().interrupt(); |
| 763 | + } |
| 764 | + eventQueue.enqueueEvent(context.getMessage()); |
| 765 | + }; |
| 766 | + |
| 767 | + // Start streaming with a custom StreamObserver |
| 768 | + List<StreamResponse> results = new ArrayList<>(); |
| 769 | + List<Throwable> errors = new ArrayList<>(); |
| 770 | + StreamObserver<StreamResponse> streamObserver = new StreamObserver<>() { |
| 771 | + @Override |
| 772 | + public void onNext(StreamResponse streamResponse) { |
| 773 | + results.add(streamResponse); |
| 774 | + eventReceived.set(true); |
| 775 | + eventProcessed.countDown(); |
| 776 | + } |
| 777 | + |
| 778 | + @Override |
| 779 | + public void onError(Throwable throwable) { |
| 780 | + errors.add(throwable); |
| 781 | + eventProcessed.countDown(); |
| 782 | + } |
| 783 | + |
| 784 | + @Override |
| 785 | + public void onCompleted() { |
| 786 | + eventProcessed.countDown(); |
| 787 | + } |
| 788 | + }; |
| 789 | + |
| 790 | + sendStreamingMessageRequest(handler, streamObserver); |
| 791 | + streamStarted.countDown(); // Simulate subscription started |
| 792 | + |
| 793 | + // The main thread should not be blocked - we should be able to continue immediately |
| 794 | + Assertions.assertTrue(streamStarted.await(100, TimeUnit.MILLISECONDS), |
| 795 | + "Streaming subscription should start quickly without blocking main thread"); |
| 796 | + |
| 797 | + // This proves the main thread is not blocked - we can do other work |
| 798 | + long startTime = System.currentTimeMillis(); |
| 799 | + while (System.currentTimeMillis() - startTime < 50) { |
| 800 | + // Simulate main thread doing other work |
| 801 | + Thread.sleep(1); |
| 802 | + } |
| 803 | + |
| 804 | + // Wait for the actual event processing to complete |
| 805 | + Assertions.assertTrue(eventProcessed.await(2, TimeUnit.SECONDS), |
| 806 | + "Event should be processed within reasonable time"); |
| 807 | + |
| 808 | + // Verify we received the event and no errors occurred |
| 809 | + Assertions.assertTrue(eventReceived.get(), "Should have received streaming event"); |
| 810 | + Assertions.assertTrue(errors.isEmpty(), "Should not have any errors"); |
| 811 | + Assertions.assertEquals(1, results.size(), "Should have received exactly one event"); |
| 812 | + } |
| 813 | + |
747 | 814 | private StreamRecorder<SendMessageResponse> sendMessageRequest(GrpcHandler handler) throws Exception { |
748 | 815 | SendMessageRequest request = SendMessageRequest.newBuilder() |
749 | 816 | .setRequest(GRPC_MESSAGE) |
|
0 commit comments