11package io .a2a .examples .cloud ;
22
3+ import java .io .IOException ;
4+ import java .util .Collections ;
5+ import java .util .HashMap ;
6+ import java .util .List ;
7+ import java .util .Map ;
8+ import java .util .concurrent .CountDownLatch ;
9+ import java .util .concurrent .TimeUnit ;
10+ import java .util .concurrent .atomic .AtomicBoolean ;
11+ import java .util .concurrent .atomic .AtomicInteger ;
12+
313import io .a2a .A2A ;
414import io .a2a .client .Client ;
515import io .a2a .client .ClientEvent ;
919import io .a2a .client .config .ClientConfig ;
1020import io .a2a .client .transport .jsonrpc .JSONRPCTransport ;
1121import io .a2a .client .transport .jsonrpc .JSONRPCTransportConfigBuilder ;
12- import io .a2a .spec .A2AClientError ;
13- import io .a2a .spec .A2AClientException ;
1422import io .a2a .spec .AgentCard ;
1523import io .a2a .spec .Message ;
1624import io .a2a .spec .Part ;
17- import io .a2a .spec .Task ;
1825import io .a2a .spec .TaskArtifactUpdateEvent ;
1926import io .a2a .spec .TaskIdParams ;
2027import io .a2a .spec .TextPart ;
2128
22- import java .io .IOException ;
23- import java .util .Collections ;
24- import java .util .HashMap ;
25- import java .util .List ;
26- import java .util .Map ;
27- import java .util .concurrent .CountDownLatch ;
28- import java .util .concurrent .TimeUnit ;
29- import java .util .concurrent .atomic .AtomicBoolean ;
30- import java .util .concurrent .atomic .AtomicInteger ;
31-
3229/**
3330 * Test client demonstrating multi-pod A2A agent deployment with modernized message protocol.
3431 * <p>
5047public class A2ACloudExampleClient {
5148
5249 private static final String AGENT_URL = System .getProperty ("agent.url" , "http://localhost:8080" );
53- private static final int PROCESS_MESSAGE_COUNT = 8 ; // Number of "process" messages to send
54- private static final int MESSAGE_INTERVAL_MS = 1500 ;
50+ private static final int PROCESS_MESSAGE_COUNT = Integer .parseInt (System .getProperty ("process.message.count" , "8" )); // Number of "process" messages to send
51+ private static final int MESSAGE_INTERVAL_MS = Integer .parseInt (System .getProperty ("message.interval.ms" , "1500" ));
52+ private static final boolean CI_MODE = Boolean .parseBoolean (System .getProperty ("ci.mode" , "false" )); // Early exit when 2 pods observed
53+ private static final int MAX_MESSAGES_UNTIL_TWO_PODS = Integer .parseInt (System .getProperty ("max.messages.until.two.pods" , "20" )); // Max messages before giving up
54+ private static final int MIN_PODS_TO_OBSERVE = 2 ;
5555
5656 // Test state
5757 private final Map <String , Integer > observedPods = Collections .synchronizedMap (new HashMap <>());
@@ -253,11 +253,19 @@ private void handleSubscriptionError(Throwable error) {
253253
254254 private void sendProcessMessages () throws InterruptedException {
255255 System .out .println ();
256- System .out .println ("Step 3: Sending " + PROCESS_MESSAGE_COUNT + " 'process' messages (interval: " + MESSAGE_INTERVAL_MS + "ms)..." );
256+ if (CI_MODE ) {
257+ System .out .println ("Step 3: Sending 'process' messages until 2 pods observed (CI mode, max: " + MAX_MESSAGES_UNTIL_TWO_PODS + ", interval: " + MESSAGE_INTERVAL_MS + "ms)..." );
258+ } else {
259+ System .out .println ("Step 3: Sending " + PROCESS_MESSAGE_COUNT + " 'process' messages (interval: " + MESSAGE_INTERVAL_MS + "ms)..." );
260+ }
257261 System .out .println ("--------------------------------------------" );
258262
259- for (int i = 1 ; i <= PROCESS_MESSAGE_COUNT ; i ++) {
260- final int messageNum = i ;
263+ int messageCount = 0 ;
264+ int maxMessages = CI_MODE ? MAX_MESSAGES_UNTIL_TWO_PODS : PROCESS_MESSAGE_COUNT ;
265+
266+ while (messageCount < maxMessages ) {
267+ messageCount ++;
268+ final int messageNum = messageCount ;
261269
262270 // Create a new client for each request to force new HTTP connection
263271 Client freshClient = Client .builder (streamingClient .getAgentCard ())
@@ -282,8 +290,15 @@ private void sendProcessMessages() throws InterruptedException {
282290 });
283291
284292 Thread .sleep (MESSAGE_INTERVAL_MS );
293+
294+ // In CI mode, check if we've observed 2 pods and can exit early
295+ if (CI_MODE && observedPods .size () >= 2 ) {
296+ System .out .println ();
297+ System .out .println ("✓ CI mode: Successfully observed 2 pods after " + messageNum + " messages. Stopping early." );
298+ break ;
299+ }
285300 } catch (Exception e ) {
286- System .err .println ("✗ Failed to send process message " + i + ": " + e .getMessage ());
301+ System .err .println ("✗ Failed to send process message " + messageNum + ": " + e .getMessage ());
287302 testFailed .set (true );
288303 }
289304 }
@@ -345,8 +360,8 @@ private void printResults() {
345360 if (testFailed .get ()) {
346361 System .out .println ("✗ TEST FAILED - Errors occurred during execution" );
347362 System .exit (1 );
348- } else if (observedPods .size () < 2 ) {
349- System .out .println ("✗ TEST FAILED - Expected at least 2 different pods, but only saw: " + observedPods .size ());
363+ } else if (observedPods .size () < MIN_PODS_TO_OBSERVE ) {
364+ System .out .printf ("✗ TEST FAILED - Expected at least %d different pods, but only saw: %d \n " , MIN_PODS_TO_OBSERVE , observedPods .size ());
350365 System .out .println (" This suggests load balancing is not working correctly." );
351366 System .exit (1 );
352367 } else {
0 commit comments