Skip to content

Commit 18cb394

Browse files
[AINode] Revert ClientPoolFactory
1 parent d579c34 commit 18cb394

File tree

15 files changed

+107
-481
lines changed

15 files changed

+107
-481
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2424
import org.apache.iotdb.commons.client.IClientManager;
2525
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
26-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
26+
import org.apache.iotdb.db.protocol.client.AINodeClientFactory;
2727
import org.apache.iotdb.db.protocol.client.ainode.AsyncAINodeServiceClient;
2828

2929
public class AsyncAINodeHeartbeatClientPool {
@@ -33,9 +33,7 @@ public class AsyncAINodeHeartbeatClientPool {
3333
private AsyncAINodeHeartbeatClientPool() {
3434
clientManager =
3535
new IClientManager.Factory<TEndPoint, AsyncAINodeServiceClient>()
36-
.createClientManager(
37-
new ConfigNodeClientManager.ClientPoolFactory
38-
.AsyncAINodeHeartbeatServiceClientPoolFactory());
36+
.createClientManager(new AINodeClientFactory.AINodeHeartbeatClientPoolFactory());
3937
}
4038

4139
public void getAINodeHeartBeat(

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
package org.apache.iotdb.confignode.client.async;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.client.ClientPoolFactory;
2324
import org.apache.iotdb.commons.client.IClientManager;
2425
import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
2526
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
2627
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
27-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
2828

2929
public class AsyncConfigNodeHeartbeatClientPool {
3030

@@ -34,8 +34,7 @@ private AsyncConfigNodeHeartbeatClientPool() {
3434
clientManager =
3535
new IClientManager.Factory<TEndPoint, AsyncConfigNodeInternalServiceClient>()
3636
.createClientManager(
37-
new ConfigNodeClientManager.ClientPoolFactory
38-
.AsyncConfigNodeHeartbeatServiceClientPoolFactory());
37+
new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory());
3938
}
4039

4140
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
package org.apache.iotdb.confignode.client.async;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.client.ClientPoolFactory;
2324
import org.apache.iotdb.commons.client.IClientManager;
2425
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
2526
import org.apache.iotdb.confignode.client.async.handlers.audit.DataNodeWriteAuditLogHandler;
2627
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
27-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
2828
import org.apache.iotdb.mpp.rpc.thrift.TAuditLogReq;
2929
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
3030

@@ -37,8 +37,7 @@ private AsyncDataNodeHeartbeatClientPool() {
3737
clientManager =
3838
new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
3939
.createClientManager(
40-
new ConfigNodeClientManager.ClientPoolFactory
41-
.AsyncDataNodeHeartbeatServiceClientPoolFactory());
40+
new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
4241
}
4342

4443
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
2525
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2626
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
27+
import org.apache.iotdb.commons.client.ClientPoolFactory;
2728
import org.apache.iotdb.commons.client.IClientManager;
2829
import org.apache.iotdb.commons.client.exception.ClientManagerException;
2930
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
3031
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
3132
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
3233
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
33-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
3434
import org.apache.iotdb.rpc.RpcUtils;
3535
import org.apache.iotdb.rpc.TSStatusCode;
3636

@@ -54,9 +54,7 @@ public class SyncConfigNodeClientPool {
5454
private SyncConfigNodeClientPool() {
5555
clientManager =
5656
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
57-
.createClientManager(
58-
new ConfigNodeClientManager.ClientPoolFactory
59-
.SyncConfigNodeIServiceClientPoolFactory());
57+
.createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
6058
configNodeLeader = new TEndPoint();
6159
}
6260

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2424
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2525
import org.apache.iotdb.common.rpc.thrift.TSStatus;
26+
import org.apache.iotdb.commons.client.ClientPoolFactory;
2627
import org.apache.iotdb.commons.client.IClientManager;
2728
import org.apache.iotdb.commons.client.exception.ClientManagerException;
2829
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
2930
import org.apache.iotdb.commons.exception.UncheckedStartupException;
30-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
3131
import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
3232
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
3333
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
@@ -73,8 +73,7 @@ private SyncDataNodeClientPool() {
7373
clientManager =
7474
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
7575
.createClientManager(
76-
new ConfigNodeClientManager.ClientPoolFactory
77-
.SyncDataNodeInternalServiceClientPoolFactory());
76+
new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
7877
buildActionMap();
7978
checkActionMapCompleteness();
8079
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
2828
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
2929
import org.apache.iotdb.common.rpc.thrift.TSStatus;
30+
import org.apache.iotdb.commons.client.ClientPoolFactory;
3031
import org.apache.iotdb.commons.client.IClientManager;
3132
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
3233
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -44,7 +45,6 @@
4445
import org.apache.iotdb.confignode.manager.ConfigManager;
4546
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
4647
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
47-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
4848
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
4949
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
5050
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
@@ -84,8 +84,7 @@ public RegionMaintainHandler(ConfigManager configManager) {
8484
dataNodeClientManager =
8585
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
8686
.createClientManager(
87-
new ConfigNodeClientManager.ClientPoolFactory
88-
.SyncDataNodeInternalServiceClientPoolFactory());
87+
new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
8988
}
9089

9190
public static String getIdWithRpcEndpoint(TDataNodeLocation location) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.commons.audit.UserEntity;
24+
import org.apache.iotdb.commons.client.ClientPoolFactory;
2425
import org.apache.iotdb.commons.client.IClientManager;
2526
import org.apache.iotdb.commons.client.ThriftClient;
2627
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
@@ -34,7 +35,6 @@
3435
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3536
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
3637
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
37-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
3838
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
3939
import org.apache.iotdb.pipe.api.exception.PipeException;
4040
import org.apache.iotdb.rpc.TSStatusCode;
@@ -134,10 +134,9 @@ public IoTDBDataNodeAsyncClientManager(
134134
new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
135135
.createClientManager(
136136
isTSFileUsed
137-
? new ConfigNodeClientManager.ClientPoolFactory
137+
? new ClientPoolFactory
138138
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
139-
: new ConfigNodeClientManager.ClientPoolFactory
140-
.AsyncPipeDataTransferServiceClientPoolFactory()));
139+
: new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
141140
}
142141
endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
143142

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/AINodeClientFactory.java

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,32 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.commons.client.ClientManager;
24+
import org.apache.iotdb.commons.client.ClientManagerMetrics;
25+
import org.apache.iotdb.commons.client.IClientPoolFactory;
2426
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
27+
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
2528
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
29+
import org.apache.iotdb.commons.concurrent.ThreadName;
30+
import org.apache.iotdb.commons.conf.CommonConfig;
31+
import org.apache.iotdb.commons.conf.CommonDescriptor;
2632
import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
33+
import org.apache.iotdb.db.protocol.client.ainode.AsyncAINodeServiceClient;
2734

2835
import org.apache.commons.pool2.PooledObject;
2936
import org.apache.commons.pool2.impl.DefaultPooledObject;
37+
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
3038

3139
import java.util.Optional;
3240

33-
/**
34-
* A dedicated Thrift client factory for AINodeClient. This removes AINode pooling/creation from the
35-
* generic ClientPoolFactory to avoid cross-node client mixing.
36-
*/
41+
/** Dedicated factory for AINodeClient + AINodeClientPoolFactory. */
3742
public class AINodeClientFactory extends ThriftClientFactory<TEndPoint, AINodeClient> {
3843

44+
private static final int connectionTimeout =
45+
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
46+
3947
public AINodeClientFactory(
40-
ClientManager<TEndPoint, AINodeClient> clientManager,
41-
ThriftClientProperty thriftClientProperty) {
42-
super(clientManager, thriftClientProperty);
48+
ClientManager<TEndPoint, AINodeClient> manager, ThriftClientProperty thriftProperty) {
49+
super(manager, thriftProperty);
4350
}
4451

4552
@Override
@@ -59,4 +66,68 @@ public boolean validateObject(TEndPoint key, PooledObject<AINodeClient> pooledOb
5966
.map(org.apache.thrift.transport.TTransport::isOpen)
6067
.orElse(false);
6168
}
69+
70+
/** The PoolFactory originally inside ClientPoolFactory — now moved here. */
71+
public static class AINodeClientPoolFactory
72+
implements IClientPoolFactory<TEndPoint, AINodeClient> {
73+
74+
@Override
75+
public GenericKeyedObjectPool<TEndPoint, AINodeClient> createClientPool(
76+
ClientManager<TEndPoint, AINodeClient> manager) {
77+
78+
// Build thrift client properties
79+
ThriftClientProperty thriftProperty =
80+
new ThriftClientProperty.Builder()
81+
.setConnectionTimeoutMs(connectionTimeout)
82+
.setRpcThriftCompressionEnabled(
83+
CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled())
84+
.build();
85+
86+
GenericKeyedObjectPool<TEndPoint, AINodeClient> pool =
87+
new GenericKeyedObjectPool<>(
88+
new AINodeClientFactory(manager, thriftProperty),
89+
new ClientPoolProperty.Builder<AINodeClient>()
90+
.setMaxClientNumForEachNode(
91+
CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode())
92+
.build()
93+
.getConfig());
94+
95+
ClientManagerMetrics.getInstance()
96+
.registerClientManager(this.getClass().getSimpleName(), pool);
97+
98+
return pool;
99+
}
100+
}
101+
102+
public static class AINodeHeartbeatClientPoolFactory
103+
implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
104+
105+
@Override
106+
public GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> createClientPool(
107+
ClientManager<TEndPoint, AsyncAINodeServiceClient> manager) {
108+
109+
final CommonConfig conf = CommonDescriptor.getInstance().getConfig();
110+
111+
GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> clientPool =
112+
new GenericKeyedObjectPool<>(
113+
new AsyncAINodeServiceClient.Factory(
114+
manager,
115+
new ThriftClientProperty.Builder()
116+
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
117+
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
118+
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
119+
.setPrintLogWhenEncounterException(false)
120+
.build(),
121+
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
122+
new ClientPoolProperty.Builder<AsyncAINodeServiceClient>()
123+
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
124+
.build()
125+
.getConfig());
126+
127+
ClientManagerMetrics.getInstance()
128+
.registerClientManager(this.getClass().getSimpleName(), clientPool);
129+
130+
return clientPool;
131+
}
132+
}
62133
}

0 commit comments

Comments
 (0)