Skip to content

Commit b95f675

Browse files
authored
Merge pull request #121 from kabir/one-executor
Share the Executor used for async operations
2 parents a5df910 + 0ddd7cc commit b95f675

File tree

10 files changed

+80
-24
lines changed

10 files changed

+80
-24
lines changed

core/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.a2a.server.requesthandlers;
22

3-
import static io.a2a.util.AsyncUtils.convertingProcessor;
4-
import static io.a2a.util.AsyncUtils.createTubeConfig;
5-
import static io.a2a.util.AsyncUtils.processor;
3+
import static io.a2a.util.async.AsyncUtils.convertingProcessor;
4+
import static io.a2a.util.async.AsyncUtils.createTubeConfig;
5+
import static io.a2a.util.async.AsyncUtils.processor;
66

77
import java.util.ArrayList;
88
import java.util.Collections;
@@ -12,7 +12,6 @@
1212
import java.util.Objects;
1313
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.Executor;
15-
import java.util.concurrent.Executors;
1615
import java.util.concurrent.Flow;
1716
import java.util.concurrent.atomic.AtomicReference;
1817
import java.util.function.Supplier;
@@ -47,6 +46,7 @@
4746
import io.a2a.spec.TaskQueryParams;
4847
import io.a2a.spec.UnsupportedOperationError;
4948
import io.a2a.util.TempLoggerWrapper;
49+
import io.a2a.util.async.Internal;
5050
import org.slf4j.Logger;
5151
import org.slf4j.LoggerFactory;
5252

@@ -63,15 +63,16 @@ public class DefaultRequestHandler implements RequestHandler {
6363

6464
private final Map<String, CompletableFuture<Void>> runningAgents = Collections.synchronizedMap(new HashMap<>());
6565

66-
private final Executor executor = Executors.newCachedThreadPool();
66+
private final Executor executor;
6767

6868
@Inject
6969
public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
70-
QueueManager queueManager, PushNotifier pushNotifier) {
70+
QueueManager queueManager, PushNotifier pushNotifier, @Internal Executor executor) {
7171
this.agentExecutor = agentExecutor;
7272
this.taskStore = taskStore;
7373
this.queueManager = queueManager;
7474
this.pushNotifier = pushNotifier;
75+
this.executor = executor;
7576
// TODO In Python this is also a constructor parameter defaulting to this SimpleRequestContextBuilder
7677
// implementation if the parameter is null. Skip that for now, since otherwise I get CDI errors, and
7778
// I am unsure about the correct scope.

core/src/main/java/io/a2a/server/requesthandlers/JSONRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.a2a.server.requesthandlers;
22

3-
import static io.a2a.util.AsyncUtils.createTubeConfig;
3+
import static io.a2a.util.async.AsyncUtils.createTubeConfig;
44

55
import java.util.concurrent.Flow;
66

core/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.a2a.server.tasks;
22

3-
import static io.a2a.util.AsyncUtils.consumer;
4-
import static io.a2a.util.AsyncUtils.createTubeConfig;
5-
import static io.a2a.util.AsyncUtils.processor;
3+
import static io.a2a.util.async.AsyncUtils.consumer;
4+
import static io.a2a.util.async.AsyncUtils.createTubeConfig;
5+
import static io.a2a.util.async.AsyncUtils.processor;
66

77
import java.util.concurrent.Flow;
88
import java.util.concurrent.atomic.AtomicBoolean;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.a2a.util.async;
2+
3+
import java.util.concurrent.Executor;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
6+
7+
import jakarta.annotation.PostConstruct;
8+
import jakarta.annotation.PreDestroy;
9+
import jakarta.enterprise.inject.Produces;
10+
import jakarta.inject.Singleton;
11+
12+
@Singleton
13+
public class AsyncExecutorProducer {
14+
15+
private ExecutorService executor;
16+
17+
@PostConstruct
18+
public void init() {
19+
executor = Executors.newCachedThreadPool();
20+
}
21+
22+
@PreDestroy
23+
public void close() {
24+
executor.shutdown();
25+
}
26+
27+
@Produces
28+
@Internal
29+
public Executor produce() {
30+
return executor;
31+
}
32+
33+
}

core/src/main/java/io/a2a/util/AsyncUtils.java renamed to core/src/main/java/io/a2a/util/async/AsyncUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.a2a.util;
1+
package io.a2a.util.async;
22

33
import java.util.concurrent.Flow;
44
import java.util.concurrent.atomic.AtomicBoolean;
@@ -7,6 +7,7 @@
77
import java.util.function.Consumer;
88
import java.util.function.Function;
99

10+
import io.a2a.util.Assert;
1011
import mutiny.zero.BackpressureStrategy;
1112
import mutiny.zero.Tube;
1213
import mutiny.zero.TubeConfiguration;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.a2a.util.async;
2+
3+
import java.lang.annotation.Retention;
4+
import java.lang.annotation.RetentionPolicy;
5+
import java.lang.annotation.Target;
6+
7+
import jakarta.inject.Qualifier;
8+
9+
@Qualifier
10+
@Retention(RetentionPolicy.RUNTIME)
11+
public @interface Internal {
12+
}

core/src/test/java/io/a2a/server/requesthandlers/JSONRPCHandlerTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.List;
1515
import java.util.concurrent.CompletableFuture;
1616
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.Executor;
1718
import java.util.concurrent.Executors;
1819
import java.util.concurrent.Flow;
1920
import java.util.concurrent.TimeUnit;
@@ -104,6 +105,8 @@ public class JSONRPCHandlerTest {
104105
private InMemoryQueueManager queueManager;
105106
private TestHttpClient httpClient;
106107

108+
private final Executor internalExecutor = Executors.newCachedThreadPool();
109+
107110

108111
@BeforeEach
109112
public void init() {
@@ -128,7 +131,7 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
128131
httpClient = new TestHttpClient();
129132
PushNotifier pushNotifier = new InMemoryPushNotifier(httpClient);
130133

131-
requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushNotifier);
134+
requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushNotifier, internalExecutor);
132135
}
133136

