Skip to content

Commit 1e08b28

Browse files
committed
task-2
1 parent 4762372 commit 1e08b28

File tree

3 files changed

+106
-7
lines changed

3 files changed

+106
-7
lines changed

rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Response.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
2828
import org.apache.hc.core5.http.message.RequestLine;
2929

30+
import java.io.Closeable;
31+
import java.io.IOException;
3032
import java.util.ArrayList;
3133
import java.util.List;
3234
import java.util.Objects;
@@ -37,7 +39,7 @@
3739
* Holds an elasticsearch response. It wraps the {@link BasicClassicHttpResponse} returned and associates
3840
* it with its corresponding {@link RequestLine} and {@link HttpHost}.
3941
*/
40-
public class Response {
42+
public class Response implements Closeable {
4143

4244
private final RequestLine requestLine;
4345
private final HttpHost host;
@@ -201,6 +203,11 @@ ClassicHttpResponse getHttpResponse() {
201203
return response;
202204
}
203205

206+
@Override
207+
public void close() throws IOException {
208+
response.close();
209+
}
210+
204211
@Override
205212
public String toString() {
206213
return "Response{requestLine=" + requestLine + ", host=" + host + ", response=" + response.getCode() + '}';

rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@
7878
import java.util.concurrent.ConcurrentMap;
7979
import java.util.concurrent.ExecutionException;
8080
import java.util.concurrent.Future;
81+
import java.util.concurrent.ScheduledExecutorService;
82+
import java.util.concurrent.ScheduledThreadPoolExecutor;
83+
import java.util.concurrent.TimeUnit;
8184
import java.util.concurrent.atomic.AtomicInteger;
8285
import java.util.stream.Collectors;
8386

@@ -127,6 +130,8 @@ public class Rest5Client implements Closeable {
127130
private final WarningsHandler warningsHandler;
128131
private final boolean compressionEnabled;
129132
private final boolean metaHeaderEnabled;
133+
private final ScheduledExecutorService healthCheckExecutor;
134+
private volatile boolean closed = false;
130135

131136
Rest5Client(
132137
CloseableHttpAsyncClient client,
@@ -148,6 +153,19 @@ public class Rest5Client implements Closeable {
148153
this.compressionEnabled = compressionEnabled;
149154
this.metaHeaderEnabled = metaHeaderEnabled;
150155
setNodes(nodes);
156+
157+
// 初始化连接池健康检查执行器
158+
this.healthCheckExecutor = new ScheduledThreadPoolExecutor(1, r -> {
159+
Thread thread = new Thread(r, "elasticsearch-rest-client-health-check");
160+
thread.setDaemon(true);
161+
return thread;
162+
});
163+
164+
// 启动定期健康检查(每30秒执行一次)
165+
this.healthCheckExecutor.scheduleAtFixedRate(
166+
this::performHealthCheck,
167+
30, 30, TimeUnit.SECONDS
168+
);
151169
}
152170

153171
/**
@@ -584,9 +602,66 @@ private void onFailure(Node node) {
584602
failureListener.onFailure(node);
585603
}
586604

605+
@Override
606+
/**
607+
* 执行连接池健康检查
608+
*/
609+
private void performHealthCheck() {
610+
if (closed) {
611+
return;
612+
}
613+
614+
try {
615+
// 检查客户端状态
616+
if (client instanceof org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl) {
617+
org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl clientImpl =
618+
(org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl) client;
619+
620+
// 获取连接管理器
621+
org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager connectionManager =
622+
(org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager) clientImpl.getConnectionManager();
623+
624+
// 检查连接池状态
625+
int totalConnections = connectionManager.getTotalStats().getAvailable() +
626+
connectionManager.getTotalStats().getLeased() +
627+
connectionManager.getTotalStats().getPending() +
628+
connectionManager.getTotalStats().getMax();
629+
630+
// 如果连接数超过阈值,输出警告日志
631+
if (totalConnections > connectionManager.getMaxTotal()) {
632+
logger.warn("Connection pool usage exceeded maximum limit. Total connections: " + totalConnections + ", Max: " + connectionManager.getMaxTotal());
633+
}
634+
635+
// 定期清理过期连接
636+
connectionManager.closeExpiredConnections();
637+
connectionManager.closeIdleConnections(Timeout.of(5, TimeUnit.MINUTES));
638+
}
639+
} catch (Exception e) {
640+
logger.debug("Failed to perform connection pool health check", e);
641+
}
642+
}
643+
587644
@Override
588645
public void close() throws IOException {
589-
client.close();
646+
closed = true;
647+
648+
// 关闭健康检查执行器
649+
if (healthCheckExecutor != null) {
650+
healthCheckExecutor.shutdown();
651+
try {
652+
if (!healthCheckExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
653+
healthCheckExecutor.shutdownNow();
654+
}
655+
} catch (InterruptedException e) {
656+
Thread.currentThread().interrupt();
657+
healthCheckExecutor.shutdownNow();
658+
}
659+
}
660+
661+
// 关闭HTTP客户端
662+
if (client != null) {
663+
client.close();
664+
}
590665
}
591666

592667
/**

rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public final class Rest5ClientBuilder {
8080
private Consumer<RequestConfig.Builder> requestConfigCallback;
8181
private Consumer<ConnectionConfig.Builder> connectionConfigCallback;
8282
private Consumer<PoolingAsyncClientConnectionManagerBuilder> connectionManagerCallback;
83+
private Consumer<PoolingAsyncClientConnectionManager> connectionManagerMonitor;
8384
private Header[] defaultHeaders = EMPTY_HEADERS;
8485
private Rest5Client.FailureListener failureListener;
8586
private SSLContext sslContext;
@@ -369,14 +370,21 @@ public Rest5ClientBuilder setConnectionConfigCallback(Consumer<ConnectionConfig.
369370
* and used by the {@link CloseableHttpAsyncClient}.
370371
* Commonly used to customize {@link PoolingAsyncClientConnectionManager} without losing any other useful default
371372
* value that the {@link Rest5ClientBuilder} internally sets.
372-
* @throws NullPointerException if {@code connectionManagerCallback} is {@code null}.
373373
*/
374374
public Rest5ClientBuilder setConnectionManagerCallback(Consumer<PoolingAsyncClientConnectionManagerBuilder> connectionManagerCallback) {
375-
Objects.requireNonNull(connectionManagerCallback, "connectionManagerCallback must not be null");
376375
this.connectionManagerCallback = connectionManagerCallback;
377376
return this;
378377
}
379378

379+
/**
380+
* Allows to monitor the {@link PoolingAsyncClientConnectionManager} after it has been created.
381+
* This can be used to add custom monitoring or alerting for connection pool usage.
382+
*/
383+
public Rest5ClientBuilder setConnectionManagerMonitor(Consumer<PoolingAsyncClientConnectionManager> connectionManagerMonitor) {
384+
this.connectionManagerMonitor = connectionManagerMonitor;
385+
return this;
386+
}
387+
380388
/**
381389
* Creates a new {@link Rest5Client} based on the provided configuration.
382390
*/
@@ -446,20 +454,29 @@ private CloseableHttpAsyncClient createHttpClient() {
446454
connectionConfigCallback.accept(connectionConfigBuilder);
447455
}
448456

449-
PoolingAsyncClientConnectionManagerBuilder connectionManagerBuilder =
457+
PoolingAsyncClientConnectionManagerBuilder connectionManagerBuilder =
450458
PoolingAsyncClientConnectionManagerBuilder.create()
451459
.setDefaultConnectionConfig(connectionConfigBuilder.build())
452460
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE)
453461
.setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
454-
.setTlsStrategy(new DefaultClientTlsStrategy(sslContext));
462+
.setTlsStrategy(new DefaultClientTlsStrategy(sslContext))
463+
// 配置连接池定期清理机制
464+
.setValidateAfterInactivity(Timeout.of(5, TimeUnit.SECONDS))
465+
// 配置连接存活时间
466+
.setConnectionTimeToLive(Timeout.of(30, TimeUnit.MINUTES));
455467

456468
if (connectionManagerCallback != null) {
457469
connectionManagerCallback.accept(connectionManagerBuilder);
458470
}
471+
PoolingAsyncClientConnectionManager connectionManager = connectionManagerBuilder.build();
472+
// 调用连接池监控回调
473+
if (connectionManagerMonitor != null) {
474+
connectionManagerMonitor.accept(connectionManager);
475+
}
459476

460477
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create()
461478
.setDefaultRequestConfig(requestConfigBuilder.build())
462-
.setConnectionManager(connectionManagerBuilder.build())
479+
.setConnectionManager(connectionManager)
463480
.setUserAgent(USER_AGENT_HEADER_VALUE)
464481
.setTargetAuthenticationStrategy(new DefaultAuthenticationStrategy())
465482
.setThreadFactory(new RestClientThreadFactory());

0 commit comments

Comments
 (0)