Skip to content

Commit f8f0d54

Browse files
[AINode] Refix AINodeClientManager
1 parent 18cb394 commit f8f0d54

File tree

11 files changed

+73
-54
lines changed

11 files changed

+73
-54
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
2323
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2424
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25+
import org.apache.iotdb.commons.client.ClientPoolFactory;
2526
import org.apache.iotdb.commons.client.IClientManager;
2627
import org.apache.iotdb.commons.client.exception.ClientManagerException;
2728
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -181,7 +182,7 @@ protected void initEnvironment(
181182

182183
clientManager =
183184
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
184-
.createClientManager(new ConfigNodeClientManager.ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
185+
.createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
185186

186187
final String testClassName = getTestClassName();
187188

integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
package org.apache.iotdb.it.env.remote.env;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.client.ClientPoolFactory;
2324
import org.apache.iotdb.commons.client.IClientManager;
2425
import org.apache.iotdb.commons.client.exception.ClientManagerException;
2526
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
2627
import org.apache.iotdb.commons.cluster.NodeStatus;
2728
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
28-
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
2929
import org.apache.iotdb.isession.ISession;
3030
import org.apache.iotdb.isession.ITableSession;
3131
import org.apache.iotdb.isession.SessionConfig;
@@ -84,7 +84,7 @@ public void initClusterEnvironment() {
8484
}
8585
clientManager =
8686
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
87-
.createClientManager(new ConfigNodeClientManager.ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
87+
.createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
8888
}
8989

9090
@Override

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,11 @@ private AINodeClient getAINodeClient() throws NoAvailableAINodeException, Client
232232
}
233233
TEndPoint targetAINodeEndPoint =
234234
new TEndPoint(aiNodeInfo.get(0).getInternalAddress(), aiNodeInfo.get(0).getInternalPort());
235-
return AINodeClientManager.getInstance().borrowClient(targetAINodeEndPoint);
235+
try {
236+
return AINodeClientManager.getInstance().borrowClient(targetAINodeEndPoint);
237+
} catch (Exception e) {
238+
throw new RuntimeException(e);
239+
}
236240
}
237241

