Skip to content

Commit 8c639ed

Browse files
authored
Use SharedExecutorService in more locations (#42468)
Use SharedExecutorService in more locations
1 parent c9b6501 commit 8c639ed

File tree

10 files changed

+78
-204
lines changed

10 files changed

+78
-204
lines changed

sdk/identity/azure-identity/src/main/java/com/azure/identity/implementation/IdentityClientBase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.azure.core.util.ClientOptions;
2323
import com.azure.core.util.Configuration;
2424
import com.azure.core.util.CoreUtils;
25+
import com.azure.core.util.SharedExecutorService;
2526
import com.azure.core.util.UserAgentUtil;
2627
import com.azure.core.util.builder.ClientBuilderUtil;
2728
import com.azure.core.util.logging.ClientLogger;
@@ -280,6 +281,8 @@ ConfidentialClientApplication getConfidentialClient(boolean enableCae) {
280281

281282
if (options.getExecutorService() != null) {
282283
applicationBuilder.executorService(options.getExecutorService());
284+
} else {
285+
applicationBuilder.executorService(SharedExecutorService.getInstance());
283286
}
284287

285288
TokenCachePersistenceOptions tokenCachePersistenceOptions = options.getTokenCacheOptions();
@@ -341,6 +344,8 @@ PublicClientApplication getPublicClient(boolean sharedTokenCacheCredential, bool
341344

342345
if (options.getExecutorService() != null) {
343346
builder.executorService(options.getExecutorService());
347+
} else {
348+
builder.executorService(SharedExecutorService.getInstance());
344349
}
345350

346351
if (enableCae) {
@@ -457,6 +462,8 @@ ConfidentialClientApplication getManagedIdentityConfidentialClient() {
457462

458463
if (options.getExecutorService() != null) {
459464
applicationBuilder.executorService(options.getExecutorService());
465+
} else {
466+
applicationBuilder.executorService(SharedExecutorService.getInstance());
460467
}
461468

462469
return applicationBuilder.build();
@@ -495,6 +502,8 @@ ManagedIdentityApplication getManagedIdentityMsalApplication() {
495502

496503
if (options.getExecutorService() != null) {
497504
miBuilder.executorService(options.getExecutorService());
505+
} else {
506+
miBuilder.executorService(SharedExecutorService.getInstance());
498507
}
499508

500509
return miBuilder.build();
@@ -537,6 +546,8 @@ ConfidentialClientApplication getWorkloadIdentityConfidentialClient() {
537546

538547
if (options.getExecutorService() != null) {
539548
applicationBuilder.executorService(options.getExecutorService());
549+
} else {
550+
applicationBuilder.executorService(SharedExecutorService.getInstance());
540551
}
541552

542553
return applicationBuilder.build();

sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClient.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.azure.core.http.rest.Response;
1818
import com.azure.core.util.BinaryData;
1919
import com.azure.core.util.Context;
20+
import com.azure.core.util.SharedExecutorService;
2021
import com.azure.core.util.logging.ClientLogger;
2122
import com.azure.monitor.ingestion.implementation.Batcher;
2223
import com.azure.monitor.ingestion.implementation.IngestionUsingDataCollectionRulesClient;
@@ -29,16 +30,13 @@
2930
import java.util.List;
3031
import java.util.Objects;
3132
import java.util.concurrent.ExecutionException;
32-
import java.util.concurrent.ExecutorService;
3333
import java.util.function.Consumer;
3434
import java.util.stream.Collectors;
3535
import java.util.stream.Stream;
3636

3737
import static com.azure.monitor.ingestion.implementation.Utils.GZIP;
38-
import static com.azure.monitor.ingestion.implementation.Utils.createThreadPool;
3938
import static com.azure.monitor.ingestion.implementation.Utils.getConcurrency;
4039
import static com.azure.monitor.ingestion.implementation.Utils.gzipRequest;
41-
import static com.azure.monitor.ingestion.implementation.Utils.registerShutdownHook;
4240

4341
/**
4442
* <p>This class provides a synchronous client for uploading custom logs to an Azure Monitor Log Analytics workspace.
@@ -99,19 +97,13 @@ public final class LogsIngestionClient implements AutoCloseable {
9997
private static final ClientLogger LOGGER = new ClientLogger(LogsIngestionClient.class);
10098
private final IngestionUsingDataCollectionRulesClient client;
10199

102-
// dynamic thread pool that scales up and down on demand.
103-
private final ExecutorService threadPool;
104-
private final Thread shutdownHook;
105-
106100
/**
107101
* Creates a {@link LogsIngestionClient} that sends requests to the data collection endpoint.
108102
*
109103
* @param client The {@link IngestionUsingDataCollectionRulesClient} that the client routes its request through.
110104
*/
111105
LogsIngestionClient(IngestionUsingDataCollectionRulesClient client) {
112106
this.client = client;
113-
this.threadPool = createThreadPool();
114-
this.shutdownHook = registerShutdownHook(this.threadPool, 5);
115107
}
116108

117109
/**
@@ -251,7 +243,7 @@ private Stream<UploadLogsResponseHolder> submit(Stream<UploadLogsResponseHolder>
251243
}
252244

253245
try {
254-
return threadPool.submit(() -> responseStream).get();
246+
return SharedExecutorService.getInstance().submit(() -> responseStream).get();
255247
} catch (InterruptedException | ExecutionException e) {
256248
throw LOGGER.logExceptionAsError(new RuntimeException(e));
257249
}
@@ -335,7 +327,5 @@ public Response<Void> uploadWithResponse(String ruleId, String streamName, Binar
335327

336328
@Override
337329
public void close() {
338-
threadPool.shutdown();
339-
Runtime.getRuntime().removeShutdownHook(shutdownHook);
340330
}
341331
}

sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Utils.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,13 @@
99
import java.io.ByteArrayOutputStream;
1010
import java.io.IOException;
1111
import java.io.UncheckedIOException;
12-
import java.util.concurrent.ExecutorService;
13-
import java.util.concurrent.SynchronousQueue;
14-
import java.util.concurrent.ThreadPoolExecutor;
15-
import java.util.concurrent.TimeUnit;
1612
import java.util.zip.GZIPOutputStream;
1713

1814
public final class Utils {
1915
public static final long MAX_REQUEST_PAYLOAD_SIZE = 1024 * 1024; // 1 MB
2016
public static final String GZIP = "gzip";
2117

2218
private static final ClientLogger LOGGER = new ClientLogger(Utils.class);
23-
// similarly to Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, just puts a limit depending on logical processors count.
24-
private static final int MAX_CONCURRENCY = 10 * Runtime.getRuntime().availableProcessors();
2519

2620
private Utils() {
2721
}
@@ -50,41 +44,4 @@ public static int getConcurrency(LogsUploadOptions options) {
5044

5145
return 1;
5246
}
53-
54-
/**
55-
* Creates cached (that supports scaling) thread pool with shutdown hook to do best-effort graceful termination within timeout.
56-
*
57-
* @return {@link ExecutorService} instance.
58-
*/
59-
public static ExecutorService createThreadPool() {
60-
return new ThreadPoolExecutor(0, MAX_CONCURRENCY, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
61-
}
62-
63-
/**
64-
* Registers {@link ExecutorService} shutdown hook which will be called when JVM terminates.
65-
* First, stops accepting new tasks, then awaits their completion for
66-
* half of timeout, cancels remaining tasks and waits another half of timeout for them to get cancelled.
67-
*
68-
* @param threadPool Thread pool to shut down.
69-
* @param timeoutSec Timeout in seconds to wait for tasks to complete or terminate after JVM starting to shut down.
70-
* @return hook thread instance that can be used to unregister hook.
71-
*/
72-
public static Thread registerShutdownHook(ExecutorService threadPool, int timeoutSec) {
73-
// based on https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
74-
long halfTimeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSec) / 2;
75-
Thread hook = new Thread(() -> {
76-
try {
77-
threadPool.shutdown();
78-
if (!threadPool.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
79-
threadPool.shutdownNow();
80-
threadPool.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
81-
}
82-
} catch (InterruptedException e) {
83-
Thread.currentThread().interrupt();
84-
threadPool.shutdownNow();
85-
}
86-
});
87-
Runtime.getRuntime().addShutdownHook(hook);
88-
return hook;
89-
}
9047
}

sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/ConcurrencyLimitingSpliteratorTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.monitor.ingestion.implementation;
55

6+
import com.azure.core.util.SharedExecutorService;
67
import org.junit.jupiter.api.Test;
78
import org.junit.jupiter.api.parallel.Execution;
89
import org.junit.jupiter.api.parallel.ExecutionMode;
@@ -13,8 +14,6 @@
1314
import java.util.List;
1415
import java.util.concurrent.CountDownLatch;
1516
import java.util.concurrent.ExecutionException;
16-
import java.util.concurrent.ExecutorService;
17-
import java.util.concurrent.Executors;
1817
import java.util.concurrent.TimeUnit;
1918
import java.util.concurrent.atomic.AtomicInteger;
2019
import java.util.stream.Collectors;
@@ -31,7 +30,6 @@
3130
@Execution(ExecutionMode.SAME_THREAD)
3231
public class ConcurrencyLimitingSpliteratorTest {
3332
private static final int TEST_TIMEOUT_SEC = 30;
34-
private static final ExecutorService TEST_THREAD_POOL = Executors.newCachedThreadPool();
3533

3634
@Test
3735
public void invalidParams() {
@@ -53,7 +51,7 @@ public void concurrentCalls(int concurrency) throws ExecutionException, Interrup
5351

5452
int effectiveConcurrency = Math.min(list.size(), concurrency);
5553
CountDownLatch latch = new CountDownLatch(effectiveConcurrency);
56-
List<Integer> processed = TEST_THREAD_POOL.submit(() -> stream.map(r -> {
54+
List<Integer> processed = SharedExecutorService.getInstance().submit(() -> stream.map(r -> {
5755
latch.countDown();
5856
try {
5957
Thread.sleep(10);
@@ -78,7 +76,7 @@ public void concurrencyHigherThanItemsCount() throws ExecutionException, Interru
7876

7977
AtomicInteger parallel = new AtomicInteger(0);
8078
AtomicInteger maxParallel = new AtomicInteger(0);
81-
List<Integer> processed = TEST_THREAD_POOL.submit(() -> stream.map(r -> {
79+
List<Integer> processed = SharedExecutorService.getInstance().submit(() -> stream.map(r -> {
8280
int cur = parallel.incrementAndGet();
8381
int curMax = maxParallel.get();
8482
while (cur > curMax && !maxParallel.compareAndSet(curMax, cur)) {

sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/UtilsTest.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

sdk/search/azure-search-documents/src/main/java/com/azure/search/documents/implementation/batching/SearchIndexingPublisher.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.azure.core.http.rest.Response;
88
import com.azure.core.util.Context;
99
import com.azure.core.util.CoreUtils;
10+
import com.azure.core.util.SharedExecutorService;
1011
import com.azure.core.util.logging.ClientLogger;
1112
import com.azure.core.util.serializer.JsonSerializer;
1213
import com.azure.search.documents.implementation.SearchIndexClientImpl;
@@ -29,8 +30,6 @@
2930
import java.util.List;
3031
import java.util.Objects;
3132
import java.util.concurrent.ExecutionException;
32-
import java.util.concurrent.ExecutorService;
33-
import java.util.concurrent.Executors;
3433
import java.util.concurrent.Future;
3534
import java.util.concurrent.TimeoutException;
3635
import java.util.concurrent.atomic.AtomicInteger;
@@ -54,7 +53,6 @@
5453
*/
5554
public final class SearchIndexingPublisher<T> {
5655
private static final ClientLogger LOGGER = new ClientLogger(SearchIndexingPublisher.class);
57-
private static final ExecutorService EXECUTOR = getThreadPoolWithShutdownHook();
5856

5957
private final SearchIndexClientImpl restClient;
6058
private final JsonSerializer serializer;
@@ -154,7 +152,8 @@ public void flush(boolean awaitLock, boolean isClose, Duration timeout, Context
154152
private void flushLoop(boolean isClosed, Duration timeout, Context context) {
155153
if (timeout != null && !timeout.isNegative() && !timeout.isZero()) {
156154
final AtomicReference<List<TryTrackingIndexAction<T>>> batchActions = new AtomicReference<>();
157-
Future<?> future = EXECUTOR.submit(() -> flushLoopHelper(isClosed, context, batchActions));
155+
Future<?> future = SharedExecutorService.getInstance()
156+
.submit(() -> flushLoopHelper(isClosed, context, batchActions));
158157

159158
try {
160159
CoreUtils.getResultWithTimeout(future, timeout);
@@ -361,8 +360,4 @@ private static void sleep(long millis) {
361360
} catch (InterruptedException ignored) {
362361
}
363362
}
364-
365-
private static ExecutorService getThreadPoolWithShutdownHook() {
366-
return CoreUtils.addShutdownHookSafely(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
367-
}
368363
}

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/HttpFaultInjectingTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.azure.core.util.Context;
1717
import com.azure.core.util.CoreUtils;
1818
import com.azure.core.util.HttpClientOptions;
19+
import com.azure.core.util.SharedExecutorService;
1920
import com.azure.core.util.UrlBuilder;
2021
import com.azure.core.util.logging.ClientLogger;
2122
import com.azure.storage.blob.BlobClient;
@@ -49,8 +50,7 @@
4950
import java.util.Locale;
5051
import java.util.Set;
5152
import java.util.concurrent.Callable;
52-
import java.util.concurrent.ExecutorService;
53-
import java.util.concurrent.Executors;
53+
import java.util.concurrent.CountDownLatch;
5454
import java.util.concurrent.ThreadLocalRandom;
5555
import java.util.concurrent.TimeUnit;
5656
import java.util.concurrent.atomic.AtomicInteger;
@@ -130,8 +130,8 @@ public void downloadToFileWithFaultInjection() throws IOException, InterruptedEx
130130
StandardOpenOption.TRUNCATE_EXISTING, // If the file already exists and it is opened for WRITE access, then its length is truncated to 0.
131131
StandardOpenOption.READ, StandardOpenOption.WRITE));
132132

133-
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
134-
executorService.invokeAll(files.stream().map(it -> (Callable<Void>) () -> {
133+
CountDownLatch countDownLatch = new CountDownLatch(500);
134+
SharedExecutorService.getInstance().invokeAll(files.stream().map(it -> (Callable<Void>) () -> {
135135
try {
136136
downloadClient.downloadToFileWithResponse(new BlobDownloadToFileOptions(it.getAbsolutePath())
137137
.setOpenOptions(overwriteOptions)
@@ -148,13 +148,14 @@ public void downloadToFileWithFaultInjection() throws IOException, InterruptedEx
148148
LOGGER.atWarning()
149149
.addKeyValue("downloadFile", it.getAbsolutePath())
150150
.log("Failed to complete download.", ex);
151+
} finally {
152+
countDownLatch.countDown();
151153
}
152154

153155
return null;
154156
}).collect(Collectors.toList()));
155157

156-
executorService.shutdown();
157-
executorService.awaitTermination(10, TimeUnit.MINUTES);
158+
countDownLatch.await(10, TimeUnit.MINUTES);
158159

159160
assertTrue(successCount.get() >= 450);
160161
// cleanup

0 commit comments

Comments
 (0)