@@ -89,6 +89,9 @@ import io.a2a.server.tasks.TaskUpdater;
8989import io.a2a.spec.JSONRPCError ;
9090import io.a2a.spec.Message ;
9191import io.a2a.spec.Part ;
92+ import io.a2a.spec.Task ;
93+ import io.a2a.spec.TaskNotCancelableError ;
94+ import io.a2a.spec.TaskState ;
9295import io.a2a.spec.TextPart ;
9396...
9497
@@ -98,126 +101,73 @@ public class WeatherAgentExecutorProducer {
98101 @Inject
99102 WeatherAgent weatherAgent;
100103
101- // Thread pool for background execution
102- private final Executor taskExecutor = Executors . newCachedThreadPool();
103-
104- // Track active sessions for potential cancellation
105- private final ConcurrentHashMap<String , CompletableFuture<Void > > activeSessions = new ConcurrentHashMap<> ();
106-
107104 @Produces
108105 public AgentExecutor agentExecutor () {
109- return new WeatherAgentExecutor (weatherAgent, taskExecutor, activeSessions );
106+ return new WeatherAgentExecutor (weatherAgent);
110107 }
111108
112109 private static class WeatherAgentExecutor implements AgentExecutor {
113110
114111 private final WeatherAgent weatherAgent;
115- private final Executor taskExecutor;
116- private final ConcurrentHashMap<String , CompletableFuture<Void > > activeSessions;
117112
118- public WeatherAgentExecutor (WeatherAgent weatherAgent , Executor taskExecutor ,
119- ConcurrentHashMap<String , CompletableFuture<Void > > activeSessions ) {
113+ public WeatherAgentExecutor (WeatherAgent weatherAgent ) {
120114 this . weatherAgent = weatherAgent;
121- this . taskExecutor = taskExecutor;
122- this . activeSessions = activeSessions;
123115 }
124116
125117 @Override
126118 public void execute (RequestContext context , EventQueue eventQueue ) throws JSONRPCError {
127119 TaskUpdater updater = new TaskUpdater (context, eventQueue);
128120
129- // Immediately notify that the task is submitted
121+ // mark the task as submitted and start working on it
130122 if (context. getTask() == null ) {
131123 updater. submit();
132124 }
133125 updater. startWork();
134126
135- CompletableFuture<Void > taskFuture = CompletableFuture . runAsync(() - > {
136- try {
137- processRequest(context, updater);
138- } catch (Exception e) {
139- System . err. println(" Weather agent execution failed: " + e. getMessage());
140- e. printStackTrace();
141- }
142- }, taskExecutor);
127+ // extract the text from the message
128+ String userMessage = extractTextFromMessage(context. getMessage());
143129
144- // Track the active session
145- activeSessions. put(context. getContextId(), taskFuture);
146- taskFuture. join();
147- }
130+ // call the weather agent with the user's message
131+ String response = weatherAgent. chat(userMessage);
148132
149- private void processRequest (RequestContext context , TaskUpdater updater ) {
150- String contextId = context. getContextId();
151-
152- try {
153- // Check for interruption before starting work
154- if (Thread . currentThread(). isInterrupted()) {
155- return ;
156- }
157-
158- // Extract text from message parts
159- String userMessage = extractTextFromMessage(context. getMessage());
160-
161- // Call the weather agent with the user's message
162- String response = weatherAgent. chat(userMessage);
163-
164- // Check for interruption after agent call
165- if (Thread . currentThread(). isInterrupted()) {
166- return ;
167- }
168-
169- // Create response part
170- TextPart responsePart = new TextPart (response, null );
171- List<Part<?> > parts = List . of(responsePart);
172-
173- // Add response as artifact and complete the task
174- updater. addArtifact(parts, null , null , null );
175- updater. complete();
176-
177- } catch (Exception e) {
178- // Task failed
179- System . err. println(" Weather agent task failed: " + contextId);
180- e. printStackTrace();
181-
182- // Mark task as failed using TaskUpdater
183- updater. fail();
184-
185- } finally {
186- // Clean up active session
187- activeSessions. remove(contextId);
188- }
133+ // create the response part
134+ TextPart responsePart = new TextPart (response, null );
135+ List<Part<?> > parts = List . of(responsePart);
136+
137+ // add the response as an artifact and complete the task
138+ updater. addArtifact(parts, null , null , null );
139+ updater. complete();
189140 }
190141
191142 private String extractTextFromMessage (Message message ) {
192143 StringBuilder textBuilder = new StringBuilder ();
193-
194144 if (message. getParts() != null ) {
195145 for (Part part : message. getParts()) {
196146 if (part instanceof TextPart textPart) {
197147 textBuilder. append(textPart. getText());
198148 }
199149 }
200150 }
201-
202151 return textBuilder. toString();
203152 }
204153
205154 @Override
206155 public void cancel (RequestContext context , EventQueue eventQueue ) throws JSONRPCError {
207- String contextId = context. getContextId();
208- CompletableFuture<Void > taskFuture = activeSessions. get(contextId);
209-
210- if (taskFuture != null ) {
211- // Cancel the future
212- taskFuture. cancel(true );
213- activeSessions. remove(contextId);
214-
215- // Update task status to cancelled using TaskUpdater
216- TaskUpdater updater = new TaskUpdater (context, eventQueue);
217- updater. cancel();
218- } else {
219- System . out. println(" Cancellation requested for inactive weather session: " + contextId);
156+ Task task = context. getTask();
157+
158+ if (task. getStatus(). state() == TaskState . CANCELED ) {
159+ // task already cancelled
160+ throw new TaskNotCancelableError ();
161+ }
162+
163+ if (task. getStatus(). state() == TaskState . COMPLETED ) {
164+ // task already completed
165+ throw new TaskNotCancelableError ();
220166 }
167+
168+ // cancel the task
169+ TaskUpdater updater = new TaskUpdater (context, eventQueue);
170+ updater. cancel();
221171 }
222172 }
223173}
0 commit comments