238242
public List<Integer> getModelDistributions(String modelName) {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ public TGetAINodeLocationResp getAINodeLocation() throws TException {
660660
}
661661

662662
final TAINodeConfiguration cfg = registeredAINodes.get(0);
663-
final TAINodeLocation loc = (cfg == null) ? null : cfg.getLocation();
663+
final TAINodeLocation loc = cfg.getLocation();
664664

665665
boolean hasEndpoint = false;
666666
if (loc != null) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.io.IOException;
7575
import java.util.Map;
7676
import java.util.Optional;
77+
import java.util.concurrent.atomic.AtomicReference;
7778

7879
import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE;
7980
import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
@@ -107,8 +108,7 @@ private interface RemoteCall<R> {
107108
private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
108109
ConfigNodeClientManager.getInstance();
109110

110-
private static final java.util.concurrent.atomic.AtomicReference<TAINodeLocation>
111-
CURRENT_LOCATION = new java.util.concurrent.atomic.AtomicReference<>();
111+
private static final AtomicReference<TAINodeLocation> CURRENT_LOCATION = new AtomicReference<>();
112112

113113
public static TEndPoint getCurrentEndpoint() {
114114
TAINodeLocation loc = CURRENT_LOCATION.get();
@@ -145,7 +145,7 @@ private <R> R executeRemoteCallWithRetry(RemoteCall<R> call) throws TException {
145145
this.endPoint = pickEndpointFrom(loc);
146146
}
147147
try {
148-
Thread.sleep(100L * attempt);
148+
Thread.sleep(1000L * attempt);
149149
} catch (InterruptedException ie) {
150150
Thread.currentThread().interrupt();
151151
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClientManager.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,52 @@
2424
import org.apache.iotdb.db.protocol.client.AINodeClientFactory;
2525

2626
public class AINodeClientManager {
27-
private AINodeClientManager() {
28-
// Empty constructor
29-
}
3027

31-
private static final class AINodeClientManagerHolder {
32-
private static final IClientManager<TEndPoint, AINodeClient> INSTANCE =
28+
public static final int DEFAULT_AINODE_ID = 0;
29+
30+
private static final AINodeClientManager INSTANCE = new AINodeClientManager();
31+
32+
private final IClientManager<TEndPoint, AINodeClient> clientManager;
33+
34+
private volatile TEndPoint defaultAINodeEndPoint;
35+
36+
private AINodeClientManager() {
37+
this.clientManager =
3338
new IClientManager.Factory<TEndPoint, AINodeClient>()
3439
.createClientManager(new AINodeClientFactory.AINodeClientPoolFactory());
3540
}
3641

37-
public static IClientManager<TEndPoint, AINodeClient> getInstance() {
38-
return AINodeClientManagerHolder.INSTANCE;
42+
public static AINodeClientManager getInstance() {
43+
return INSTANCE;
44+
}
45+
46+
public void updateDefaultAINodeLocation(TEndPoint endPoint) {
47+
this.defaultAINodeEndPoint = endPoint;
48+
}
49+
50+
public AINodeClient borrowClient(TEndPoint endPoint) throws Exception {
51+
return clientManager.borrowClient(endPoint);
52+
}
53+
54+
public AINodeClient borrowClient(int aiNodeId) throws Exception {
55+
if (aiNodeId != DEFAULT_AINODE_ID) {
56+
throw new IllegalArgumentException("Unsupported AINodeId: " + aiNodeId);
57+
}
58+
if (defaultAINodeEndPoint == null) {
59+
defaultAINodeEndPoint = AINodeClient.getCurrentEndpoint();
60+
}
61+
return clientManager.borrowClient(defaultAINodeEndPoint);
62+
}
63+
64+
public void clear(TEndPoint endPoint) {
65+
clientManager.clear(endPoint);
66+
}
67+
68+
public void clearAll() {
69+
clientManager.close();
70+
}
71+
72+
public IClientManager<TEndPoint, AINodeClient> getRawClientManager() {
73+
return clientManager;
3974
}
4075
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
3333
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
3434
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
35-
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
3635
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
3736
import org.apache.iotdb.common.rpc.thrift.TSStatus;
3837
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -45,7 +44,6 @@
4544
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
4645
import org.apache.iotdb.common.rpc.thrift.TThrottleQuota;
4746
import org.apache.iotdb.commons.client.IClientManager;
48-
import org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
4947
import org.apache.iotdb.commons.client.exception.ClientManagerException;
5048
import org.apache.iotdb.commons.cluster.NodeStatus;
5149
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -376,9 +374,6 @@
376374

377375
public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
378376

379-
// NOTE: AINode location is now maintained globally inside AINodeClient.
380-
// We only resolve via ConfigNode when needed, then publish it back to AINodeClient.
381-
382377
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class);
383378

384379
private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
@@ -3611,8 +3606,8 @@ public SettableFuture<ConfigTaskResult> dropModel(final String modelId) {
36113606
@Override
36123607
public SettableFuture<ConfigTaskResult> showModels(final String modelId) {
36133608
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
3614-
final TEndPoint ep = AINodeClient.getCurrentEndpoint();
3615-
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
3609+
try (final AINodeClient ai =
3610+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) {
36163611
final TShowModelsReq req = new TShowModelsReq();
36173612
if (modelId != null) {
36183613
req.setModelId(modelId);
@@ -3632,8 +3627,8 @@ public SettableFuture<ConfigTaskResult> showModels(final String modelId) {
36323627
@Override
36333628
public SettableFuture<ConfigTaskResult> showLoadedModels(List<String> deviceIdList) {
36343629
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
3635-
final TEndPoint ep = AINodeClient.getCurrentEndpoint();
3636-
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
3630+
try (final AINodeClient ai =
3631+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) {
36373632
final TShowLoadedModelsReq req = new TShowLoadedModelsReq();
36383633
req.setDeviceIdList(deviceIdList != null ? deviceIdList : new ArrayList<>());
36393634
final TShowLoadedModelsResp resp = ai.showLoadedModels(req);
@@ -3651,8 +3646,8 @@ public SettableFuture<ConfigTaskResult> showLoadedModels(List<String> deviceIdLi
36513646
@Override
36523647
public SettableFuture<ConfigTaskResult> showAIDevices() {
36533648
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
3654-
final TEndPoint ep = AINodeClient.getCurrentEndpoint();
3655-
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
3649+
try (final AINodeClient ai =
3650+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) {
36563651
final TShowAIDevicesResp resp = ai.showAIDevices();
36573652
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
36583653
future.setException(new IoTDBException(resp.getStatus()));
@@ -3669,12 +3664,8 @@ public SettableFuture<ConfigTaskResult> showAIDevices() {
36693664
public SettableFuture<ConfigTaskResult> loadModel(
36703665
String existingModelId, List<String> deviceIdList) {
36713666
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
3672-
final TEndPoint ep = AINodeClient.getCurrentEndpoint();
3673-
if (ep == null) {
3674-
future.setException(new BorrowNullClientManagerException());
3675-
return future;
3676-
}
3677-
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
3667+
try (final AINodeClient ai =
3668+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) {
36783669
final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList);
36793670
final TSStatus result = ai.loadModel(req);
36803671
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
@@ -3692,12 +3683,8 @@ public SettableFuture<ConfigTaskResult> loadModel(
36923683
public SettableFuture<ConfigTaskResult> unloadModel(
36933684
String existingModelId, List<String> deviceIdList) {
36943685
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
3695-
final TEndPoint ep = AINodeClient.getCurrentEndpoint();
3696-
if (ep == null) {
3697-
future.setException(new BorrowNullClientManagerException());
3698-
return future;
3699-
}
3700-
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
3686+
try (final AINodeClient ai =
3687+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) {
37013688
final TUnloadModelReq req = new TUnloadModelReq(existingModelId, deviceIdList);
37023689
final TSStatus result = ai.unloadModel(req);
37033690
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
@@ -3721,8 +3708,8 @@ public SettableFuture<ConfigTaskResult> createTraining(
37213708
@Nullable String targetSql,
37223709
@Nullable List<String> pathList) {
37233710
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
3724-
final TEndPoint ep = AINodeClient.getCurrentEndpoint();
3725-
try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) {
3711+
try (final AINodeClient ai =
3712+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID)) {
37263713
final TTrainingReq req = new TTrainingReq();
37273714
req.setModelId(modelId);
37283715
req.setParameters(parameters);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.ainode.rpc.thrift.TForecastResp;
2323
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24-
import org.apache.iotdb.commons.client.IClientManager;
2524
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
2625
import org.apache.iotdb.db.exception.sql.SemanticException;
2726
import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
@@ -457,8 +456,7 @@ private static Map<String, String> parseOptions(String options) {
457456
private static class ForecastDataProcessor implements TableFunctionDataProcessor {
458457

459458
private static final TsBlockSerde SERDE = new TsBlockSerde();
460-
private static final IClientManager<TEndPoint, AINodeClient> CLIENT_MANAGER =
461-
AINodeClientManager.getInstance();
459+
private static final AINodeClientManager CLIENT_MANAGER = AINodeClientManager.getInstance();
462460

463461
private final TEndPoint targetAINode;
464462
private final String modelId;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.ainode.rpc.thrift.TForecastResp;
2323
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24-
import org.apache.iotdb.commons.client.IClientManager;
2524
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
2625
import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
2726
import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
@@ -55,8 +54,7 @@
5554

5655
public class UDTFForecast implements UDTF {
5756
private static final TsBlockSerde serde = new TsBlockSerde();
58-
private static final IClientManager<TEndPoint, AINodeClient> CLIENT_MANAGER =
59-
AINodeClientManager.getInstance();
57+
private static final AINodeClientManager CLIENT_MANAGER = AINodeClientManager.getInstance();
6058
private TEndPoint targetAINode = new TEndPoint("127.0.0.1", 10810);
6159
private String model_id;
6260
private int maxInputLength;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/iotv2/container/IoTV2GlobalComponentContainer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
package org.apache.iotdb.commons.consensus.iotv2.container;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23-
import org.apache.iotdb.commons.client.ClientPoolFactory;
23+
import org.apache.iotdb.commons.client.ClientPoolFactory.AsyncPipeConsensusServiceClientPoolFactory;
24+
import org.apache.iotdb.commons.client.ClientPoolFactory.SyncPipeConsensusServiceClientPoolFactory;
2425
import org.apache.iotdb.commons.client.IClientManager;
2526
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
2627
import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty;
@@ -64,12 +65,10 @@ private IoTV2GlobalComponentContainer() {
6465
.build();
6566
this.asyncClientManager =
6667
new IClientManager.Factory<TEndPoint, AsyncPipeConsensusServiceClient>()
67-
.createClientManager(
68-
new ClientPoolFactory.AsyncPipeConsensusServiceClientPoolFactory(config));
68+
.createClientManager(new AsyncPipeConsensusServiceClientPoolFactory(config));
6969
this.syncClientManager =
7070
new IClientManager.Factory<TEndPoint, SyncPipeConsensusServiceClient>()
71-
.createClientManager(
72-
new ClientPoolFactory.SyncPipeConsensusServiceClientPoolFactory(config));
71+
.createClientManager(new SyncPipeConsensusServiceClientPoolFactory(config));
7372
this.backgroundTaskService =
7473
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
7574
ThreadName.PIPE_CONSENSUS_BACKGROUND_TASK_EXECUTOR.getName());

0 commit comments

Comments
 (0)