134137
@AfterEach
@@ -1050,7 +1053,8 @@ public void testPushNotificationsNotSupportedError() {
10501053
@Test
10511054
public void testOnGetPushNotificationNoPushNotifier() {
10521055
// Create request handler without a push notifier
1053-
DefaultRequestHandler requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, null);
1056+
DefaultRequestHandler requestHandler =
1057+
new DefaultRequestHandler(executor, taskStore, queueManager, null, internalExecutor);
10541058
AgentCard card = createAgentCard(false, true, false);
10551059
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler);
10561060

@@ -1068,7 +1072,8 @@ public void testOnGetPushNotificationNoPushNotifier() {
10681072
@Test
10691073
public void testOnSetPushNotificationNoPushNotifier() {
10701074
// Create request handler without a push notifier
1071-
DefaultRequestHandler requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, null);
1075+
DefaultRequestHandler requestHandler =
1076+
new DefaultRequestHandler(executor, taskStore, queueManager, null, internalExecutor);
10721077
AgentCard card = createAgentCard(false, true, false);
10731078
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler);
10741079

@@ -1157,7 +1162,8 @@ public void testDefaultRequestHandlerWithCustomComponents() {
11571162

11581163
@Test
11591164
public void testOnMessageSendErrorHandling() {
1160-
DefaultRequestHandler requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, null);
1165+
DefaultRequestHandler requestHandler =
1166+
new DefaultRequestHandler(executor, taskStore, queueManager, null, internalExecutor);
11611167
AgentCard card = createAgentCard(false, true, false);
11621168
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler);
11631169

core/src/test/java/io/a2a/util/AsyncUtilsTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package io.a2a.util;
22

3-
import static io.a2a.util.AsyncUtils.consumer;
4-
import static io.a2a.util.AsyncUtils.convertingProcessor;
5-
import static io.a2a.util.AsyncUtils.createTubeConfig;
6-
import static io.a2a.util.AsyncUtils.processor;
3+
import static io.a2a.util.async.AsyncUtils.consumer;
4+
import static io.a2a.util.async.AsyncUtils.convertingProcessor;
5+
import static io.a2a.util.async.AsyncUtils.createTubeConfig;
6+
import static io.a2a.util.async.AsyncUtils.processor;
77
import static org.junit.jupiter.api.Assertions.assertEquals;
88
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
99
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -13,8 +13,6 @@
1313
import java.util.Collections;
1414
import java.util.List;
1515
import java.util.concurrent.CountDownLatch;
16-
import java.util.concurrent.Executor;
17-
import java.util.concurrent.Executors;
1816
import java.util.concurrent.Flow;
1917
import java.util.concurrent.TimeUnit;
2018
import java.util.concurrent.atomic.AtomicReference;

sdk-jakarta/src/main/java/io/a2a/server/apps/jakarta/A2AServerResource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.a2a.spec.StreamingJSONRPCRequest;
4848
import io.a2a.spec.TaskResubscriptionRequest;
4949
import io.a2a.spec.UnsupportedOperationError;
50+
import io.a2a.util.async.Internal;
5051

5152
@Path("/")
5253
public class A2AServerResource {
@@ -61,8 +62,9 @@ public class A2AServerResource {
6162
// Hook so testing can wait until the async Subscription is subscribed.
6263
private static volatile Runnable streamingIsSubscribedRunnable;
6364

64-
65-
private final Executor executor = Executors.newCachedThreadPool();
65+
@Inject
66+
@Internal
67+
Executor executor;
6668

6769
/**
6870
* Handles incoming POST requests to the main A2A endpoint. Dispatches the

sdk-quarkus/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.a2a.spec.TaskResubscriptionRequest;
4646
import io.a2a.spec.UnsupportedOperationError;
4747
import io.a2a.util.Utils;
48+
import io.a2a.util.async.Internal;
4849
import io.quarkus.vertx.web.Body;
4950
import io.quarkus.vertx.web.ReactiveRoutes;
5051
import io.quarkus.vertx.web.Route;
@@ -71,7 +72,9 @@ public class A2AServerRoutes {
7172
// Hook so testing can wait until the MultiSseSupport is subscribed.
7273
private static volatile Runnable streamingMultiSseSupportSubscribedRunnable;
7374

74-
private final Executor executor = Executors.newCachedThreadPool();
75+
@Inject
76+
@Internal
77+
Executor executor;
7578

7679
@Route(path = "/", methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
7780
public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {

0 commit comments

Comments
 (0)