|
13 | 13 | import io.opentelemetry.sdk.common.CompletableResultCode; |
14 | 14 | import io.opentelemetry.sdk.common.export.ProxyOptions; |
15 | 15 | import io.opentelemetry.sdk.common.export.RetryPolicy; |
| 16 | +import io.opentelemetry.sdk.internal.DaemonThreadFactory; |
16 | 17 | import java.io.ByteArrayOutputStream; |
17 | 18 | import java.io.IOException; |
18 | 19 | import java.io.OutputStream; |
|
32 | 33 | import java.util.concurrent.CompletableFuture; |
33 | 34 | import java.util.concurrent.ConcurrentLinkedQueue; |
34 | 35 | import java.util.concurrent.ExecutorService; |
35 | | -import java.util.concurrent.Executors; |
| 36 | +import java.util.concurrent.SynchronousQueue; |
36 | 37 | import java.util.concurrent.ThreadLocalRandom; |
| 38 | +import java.util.concurrent.ThreadPoolExecutor; |
37 | 39 | import java.util.concurrent.TimeUnit; |
38 | 40 | import java.util.function.Consumer; |
39 | 41 | import java.util.function.Predicate; |
@@ -101,7 +103,7 @@ public final class JdkHttpSender implements HttpSender { |
101 | 103 | .map(RetryPolicy::getRetryExceptionPredicate) |
102 | 104 | .orElse(JdkHttpSender::isRetryableException); |
103 | 105 | if (executorService == null) { |
104 | | - this.executorService = Executors.newFixedThreadPool(5); |
| 106 | + this.executorService = newExecutor(); |
105 | 107 | this.managedExecutor = true; |
106 | 108 | } else { |
107 | 109 | this.executorService = executorService; |
@@ -133,6 +135,16 @@ public final class JdkHttpSender implements HttpSender { |
133 | 135 | executorService); |
134 | 136 | } |
135 | 137 |
|
| 138 | + private static ExecutorService newExecutor() { |
| 139 | + return new ThreadPoolExecutor( |
| 140 | + 0, |
| 141 | + Integer.MAX_VALUE, |
| 142 | + 60, |
| 143 | + TimeUnit.SECONDS, |
| 144 | + new SynchronousQueue<>(), |
| 145 | + new DaemonThreadFactory("jdkhttp-executor")); |
| 146 | + } |
| 147 | + |
136 | 148 | private static HttpClient configureClient( |
137 | 149 | @Nullable SSLContext sslContext, |
138 | 150 | long connectionTimeoutNanos, |
@@ -224,7 +236,8 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException { |
224 | 236 | Thread.currentThread().interrupt(); |
225 | 237 | break; // Break out and return response or throw |
226 | 238 | } |
227 | | - // If after sleeping we've exceeded timeoutNanos, break out and return response or throw |
| 239 | + // If after sleeping we've exceeded timeoutNanos, break out and return |
| 240 | + // response or throw |
228 | 241 | if ((System.nanoTime() - startTimeNanos) >= timeoutNanos) { |
229 | 242 | break; |
230 | 243 | } |
@@ -305,12 +318,15 @@ private HttpResponse<byte[]> sendRequest( |
305 | 318 | } |
306 | 319 |
|
307 | 320 | private static boolean isRetryableException(IOException throwable) { |
308 | | - // Almost all IOExceptions we've encountered are transient retryable, so we opt out of specific |
| 321 | + // Almost all IOExceptions we've encountered are transient retryable, so we |
| 322 | + // opt out of specific |
309 | 323 | // IOExceptions that are unlikely to resolve rather than opting in. |
310 | | - // Known retryable IOException messages: "Connection reset", "/{remote ip}:{remote port} GOAWAY |
| 324 | + // Known retryable IOException messages: "Connection reset", "/{remote |
| 325 | + // ip}:{remote port} GOAWAY |
311 | 326 | // received" |
312 | 327 | // Known retryable HttpTimeoutException messages: "request timed out" |
313 | | - // Known retryable HttpConnectTimeoutException messages: "HTTP connect timed out" |
| 328 | + // Known retryable HttpConnectTimeoutException messages: "HTTP connect timed |
| 329 | + // out" |
314 | 330 | return !(throwable instanceof SSLException); |
315 | 331 | } |
316 | 332 |
|
|
0 commit comments