99import io .a2a .client .config .ClientConfig ;
1010import io .a2a .client .transport .jsonrpc .JSONRPCTransport ;
1111import io .a2a .client .transport .jsonrpc .JSONRPCTransportConfigBuilder ;
12+ import io .a2a .spec .A2AClientError ;
1213import io .a2a .spec .A2AClientException ;
1314import io .a2a .spec .AgentCard ;
1415import io .a2a .spec .Message ;
2122import java .io .IOException ;
2223import java .util .Collections ;
2324import java .util .HashMap ;
24- import java .util .HashSet ;
2525import java .util .List ;
2626import java .util .Map ;
27- import java .util .Set ;
2827import java .util .concurrent .CountDownLatch ;
2928import java .util .concurrent .TimeUnit ;
3029import java .util .concurrent .atomic .AtomicBoolean ;
@@ -54,7 +53,41 @@ public class A2ACloudExampleClient {
5453 private static final int PROCESS_MESSAGE_COUNT = 8 ; // Number of "process" messages to send
5554 private static final int MESSAGE_INTERVAL_MS = 1500 ;
5655
56+ // Test state
57+ private final Map <String , Integer > observedPods = Collections .synchronizedMap (new HashMap <>());
58+ private final AtomicInteger artifactCount = new AtomicInteger (0 );
59+ private final AtomicBoolean testFailed = new AtomicBoolean (false );
60+ private final CountDownLatch taskCreationLatch = new CountDownLatch (1 );
61+ private final CountDownLatch completionLatch = new CountDownLatch (1 );
62+ private String serverTaskId ;
63+
64+ // Clients
65+ private Client streamingClient ;
66+ private Client nonStreamingClient ;
67+ private ClientConfig nonStreamingConfig ;
68+
5769 public static void main (String [] args ) throws Exception {
70+ new A2ACloudExampleClient ().run ();
71+ }
72+
73+ private void run () throws Exception {
74+ printHeader ();
75+
76+ AgentCard agentCard = fetchAndConfigureAgentCard ();
77+ String clientTaskId = generateClientTaskId ();
78+
79+ createClients (agentCard );
80+
81+ sendStartMessage (clientTaskId );
82+ subscribeToTaskUpdates ();
83+ sendProcessMessages ();
84+ sendCompleteMessage ();
85+
86+ waitForCompletion ();
87+ printResults ();
88+ }
89+
90+ private void printHeader () {
5891 System .out .println ("=============================================" );
5992 System .out .println ("A2A Cloud Deployment Example Client" );
6093 System .out .println ("=============================================" );
@@ -63,8 +96,9 @@ public static void main(String[] args) throws Exception {
6396 System .out .println ("Process messages: " + PROCESS_MESSAGE_COUNT );
6497 System .out .println ("Message interval: " + MESSAGE_INTERVAL_MS + "ms" );
6598 System .out .println ();
99+ }
66100
67- // Fetch agent card
101+ private AgentCard fetchAndConfigureAgentCard () throws A2AClientException , A2AClientError {
68102 System .out .println ("Fetching agent card..." );
69103 AgentCard fetchedCard = A2A .getAgentCard (AGENT_URL );
70104 System .out .println ("✓ Agent: " + fetchedCard .name ());
@@ -75,78 +109,82 @@ public static void main(String[] args) throws Exception {
75109 .url (AGENT_URL ) // Use localhost URL for port-forwarded connection
76110 .build ();
77111 System .out .println ();
112+ return agentCard ;
113+ }
78114
79- // Generate unique task ID for this test run
115+ private String generateClientTaskId () {
80116 String clientTaskId = "cloud-test-" + System .currentTimeMillis ();
81117 System .out .println ("Client task ID: " + clientTaskId );
82118 System .out .println ();
119+ return clientTaskId ;
120+ }
83121
84- // Track observed pod names and actual server task ID
85- Map <String , Integer > observedPods = Collections .synchronizedMap (new HashMap <>());
86- AtomicInteger artifactCount = new AtomicInteger (0 );
87- AtomicBoolean testFailed = new AtomicBoolean (false );
88- CountDownLatch completionLatch = new CountDownLatch (1 ); // Wait for COMPLETED state
89- final String [] serverTaskId = new String [1 ]; // Will be set by server response
90-
91- // Create streaming client for subscribing
122+ private void createClients (AgentCard agentCard ) throws A2AClientException {
92123 System .out .println ("Creating streaming client for subscription..." );
93124 ClientConfig streamingConfig = new ClientConfig .Builder ()
94125 .setStreaming (true )
95126 .build ();
96127
97- Client streamingClient = Client .builder (agentCard )
128+ streamingClient = Client .builder (agentCard )
98129 .clientConfig (streamingConfig )
99130 .withTransport (JSONRPCTransport .class , new JSONRPCTransportConfigBuilder ())
100131 .build ();
101132
102- // Create non-streaming client for sending messages
103133 System .out .println ("Creating non-streaming client for sending messages..." );
104- ClientConfig nonStreamingConfig = new ClientConfig .Builder ()
134+ nonStreamingConfig = new ClientConfig .Builder ()
105135 .setStreaming (false )
106136 .build ();
107137
108- Client nonStreamingClient = Client .builder (agentCard )
138+ nonStreamingClient = Client .builder (agentCard )
109139 .clientConfig (nonStreamingConfig )
110140 .withTransport (JSONRPCTransport .class , new JSONRPCTransportConfigBuilder ())
111141 .build ();
112142
113143 System .out .println ("✓ Clients created" );
114144 System .out .println ();
145+ }
115146
116- // Create initial task by sending "start" message (using non-streaming client)
147+ private void sendStartMessage ( String clientTaskId ) throws InterruptedException {
117148 System .out .println ("Step 1: Sending 'start' to create task..." );
118149 Message startMessage = A2A .toUserMessage ("start" , clientTaskId );
150+
119151 try {
120152 nonStreamingClient .sendMessage (startMessage , List .of ((ClientEvent event , AgentCard card ) -> {
121153 if (event instanceof TaskEvent te ) {
122- serverTaskId [ 0 ] = te .getTask ().getId (); // Capture server-generated task ID
123- System .out .println ("✓ Task created: " + serverTaskId [ 0 ] );
154+ serverTaskId = te .getTask ().getId ();
155+ System .out .println ("✓ Task created: " + serverTaskId );
124156 System .out .println (" State: " + te .getTask ().getStatus ().state ());
157+ taskCreationLatch .countDown ();
125158 }
126159 }), error -> {
127160 System .err .println ("✗ Failed to create task: " + error .getMessage ());
128161 testFailed .set (true );
162+ taskCreationLatch .countDown ();
129163 });
130164
131- // Wait for task to be fully processed before resubscription
132- Thread .sleep (1500 );
165+ // Wait for task creation to complete (max 5 seconds)
166+ if (!taskCreationLatch .await (5 , TimeUnit .SECONDS )) {
167+ System .err .println ("✗ Timeout waiting for task creation" );
168+ System .exit (1 );
169+ }
133170
134- if (serverTaskId [ 0 ] == null ) {
171+ if (serverTaskId == null ) {
135172 System .err .println ("✗ Failed to get server task ID" );
136173 System .exit (1 );
137174 }
138175 } catch (Exception e ) {
139176 System .err .println ("✗ Failed to create task: " + e .getMessage ());
140177 System .exit (1 );
141178 }
179+ }
142180
143- // Subscribe to task updates AFTER initial task is created
144- // In multi-pod setup, may need retry if we hit a pod that hasn't received Kafka events yet
181+ private void subscribeToTaskUpdates () throws InterruptedException {
145182 System .out .println ();
146183 System .out .println ("Step 2: Subscribing to task for streaming updates..." );
147184
148185 AtomicBoolean subscribed = new AtomicBoolean (false );
149186 int maxRetries = 3 ;
187+
150188 for (int attempt = 1 ; attempt <= maxRetries ; attempt ++) {
151189 try {
152190 if (attempt > 1 ) {
@@ -155,40 +193,11 @@ public static void main(String[] args) throws Exception {
155193 }
156194
157195 streamingClient .resubscribe (
158- new TaskIdParams (serverTaskId [0 ]),
159- List .of ((ClientEvent event , AgentCard card ) -> {
160- if (event instanceof TaskUpdateEvent tue ) {
161- if (tue .getUpdateEvent () instanceof TaskArtifactUpdateEvent artifactEvent ) {
162- int count = artifactCount .incrementAndGet ();
163- String artifactText = extractTextFromArtifact (artifactEvent );
164- System .out .println (" Artifact #" + count + ": " + artifactText );
165-
166- // Extract pod name from artifact text
167- String podName = extractPodName (artifactText );
168- if (podName != null && !podName .equals ("unknown-pod" )) {
169- int invokeCount = observedPods .getOrDefault (podName , 0 );
170- observedPods .put (podName , ++invokeCount );
171- System .out .println (" → Pod: " + podName + " (Total unique pods: " + observedPods .size () + ")" );
172- }
173- }
174- } else if (event instanceof TaskEvent te ) {
175- // Check for task completion
176- if (te .getTask ().getStatus ().state ().isFinal ()) {
177- System .out .println (" Task reached final state: " + te .getTask ().getStatus ().state ());
178- completionLatch .countDown ();
179- }
180- }
181- }),
182- error -> {
183- // Filter out normal stream closure errors (expected when task completes)
184- if (!isStreamClosedError (error )) {
185- System .err .println ("✗ Subscription error: " + error .getMessage ());
186- testFailed .set (true );
187- } else {
188- System .out .println ("ℹ Subscription stream closed (expected after task completion)" );
189- }
190- }
196+ new TaskIdParams (serverTaskId ),
197+ List .of (this ::handleSubscriptionEvent ),
198+ this ::handleSubscriptionError
191199 );
200+
192201 System .out .println ("✓ Subscribed to task updates" );
193202 subscribed .set (true );
194203 break ;
@@ -206,28 +215,60 @@ public static void main(String[] args) throws Exception {
206215 System .err .println ("✗ Failed to establish subscription" );
207216 System .exit (1 );
208217 }
218+ }
219+
220+ private void handleSubscriptionEvent (ClientEvent event , AgentCard card ) {
221+ if (event instanceof TaskUpdateEvent tue ) {
222+ if (tue .getUpdateEvent () instanceof TaskArtifactUpdateEvent artifactEvent ) {
223+ int count = artifactCount .incrementAndGet ();
224+ String artifactText = extractTextFromArtifact (artifactEvent );
225+ System .out .println (" Artifact #" + count + ": " + artifactText );
226+
227+ // Extract pod name from artifact text
228+ String podName = extractPodName (artifactText );
229+ if (podName != null && !podName .equals ("unknown-pod" )) {
230+ int invokeCount = observedPods .getOrDefault (podName , 0 );
231+ observedPods .put (podName , ++invokeCount );
232+ System .out .println (" → Pod: " + podName + " (Total unique pods: " + observedPods .size () + ")" );
233+ }
234+ }
235+ } else if (event instanceof TaskEvent te ) {
236+ // Check for task completion
237+ if (te .getTask ().getStatus ().state ().isFinal ()) {
238+ System .out .println (" Task reached final state: " + te .getTask ().getStatus ().state ());
239+ completionLatch .countDown ();
240+ }
241+ }
242+ }
209243
210- // Send "process" messages in a loop
211- // Create a new client for each message to force new HTTP connections and demonstrate load balancing
244+ private void handleSubscriptionError (Throwable error ) {
245+ // Filter out normal stream closure errors (expected when task completes)
246+ if (!isStreamClosedError (error )) {
247+ System .err .println ("✗ Subscription error: " + error .getMessage ());
248+ testFailed .set (true );
249+ } else {
250+ System .out .println ("ℹ Subscription stream closed (expected after task completion)" );
251+ }
252+ }
253+
254+ private void sendProcessMessages () throws InterruptedException , A2AClientException {
212255 System .out .println ();
213256 System .out .println ("Step 3: Sending " + PROCESS_MESSAGE_COUNT + " 'process' messages (interval: " + MESSAGE_INTERVAL_MS + "ms)..." );
214257 System .out .println ("--------------------------------------------" );
215258
216259 for (int i = 1 ; i <= PROCESS_MESSAGE_COUNT ; i ++) {
217260 final int messageNum = i ;
218261
219- // Create a new client for each request to force new HTTP connection, which should load balance across
220- // the available nodes
221- Client freshClient = Client .builder (agentCard )
262+ // Create a new client for each request to force new HTTP connection
263+ Client freshClient = Client .builder (streamingClient .getAgentCard ())
222264 .clientConfig (nonStreamingConfig )
223265 .withTransport (JSONRPCTransport .class , new JSONRPCTransportConfigBuilder ())
224266 .build ();
225267
226- // Build "process" message with explicit taskId
227268 Message message = new Message .Builder ()
228269 .role (Message .Role .USER )
229270 .parts (new TextPart ("process" ))
230- .taskId (serverTaskId [ 0 ] )
271+ .taskId (serverTaskId )
231272 .build ();
232273
233274 try {
@@ -247,25 +288,25 @@ public static void main(String[] args) throws Exception {
247288 }
248289 }
249290
250- // Wait a bit for process artifacts to arrive via subscription
291+ // Wait for process artifacts to arrive via subscription
251292 System .out .println ();
252293 System .out .println ("Waiting for process artifacts to arrive..." );
253294 Thread .sleep (2000 );
295+ }
254296
255- // Send "complete" message to finalize task
297+ private void sendCompleteMessage () {
256298 System .out .println ();
257299 System .out .println ("Step 4: Sending 'complete' to finalize task..." );
258300
259- // Create fresh client for complete message too
260- Client completeClient = Client .builder (agentCard )
301+ Client completeClient = Client .builder (streamingClient .getAgentCard ())
261302 .clientConfig (nonStreamingConfig )
262303 .withTransport (JSONRPCTransport .class , new JSONRPCTransportConfigBuilder ())
263304 .build ();
264305
265306 Message completeMessage = new Message .Builder ()
266307 .role (Message .Role .USER )
267308 .parts (new TextPart ("complete" ))
268- .taskId (serverTaskId [ 0 ] )
309+ .taskId (serverTaskId )
269310 .build ();
270311
271312 try {
@@ -281,15 +322,17 @@ public static void main(String[] args) throws Exception {
281322 System .err .println ("✗ Failed to send complete message: " + e .getMessage ());
282323 testFailed .set (true );
283324 }
325+ }
284326
285- // Wait for task to reach final state
327+ private void waitForCompletion () throws InterruptedException {
286328 System .out .println ();
287329 System .out .println ("Waiting for task to complete..." );
288330 if (!completionLatch .await (10 , TimeUnit .SECONDS )) {
289331 System .err .println ("⚠ Timeout waiting for task completion" );
290332 }
333+ }
291334
292- // Print results
335+ private void printResults () {
293336 System .out .println ();
294337 System .out .println ("=============================================" );
295338 System .out .println ("Test Results" );
0 commit comments