From 72e766a3e98c381687a3590251ea035f9877f70e Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Thu, 23 Oct 2025 20:12:17 +0800 Subject: [PATCH 01/14] [AINode] Update AINodeClient and ClusterConfigTaskExecutor --- .../executor/ClusterConfigTaskExecutor.java | 12 ++++-- .../commons/client/ainode/AINodeClient.java | 39 ++++++++++++++----- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index aab631825e328..d942367d16848 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -19,11 +19,13 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.executor; +import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq; import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; @@ -36,6 +38,8 @@ import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.common.rpc.thrift.TThrottleQuota; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.ainode.AINodeClient; +import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -129,7 +133,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; -import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferResp; @@ -3669,8 +3672,9 @@ public SettableFuture showAIDevices() { public SettableFuture loadModel( String existingModelId, List deviceIdList) { final SettableFuture future = SettableFuture.create(); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TEndPoint AINODE_KEY_PLACEHOLDER = new TEndPoint("AINODE_KEY", 0); + try (final AINodeClient client = + AINodeClientManager.getInstance().borrowClient(AINODE_KEY_PLACEHOLDER)) { final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList); final TSStatus result = client.loadModel(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { @@ -3678,7 +3682,7 @@ public SettableFuture loadModel( } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final ClientManagerException | TException e) { + } catch (final Exception e) { future.setException(e); } return future; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java index 0058bc7a2fc64..03eb4fbfd6132 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java @@ -86,6 +86,12 @@ public class AINodeClient implements AutoCloseable, ThriftClient { public static final String MSG_CONNECTION_FAIL = "Fail to connect to AINode. Please check status of AINode"; + private static final int MAX_RETRY = 3; + + @FunctionalInterface + private interface RemoteCall { + R apply(IAINodeRPCService.Client c) throws TException; + } private final TsBlockSerde tsBlockSerde = new TsBlockSerde(); @@ -102,6 +108,29 @@ public AINodeClient( init(); } + private R executeRemoteCallWithRetry(RemoteCall call) throws TException { + TException last = null; + for (int i = 0; i < MAX_RETRY; i++) { + try { + if (transport == null || !transport.isOpen()) { + init(); + } + return call.apply(client); + } catch (TException e) { + last = e; + try { + close(); + } catch (Exception ignore) { + // ignore + } + } + } + if (last != null) { + throw last; + } + throw new TException(MSG_CONNECTION_FAIL); + } + private void init() throws TException { try { if (commonConfig.isEnableInternalSSL()) { @@ -201,15 +230,7 @@ public TSStatus deleteModel(String modelId) throws TException { } public TSStatus loadModel(TLoadModelReq req) throws TException { - try { - return client.loadModel(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.loadModel(req)); } public TSStatus unloadModel(TUnloadModelReq req) throws TException { From 81bb2727cfc35a4097a91369f9d3f31a6885b2c8 Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Thu, 30 Oct 2025 14:54:11 +0800 Subject: [PATCH 02/14] [AINode] Update ClusterConfigTaskExecutor for datanode to fetch AINodeLocation --- .../executor/ClusterConfigTaskExecutor.java | 75 +++++++++++++++++-- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d942367d16848..807fd25118e72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -123,6 +123,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp; @@ -365,6 +367,10 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -377,6 +383,47 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { + private static final AtomicReference CACHED_AINODE = new AtomicReference<>(); + private static final ReentrantLock AINODE_REFRESH_LOCK = new ReentrantLock(); + private static final long AINODE_TTL_NANOS = TimeUnit.SECONDS.toNanos(3); + private static final AtomicLong AINODE_LAST_REFRESH = new AtomicLong(0L); + + /** Return cached AINode if fresh; otherwise refresh from ConfigNode using modelId. */ + private TEndPoint resolveAINodeEp(String modelIdOrNull) { + final TEndPoint ep = CACHED_AINODE.get(); + if (ep != null && System.nanoTime() - AINODE_LAST_REFRESH.get() < AINODE_TTL_NANOS) { + return ep; + } + return refreshAINodeFromConfigNode(modelIdOrNull); + } + + /** Ask ConfigNode for the latest AINode location (precise by modelId when available). */ + private TEndPoint refreshAINodeFromConfigNode(String modelIdOrNull) { + if (!AINODE_REFRESH_LOCK.tryLock()) { + return CACHED_AINODE.get(); + } + try (ConfigNodeClient cn = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TEndPoint ep = null; + if (modelIdOrNull != null && !modelIdOrNull.isEmpty()) { + final TGetModelInfoResp resp = cn.getModelInfo(new TGetModelInfoReq(modelIdOrNull)); + if (resp != null && resp.isSetAiNodeAddress()) { + ep = resp.getAiNodeAddress(); + } + } + // if no modelId or location return,use showAIDevices to fetch an AINode + if (ep != null) { + CACHED_AINODE.set(ep); + AINODE_LAST_REFRESH.set(System.nanoTime()); + } + return CACHED_AINODE.get(); + } catch (Exception e) { + return CACHED_AINODE.get(); + } finally { + AINODE_REFRESH_LOCK.unlock(); + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class); private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = @@ -3672,18 +3719,34 @@ public SettableFuture showAIDevices() { public SettableFuture loadModel( String existingModelId, List deviceIdList) { final SettableFuture future = SettableFuture.create(); - final TEndPoint AINODE_KEY_PLACEHOLDER = new TEndPoint("AINODE_KEY", 0); - try (final AINodeClient client = - AINodeClientManager.getInstance().borrowClient(AINODE_KEY_PLACEHOLDER)) { + // 1) Try direct DataNode → AINode with cached endpoint + TEndPoint ep = resolveAINodeEp(existingModelId); + try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(ep)) { final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList); - final TSStatus result = client.loadModel(req); + final TSStatus result = ai.loadModel(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { future.setException(new IoTDBException(result)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (final Exception e) { - future.setException(e); + } catch (final Exception first) { + // 2) Fallback: ask ConfigNode for latest AINode location, update cache and retry once + final TEndPoint refreshed = refreshAINodeFromConfigNode(existingModelId); + if (refreshed == null || (ep != null && refreshed.equals(ep))) { + future.setException(first); + return future; + } + try (final AINodeClient ai = AINodeClientManager.getInstance().borrowClient(refreshed)) { + final TLoadModelReq req = new TLoadModelReq(existingModelId, deviceIdList); + final TSStatus result = ai.loadModel(req); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { + future.setException(new IoTDBException(result)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (final Exception second) { + future.setException(second); + } } return future; } From d35b32fed40be75dc336504cf7a69d7be1bce301 Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Fri, 31 Oct 2025 16:56:32 +0800 Subject: [PATCH 03/14] [AINode] Update apis which have modelId # Conflicts: # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java --- .../commons/client/ainode/AINodeClient.java | 65 +++---------------- 1 file changed, 9 insertions(+), 56 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java index 03eb4fbfd6132..4110b505e76b9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java @@ -218,15 +218,8 @@ private ModelInformation parseModelInformation( } public TSStatus deleteModel(String modelId) throws TException { - try { - return client.deleteModel(new TDeleteModelReq(modelId)); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + final TDeleteModelReq req = new TDeleteModelReq(modelId); + return executeRemoteCallWithRetry(c -> c.deleteModel(req)); } public TSStatus loadModel(TLoadModelReq req) throws TException { @@ -234,51 +227,19 @@ public TSStatus loadModel(TLoadModelReq req) throws TException { } public TSStatus unloadModel(TUnloadModelReq req) throws TException { - try { - return client.unloadModel(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.unloadModel(req)); } public TShowModelsResp showModels(TShowModelsReq req) throws TException { - try { - return client.showModels(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.showModels(req)); } public TShowLoadedModelsResp showLoadedModels(TShowLoadedModelsReq req) throws TException { - try { - return client.showLoadedModels(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.showLoadedModels(req)); } public TShowAIDevicesResp showAIDevices() throws TException { - try { - return client.showAIDevices(); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode from ConfigNode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(IAINodeRPCService.Client::showAIDevices); } public TInferenceResp inference( @@ -295,7 +256,7 @@ public TInferenceResp inference( if (inferenceAttributes != null) { inferenceReq.setInferenceAttributes(inferenceAttributes); } - return client.inference(inferenceReq); + return executeRemoteCallWithRetry(c -> c.inference(inferenceReq)); } catch (IOException e) { throw new TException("An exception occurred while serializing input data", e); } catch (TException e) { @@ -313,7 +274,7 @@ public TForecastResp forecast( TForecastReq forecastReq = new TForecastReq(modelId, tsBlockSerde.serialize(inputTsBlock), outputLength); forecastReq.setOptions(options); - return client.forecast(forecastReq); + return executeRemoteCallWithRetry(c -> c.forecast(forecastReq)); } catch (IOException e) { TSStatus tsStatus = new TSStatus(INTERNAL_SERVER_ERROR.getStatusCode()); tsStatus.setMessage(String.format("Failed to serialize input tsblock %s", e.getMessage())); @@ -329,15 +290,7 @@ public TForecastResp forecast( } public TSStatus createTrainingTask(TTrainingReq req) throws TException { - try { - return client.createTrainingTask(req); - } catch (TException e) { - logger.warn( - "Failed to connect to AINode when executing {}: {}", - Thread.currentThread().getStackTrace()[1].getMethodName(), - e.getMessage()); - throw new TException(MSG_CONNECTION_FAIL); - } + return executeRemoteCallWithRetry(c -> c.createTrainingTask(req)); } @Override From 0fe9f1176f5c1427029acf81fdee78749be94ed1 Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Fri, 31 Oct 2025 17:07:54 +0800 Subject: [PATCH 04/14] revert --- .../executor/ClusterConfigTaskExecutor.java | 8596 ++++++++--------- 1 file changed, 4267 insertions(+), 4329 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 807fd25118e72..d2477e07ed1f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -79,10 +79,8 @@ import org.apache.iotdb.commons.udf.service.UDFExecutableManager; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.PathUtils; -import org.apache.iotdb.commons.utils.SerializeUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TAINodeRemoveReq; -import org.apache.iotdb.confignode.rpc.thrift.TAlterEncodingCompressorReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterOrDropTableReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; @@ -251,7 +249,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCluster; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use; -import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterEncodingCompressorStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement; @@ -383,4382 +380,4323 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { - private static final AtomicReference CACHED_AINODE = new AtomicReference<>(); - private static final ReentrantLock AINODE_REFRESH_LOCK = new ReentrantLock(); - private static final long AINODE_TTL_NANOS = TimeUnit.SECONDS.toNanos(3); - private static final AtomicLong AINODE_LAST_REFRESH = new AtomicLong(0L); - - /** Return cached AINode if fresh; otherwise refresh from ConfigNode using modelId. */ - private TEndPoint resolveAINodeEp(String modelIdOrNull) { - final TEndPoint ep = CACHED_AINODE.get(); - if (ep != null && System.nanoTime() - AINODE_LAST_REFRESH.get() < AINODE_TTL_NANOS) { - return ep; - } - return refreshAINodeFromConfigNode(modelIdOrNull); - } - - /** Ask ConfigNode for the latest AINode location (precise by modelId when available). */ - private TEndPoint refreshAINodeFromConfigNode(String modelIdOrNull) { - if (!AINODE_REFRESH_LOCK.tryLock()) { - return CACHED_AINODE.get(); - } - try (ConfigNodeClient cn = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TEndPoint ep = null; - if (modelIdOrNull != null && !modelIdOrNull.isEmpty()) { - final TGetModelInfoResp resp = cn.getModelInfo(new TGetModelInfoReq(modelIdOrNull)); - if (resp != null && resp.isSetAiNodeAddress()) { - ep = resp.getAiNodeAddress(); - } - } - // if no modelId or location return,use showAIDevices to fetch an AINode - if (ep != null) { - CACHED_AINODE.set(ep); - AINODE_LAST_REFRESH.set(System.nanoTime()); - } - return CACHED_AINODE.get(); - } catch (Exception e) { - return CACHED_AINODE.get(); - } finally { - AINODE_REFRESH_LOCK.unlock(); - } - } - - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class); - - private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = - ConfigNodeClientManager.getInstance(); - - /** FIXME Consolidate this clientManager with the upper one. */ - private static final IClientManager - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER = - new IClientManager.Factory() - .createClientManager( - new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory()); - - private static final class ClusterConfigTaskExecutorHolder { - - private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor(); - - private ClusterConfigTaskExecutorHolder() {} - } - - private static final SettableFuture SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE; - - static { - SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create(); - SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException( - new IoTDBException( - "Subscription not enabled, please set config `subscription_enabled` to true.", - TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode())); - } - - public static ClusterConfigTaskExecutor getInstance() { - return ClusterConfigTaskExecutor.ClusterConfigTaskExecutorHolder.INSTANCE; - } - - @Override - public SettableFuture setDatabase( - final DatabaseSchemaStatement databaseSchemaStatement) { - final SettableFuture future = SettableFuture.create(); - - final String databaseName = databaseSchemaStatement.getDatabasePath().getFullPath(); - if (databaseName.length() > MAX_DATABASE_NAME_LENGTH - || TsFileConstant.PATH_ROOT.equals(databaseName)) { - final IllegalPathException illegalPathException = - new IllegalPathException( - databaseName, - TsFileConstant.PATH_ROOT.equals(databaseName) - ? "the database name in tree model must start with 'root.'." - : "the length of database name shall not exceed " + MAX_DATABASE_NAME_LENGTH); - future.setException( - new IoTDBException( - illegalPathException.getMessage(), illegalPathException.getErrorCode())); - return future; - } - - // Construct request using statement - final TDatabaseSchema databaseSchema = - DatabaseSchemaTask.constructDatabaseSchema(databaseSchemaStatement); - try (final ConfigNodeClient configNodeClient = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - // Send request to some API server - final TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema); - // Get response or throw exception - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { - // If database already exists when loading, we do not throw exceptions to avoid printing too - // many logs - if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == tsStatus.getCode() - && !databaseSchemaStatement.getEnablePrintExceptionLog()) { - future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + private static final AtomicReference CACHED_AINODE = new AtomicReference<>(); + private static final ReentrantLock AINODE_REFRESH_LOCK = new ReentrantLock(); + private static final long AINODE_TTL_NANOS = TimeUnit.SECONDS.toNanos(3); + private static final AtomicLong AINODE_LAST_REFRESH = new AtomicLong(0L); + + /** Return cached AINode if fresh; otherwise refresh from ConfigNode using modelId. */ + private TEndPoint resolveAINodeEp(String modelIdOrNull) { + final TEndPoint ep = CACHED_AINODE.get(); + if (ep != null && System.nanoTime() - AINODE_LAST_REFRESH.get() < AINODE_TTL_NANOS) { + return ep; + } + return refreshAINodeFromConfigNode(modelIdOrNull); + } + + /** Ask ConfigNode for the latest AINode location (precise by modelId when available). */ + private TEndPoint refreshAINodeFromConfigNode(String modelIdOrNull) { + if (!AINODE_REFRESH_LOCK.tryLock()) { + return CACHED_AINODE.get(); + } + try (ConfigNodeClient cn = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TEndPoint ep = null; + if (modelIdOrNull != null && !modelIdOrNull.isEmpty()) { + final TGetModelInfoResp resp = cn.getModelInfo(new TGetModelInfoReq(modelIdOrNull)); + if (resp != null && resp.isSetAiNodeAddress()) { + ep = resp.getAiNodeAddress(); + } + } + // if no modelId or location return,use showAIDevices to fetch an AINode + if (ep != null) { + CACHED_AINODE.set(ep); + AINODE_LAST_REFRESH.set(System.nanoTime()); + } + return CACHED_AINODE.get(); + } catch (Exception e) { + return CACHED_AINODE.get(); + } finally { + AINODE_REFRESH_LOCK.unlock(); + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class); + + private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = + ConfigNodeClientManager.getInstance(); + + /** FIXME Consolidate this clientManager with the upper one. */ + private static final IClientManager + CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER = + new IClientManager.Factory() + .createClientManager( + new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory()); + + private static final class ClusterConfigTaskExecutorHolder { + + private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor(); + + private ClusterConfigTaskExecutorHolder() {} + } + + private static final SettableFuture SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE; + + static { + SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create(); + SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException( + new IoTDBException( + "Subscription not enabled, please set config `subscription_enabled` to true.", + TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode())); + } + + public static ClusterConfigTaskExecutor getInstance() { + return ClusterConfigTaskExecutor.ClusterConfigTaskExecutorHolder.INSTANCE; + } + + @Override + public SettableFuture setDatabase( + final DatabaseSchemaStatement databaseSchemaStatement) { + final SettableFuture future = SettableFuture.create(); + + final String databaseName = databaseSchemaStatement.getDatabasePath().getFullPath(); + if (databaseName.length() > MAX_DATABASE_NAME_LENGTH + || TsFileConstant.PATH_ROOT.equals(databaseName)) { + final IllegalPathException illegalPathException = + new IllegalPathException( + databaseName, + TsFileConstant.PATH_ROOT.equals(databaseName) + ? "the database name in tree model must start with 'root.'." + : "the length of database name shall not exceed " + MAX_DATABASE_NAME_LENGTH); + future.setException( + new IoTDBException( + illegalPathException.getMessage(), illegalPathException.getErrorCode())); + return future; + } + + // Construct request using statement + final TDatabaseSchema databaseSchema = + DatabaseSchemaTask.constructDatabaseSchema(databaseSchemaStatement); + try (final ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + final TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema); + // Get response or throw exception + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + // If database already exists when loading, we do not throw exceptions to avoid printing too + // many logs + if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == tsStatus.getCode() + && !databaseSchemaStatement.getEnablePrintExceptionLog()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + LOGGER.warn( + "Failed to execute create database {} in config node, status is {}.", + databaseSchemaStatement.getDatabasePath(), + tsStatus); + future.setException(new IoTDBException(tsStatus)); + } + + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture alterDatabase( + final DatabaseSchemaStatement databaseSchemaStatement) { + final SettableFuture future = SettableFuture.create(); + // Construct request using statement + final TDatabaseSchema databaseSchema = + DatabaseSchemaTask.constructDatabaseSchema(databaseSchemaStatement); + databaseSchema.setIsTableModel(false); + try (final ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + final TSStatus tsStatus = configNodeClient.alterDatabase(databaseSchema); + // Get response or throw exception + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + if (databaseSchemaStatement.getEnablePrintExceptionLog()) { + LOGGER.warn( + "Failed to execute alter database {} in config node, status is {}.", + databaseSchemaStatement.getDatabasePath(), + tsStatus); + } + future.setException(new IoTDBException(tsStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture showDatabase( + final ShowDatabaseStatement showDatabaseStatement) { + final SettableFuture future = SettableFuture.create(); + // Construct request using statement + final List databasePathPattern = + Arrays.asList(showDatabaseStatement.getPathPattern().getNodes()); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + final TGetDatabaseReq req = + new TGetDatabaseReq( + databasePathPattern, showDatabaseStatement.getAuthorityScope().serialize()) + .setIsTableModel(false) + .setCanSeeAuditDB(showDatabaseStatement.isCanSeeAuditDB()); + final TShowDatabaseResp resp = client.showDatabase(req); + // build TSBlock + showDatabaseStatement.buildTSBlock(resp.getDatabaseInfoMap(), future); + } catch (final IOException | ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture countDatabase( + CountDatabaseStatement countDatabaseStatement) { + SettableFuture future = SettableFuture.create(); + int databaseNum; + List databasePathPattern = + Arrays.asList(countDatabaseStatement.getPathPattern().getNodes()); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TGetDatabaseReq req = + new TGetDatabaseReq( + databasePathPattern, countDatabaseStatement.getAuthorityScope().serialize()) + .setCanSeeAuditDB(countDatabaseStatement.isCanSeeAuditDB()); + TCountDatabaseResp resp = client.countMatchedDatabases(req); + databaseNum = resp.getCount(); + // build TSBlock + CountDatabaseTask.buildTSBlock(databaseNum, future); + } catch (IOException | ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture deleteDatabase( + final DeleteDatabaseStatement deleteDatabaseStatement) { + final SettableFuture future = SettableFuture.create(); + final TDeleteDatabasesReq req = + new TDeleteDatabasesReq(deleteDatabaseStatement.getPrefixPath()).setIsTableModel(false); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus tsStatus = client.deleteDatabases(req); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + LOGGER.warn( + "Failed to execute delete database {} in config node, status is {}.", + deleteDatabaseStatement.getPrefixPath(), + tsStatus); + if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + future.setException( + new BatchProcessException(tsStatus.subStatus.toArray(new TSStatus[0]))); + } else { + future.setException(new IoTDBException(tsStatus)); + } + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture createFunction( + Model model, + String udfName, + String className, + Optional stringURI, + Class baseClazz) { + SettableFuture future = SettableFuture.create(); + if (UDFManagementService.getInstance().checkIsBuiltInFunctionName(model, udfName)) { + future.setException( + new IoTDBException( + String.format( + "Failed to create UDF [%s], the given function name conflicts with the built-in function name.", + udfName.toUpperCase()), + TSStatusCode.CREATE_UDF_ERROR.getStatusCode())); + return future; + } + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TCreateFunctionReq tCreateFunctionReq = + new TCreateFunctionReq(udfName, className, false).setModel(model); + String libRoot = UDFExecutableManager.getInstance().getLibRoot(); + String jarFileName; + ByteBuffer jarFile; + String jarMd5; + if (stringURI.isPresent()) { + String uriString = stringURI.get(); + jarFileName = new File(uriString).getName(); + try { + URI uri = new URI(uriString); + if (uri.getScheme() == null) { + future.setException( + new IoTDBException( + "The scheme of URI is not set, please specify the scheme of URI.", + TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + if (!uri.getScheme().equals("file")) { + // Download executable + ExecutableResource resource = + UDFExecutableManager.getInstance().request(Collections.singletonList(uriString)); + String jarFilePathUnderTempDir = + UDFExecutableManager.getInstance() + .getDirStringUnderTempRootByRequestId(resource.getRequestId()) + + jarFileName; + // libRoot should be the path of the specified jar + libRoot = jarFilePathUnderTempDir; + jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir); + jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir))); + } else { + // libRoot should be the path of the specified jar + libRoot = new File(new URI(uriString)).getAbsolutePath(); + // If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to + // ConfigNode. + jarFile = ExecutableManager.transferToBytebuffer(libRoot); + // Set md5 of the jar file + jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot))); + } + } catch (IOException | URISyntaxException e) { + LOGGER.warn("Failed to get executable for UDF({}) using URI: {}.", udfName, uriString, e); + future.setException( + new IoTDBException( + "Failed to get executable for UDF '" + udfName + "', please check the URI.", + TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + // modify req + tCreateFunctionReq.setJarFile(jarFile); + tCreateFunctionReq.setJarMD5(jarMd5); + tCreateFunctionReq.setIsUsingURI(true); + int index = jarFileName.lastIndexOf("."); + if (index < 0) { + tCreateFunctionReq.setJarName(String.format("%s-%s", jarFileName, jarMd5)); + } else { + tCreateFunctionReq.setJarName( + String.format( + "%s-%s.%s", + jarFileName.substring(0, index), jarMd5, jarFileName.substring(index + 1))); + } + } + + FunctionType functionType = FunctionType.NONE; + // try to create instance, this request will fail if creation is not successful + try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) { + // ensure that jar file contains the class and the class is a UDF + Class clazz = Class.forName(className, true, classLoader); + Object o = baseClazz.cast(clazz.getDeclaredConstructor().newInstance()); + if (Model.TABLE.equals(model)) { + // we check function type for table model + if (o instanceof ScalarFunction) { + functionType = FunctionType.SCALAR; + } else if (o instanceof AggregateFunction) { + functionType = FunctionType.AGGREGATE; + } else if (o instanceof TableFunction) { + functionType = FunctionType.TABLE; + // check there is no duplicate argument specification for name + TableFunction tableFunction = (TableFunction) o; + Set argNames = new HashSet<>(); + for (ParameterSpecification specification : + tableFunction.getArgumentsSpecifications()) { + if (!argNames.add(specification.getName().toUpperCase())) { + future.setException( + new IoTDBException( + "Failed to create function '" + + udfName + + "', because there is duplicate argument name '" + + specification.getName() + + "'.", + TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode())); + return future; + } + } + } + } + tCreateFunctionReq.setFunctionType(functionType); + } catch (ClassNotFoundException + | NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException + | ClassCastException e) { + LOGGER.warn( + "Failed to create function when try to create {}({}) instance first.", + baseClazz.getSimpleName(), + udfName, + e); + future.setException( + new IoTDBException( + "Failed to load class '" + + className + + "', because it's not found in jar file or is invalid: " + + stringURI.orElse(null), + TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode())); + return future; + } + + final TSStatus executionStatus = client.createFunction(tCreateFunctionReq); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { + LOGGER.warn( + "Failed to create function {}({}) because {}", + udfName, + className, + executionStatus.getMessage()); + future.setException(new IoTDBException(executionStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (ClientManagerException | IOException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture dropFunction(Model model, String udfName) { + SettableFuture future = SettableFuture.create(); + if (UDFManagementService.getInstance().checkIsBuiltInFunctionName(model, udfName)) { + future.setException( + new IoTDBException( + String.format("Built-in function %s can not be deregistered.", udfName.toUpperCase()), + TSStatusCode.DROP_UDF_ERROR.getStatusCode())); + return future; + } + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus executionStatus = + client.dropFunction(new TDropFunctionReq(udfName).setModel(model)); + + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { + LOGGER.warn("[{}] Failed to drop function {}.", executionStatus, udfName); + future.setException(new IoTDBException(executionStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture showFunctions(Model model) { + SettableFuture future = SettableFuture.create(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TGetUDFTableResp getUDFTableResp = client.getUDFTable(new TGetUdfTableReq(model)); + if (getUDFTableResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + getUDFTableResp.getStatus().message, getUDFTableResp.getStatus().code)); + return future; + } + // convert UDFTable and buildTsBlock + ShowFunctionsTask.buildTsBlock(model, getUDFTableResp.getAllUDFInformation(), future); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + + return future; + } + + @Override + public SettableFuture createTrigger( + CreateTriggerStatement createTriggerStatement) { + SettableFuture future = SettableFuture.create(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + + TCreateTriggerReq tCreateTriggerReq = + new TCreateTriggerReq( + createTriggerStatement.getTriggerName(), + createTriggerStatement.getClassName(), + createTriggerStatement.getTriggerEvent().getId(), + createTriggerStatement.getTriggerType().getId(), + createTriggerStatement.getPathPattern().serialize(), + createTriggerStatement.getAttributes(), + FailureStrategy.OPTIMISTIC.getId(), + createTriggerStatement.isUsingURI()); // set default strategy + + String libRoot = TriggerExecutableManager.getInstance().getLibRoot(); + String jarFileName; + ByteBuffer jarFile; + String jarMd5; + if (createTriggerStatement.isUsingURI()) { + String uriString = createTriggerStatement.getUriString(); + if (uriString == null || uriString.isEmpty()) { + future.setException( + new IoTDBException( + "URI is empty, please specify the URI.", + TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + jarFileName = new File(uriString).getName(); + try { + URI uri = new URI(uriString); + if (uri.getScheme() == null) { + future.setException( + new IoTDBException( + "The scheme of URI is not set, please specify the scheme of URI.", + TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + if (!uri.getScheme().equals("file")) { + // download executable + ExecutableResource resource = + TriggerExecutableManager.getInstance() + .request(Collections.singletonList(uriString)); + String jarFilePathUnderTempDir = + TriggerExecutableManager.getInstance() + .getDirStringUnderTempRootByRequestId(resource.getRequestId()) + + jarFileName; + // libRoot should be the path of the specified jar + libRoot = jarFilePathUnderTempDir; + jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir); + jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir))); + + } else { + // libRoot should be the path of the specified jar + libRoot = new File(new URI(uriString)).getAbsolutePath(); + // If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to + // ConfigNode. + jarFile = ExecutableManager.transferToBytebuffer(libRoot); + // set md5 of the jar file + jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot))); + } + } catch (IOException | URISyntaxException e) { + LOGGER.warn( + "Failed to get executable for Trigger({}) using URI: {}.", + createTriggerStatement.getTriggerName(), + createTriggerStatement.getUriString(), + e); + future.setException( + new IoTDBException( + "Failed to get executable for Trigger '" + + createTriggerStatement.getUriString() + + "', please check the URI.", + TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + // modify req + tCreateTriggerReq.setJarFile(jarFile); + tCreateTriggerReq.setJarMD5(jarMd5); + tCreateTriggerReq.setIsUsingURI(true); + tCreateTriggerReq.setJarName( + String.format( + "%s-%s.%s", + jarFileName.substring(0, jarFileName.lastIndexOf(".")), + jarMd5, + jarFileName.substring(jarFileName.lastIndexOf(".") + 1))); + } + + // try to create instance, this request will fail if creation is not successful + try (TriggerClassLoader classLoader = new TriggerClassLoader(libRoot)) { + Class triggerClass = + Class.forName(createTriggerStatement.getClassName(), true, classLoader); + Trigger trigger = (Trigger) triggerClass.getDeclaredConstructor().newInstance(); + tCreateTriggerReq.setFailureStrategy(trigger.getFailureStrategy().getId()); + } catch (ClassNotFoundException + | NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException + | ClassCastException e) { + LOGGER.warn( + "Failed to create trigger when try to create trigger({}) instance first.", + createTriggerStatement.getTriggerName(), + e); + future.setException( + new IoTDBException( + "Failed to load class '" + + createTriggerStatement.getClassName() + + "', because it's not found in jar file or is invalid: " + + createTriggerStatement.getUriString(), + TSStatusCode.TRIGGER_LOAD_CLASS_ERROR.getStatusCode())); + return future; + } + + final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq); + + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { + LOGGER.warn( + "[{}] Failed to create trigger {}. TSStatus is {}", + executionStatus, + createTriggerStatement.getTriggerName(), + executionStatus.message); + future.setException(new IoTDBException(executionStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (ClientManagerException | TException | IOException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture dropTrigger(String triggerName) { + SettableFuture future = SettableFuture.create(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus executionStatus = client.dropTrigger(new TDropTriggerReq(triggerName)); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { + LOGGER.warn("[{}] Failed to drop trigger {}.", executionStatus, triggerName); + future.setException(new IoTDBException(executionStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture showTriggers() { + SettableFuture future = SettableFuture.create(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TGetTriggerTableResp getTriggerTableResp = client.getTriggerTable(); + if (getTriggerTableResp.getStatus().getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + getTriggerTableResp.getStatus().message, getTriggerTableResp.getStatus().code)); + return future; + } + // convert triggerTable and buildTsBlock + ShowTriggersTask.buildTsBlock(getTriggerTableResp.getAllTriggerInformation(), future); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + + return future; + } + + @Override + public SettableFuture createPipePlugin( + final CreatePipePluginStatement createPipePluginStatement) { + final SettableFuture future = SettableFuture.create(); + final String pluginName = createPipePluginStatement.getPluginName(); + final String className = createPipePluginStatement.getClassName(); + final String uriString = createPipePluginStatement.getUriString(); + + if (uriString == null || uriString.isEmpty()) { + future.setException( + new IoTDBException( + "Failed to create pipe plugin, because the URI is empty.", + TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final String libRoot; + final ByteBuffer jarFile; + final String jarMd5; + + final String jarFileName = new File(uriString).getName(); + try { + final URI uri = new URI(uriString); + if (uri.getScheme() == null) { + future.setException( + new IoTDBException( + "The scheme of URI is not set, please specify the scheme of URI.", + TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + if (!uri.getScheme().equals("file")) { + // Download executable + final ExecutableResource resource = + PipePluginExecutableManager.getInstance() + .request(Collections.singletonList(uriString)); + final String jarFilePathUnderTempDir = + PipePluginExecutableManager.getInstance() + .getDirStringUnderTempRootByRequestId(resource.getRequestId()) + + jarFileName; + // libRoot should be the path of the specified jar + libRoot = jarFilePathUnderTempDir; + jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir); + jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir))); + } else { + // libRoot should be the path of the specified jar + libRoot = new File(new URI(uriString)).getAbsolutePath(); + // If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to + // ConfigNode. + jarFile = ExecutableManager.transferToBytebuffer(libRoot); + // Set md5 of the jar file + jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot))); + } + } catch (final IOException | URISyntaxException e) { + LOGGER.warn( + "Failed to get executable for PipePlugin({}) using URI: {}.", + createPipePluginStatement.getPluginName(), + createPipePluginStatement.getUriString(), + e); + future.setException( + new IoTDBException( + "Failed to get executable for PipePlugin" + + createPipePluginStatement.getPluginName() + + "', please check the URI.", + TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode())); + return future; + } + + // try to create instance, this request will fail if creation is not successful + try (final PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) { + final Class clazz = + Class.forName(createPipePluginStatement.getClassName(), true, classLoader); + + final Visibility pluginVisibility = VisibilityUtils.calculateFromPluginClass(clazz); + final boolean isTableModel = createPipePluginStatement.isTableModel(); + if (!VisibilityUtils.isCompatible(pluginVisibility, isTableModel)) { + LOGGER.warn( + "Failed to create PipePlugin({}) because this plugin is not designed for {} model.", + createPipePluginStatement.getPluginName(), + isTableModel ? "table" : "tree"); + future.setException( + new IoTDBException( + "Failed to create PipePlugin '" + + createPipePluginStatement.getPluginName() + + "', because this plugin is not designed for " + + (isTableModel ? "table" : "tree") + + " model.", + TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode())); + return future; + } + + // ensure that jar file contains the class and the class is a pipe plugin + final PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance(); + } catch (final ClassNotFoundException + | NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException + | ClassCastException e) { + LOGGER.warn( + "Failed to create function when try to create PipePlugin({}) instance first.", + createPipePluginStatement.getPluginName(), + e); + future.setException( + new IoTDBException( + "Failed to load class '" + + createPipePluginStatement.getClassName() + + "', because it's not found in jar file or is invalid: " + + createPipePluginStatement.getUriString(), + TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode())); + return future; + } + + final TSStatus executionStatus = + client.createPipePlugin( + new TCreatePipePluginReq() + .setPluginName(pluginName) + .setIfNotExistsCondition(createPipePluginStatement.hasIfNotExistsCondition()) + .setClassName(className) + .setJarFile(jarFile) + .setJarMD5(jarMd5) + .setJarName( + String.format( + "%s-%s.%s", + jarFileName.substring(0, jarFileName.lastIndexOf(".")), + jarMd5, + jarFileName.substring(jarFileName.lastIndexOf(".") + 1)))); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { + LOGGER.warn( + "Failed to create PipePlugin {}({}) because {}", + pluginName, + className, + executionStatus.getMessage()); + future.setException(new IoTDBException(executionStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (final ClientManagerException | TException | IOException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture dropPipePlugin( + DropPipePluginStatement dropPipePluginStatement) { + final SettableFuture future = SettableFuture.create(); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus executionStatus = + client.dropPipePlugin( + new TDropPipePluginReq() + .setPluginName(dropPipePluginStatement.getPluginName()) + .setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()) + .setIsTableModel(dropPipePluginStatement.isTableModel())); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { + LOGGER.warn( + "[{}] Failed to drop pipe plugin {}.", + executionStatus, + dropPipePluginStatement.getPluginName()); + future.setException(new IoTDBException(executionStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture showPipePlugins( + ShowPipePluginsStatement showPipePluginsStatement) { + final SettableFuture future = SettableFuture.create(); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TGetPipePluginTableResp getPipePluginTableResp = + client.getPipePluginTableExtended( + new TShowPipePluginReq().setIsTableModel(showPipePluginsStatement.isTableModel())); + if (getPipePluginTableResp.getStatus().getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + getPipePluginTableResp.getStatus().message, + getPipePluginTableResp.getStatus().code)); + return future; + } + // convert PipePluginTable and buildTsBlock + ShowPipePluginsTask.buildTsBlock(getPipePluginTableResp.getAllPipePluginMeta(), future); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture setTTL(SetTTLStatement setTTLStatement, String taskName) { + SettableFuture future = SettableFuture.create(); + List pathPattern = Arrays.asList(setTTLStatement.getPath().getNodes()); + TSetTTLReq setTTLReq = new TSetTTLReq(pathPattern, setTTLStatement.getTTL(), false); + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + TSStatus tsStatus = configNodeClient.setTTL(setTTLReq); + // Get response or throw exception + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + LOGGER.warn( + "Failed to execute {} {} in config node, status is {}.", + taskName, + setTTLStatement.getPath(), + tsStatus); + future.setException(new IoTDBException(tsStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture merge(boolean onCluster) { + SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + if (onCluster) { + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = client.merge(); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + } else { + try { + StorageEngine.getInstance().mergeAll(); + tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } catch (StorageEngineException e) { + tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new IoTDBException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture flush( + final TFlushReq tFlushReq, final boolean onCluster) { + final SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + if (onCluster) { + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = client.flush(tFlushReq); + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + } else { + try { + StorageEngine.getInstance().operateFlush(tFlushReq); + tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } catch (final Exception e) { + tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new IoTDBException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture clearCache( + final boolean onCluster, final Set options) { + final SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + if (onCluster) { + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = + client.clearCache( + options.stream().map(CacheClearOptions::ordinal).collect(Collectors.toSet())); + } catch (final ClientManagerException | TException e) { + future.setException(e); + } } else { - LOGGER.warn( - "Failed to execute create database {} in config node, status is {}.", - databaseSchemaStatement.getDatabasePath(), - tsStatus); - future.setException(new IoTDBException(tsStatus)); - } - - } else { - future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); - } - } catch (final ClientManagerException | TException e) { - future.setException(e); - } - return future; - } - - @Override - public SettableFuture alterDatabase( - final DatabaseSchemaStatement databaseSchemaStatement) { - final SettableFuture future = SettableFuture.create(); - // Construct request using statement - final TDatabaseSchema databaseSchema = - DatabaseSchemaTask.constructDatabaseSchema(databaseSchemaStatement); - databaseSchema.setIsTableModel(false); - try (final ConfigNodeClient configNodeClient = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - // Send request to some API server - final TSStatus tsStatus = configNodeClient.alterDatabase(databaseSchema); - // Get response or throw exception - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { - if (databaseSchemaStatement.getEnablePrintExceptionLog()) { - LOGGER.warn( - "Failed to execute alter database {} in config node, status is {}.", - databaseSchemaStatement.getDatabasePath(), - tsStatus); - } - future.setException(new IoTDBException(tsStatus)); - } else { - future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); - } - } catch (final ClientManagerException | TException e) { - future.setException(e); - } - return future; - } - - @Override - public SettableFuture showDatabase( - final ShowDatabaseStatement showDatabaseStatement) { - final SettableFuture future = SettableFuture.create(); - // Construct request using statement - final List databasePathPattern = - Arrays.asList(showDatabaseStatement.getPathPattern().getNodes()); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - // Send request to some API server - final TGetDatabaseReq req = - new TGetDatabaseReq( - databasePathPattern, showDatabaseStatement.getAuthorityScope().serialize()) - .setIsTableModel(false) - .setCanSeeAuditDB(showDatabaseStatement.isCanSeeAuditDB()); - final TShowDatabaseResp resp = client.showDatabase(req); - // build TSBlock - showDatabaseStatement.buildTSBlock(resp.getDatabaseInfoMap(), future); - } catch (final IOException | ClientManagerException | TException e) { - future.setException(e); - } - return future; - } - - @Override - public SettableFuture countDatabase( - CountDatabaseStatement countDatabaseStatement) { - SettableFuture future = SettableFuture.create(); - int databaseNum; - List databasePathPattern = - Arrays.asList(countDatabaseStatement.getPathPattern().getNodes()); - try (ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TGetDatabaseReq req = - new TGetDatabaseReq( - databasePathPattern, countDatabaseStatement.getAuthorityScope().serialize()) - .setCanSeeAuditDB(countDatabaseStatement.isCanSeeAuditDB()); - TCountDatabaseResp resp = client.countMatchedDatabases(req); - databaseNum = resp.getCount(); - // build TSBlock - CountDatabaseTask.buildTSBlock(databaseNum, future); - } catch (IOException | ClientManagerException | TException e) { - future.setException(e); - } - return future; - } - - @Override - public SettableFuture deleteDatabase( - final DeleteDatabaseStatement deleteDatabaseStatement) { - final SettableFuture future = SettableFuture.create(); - final TDeleteDatabasesReq req = - new TDeleteDatabasesReq(deleteDatabaseStatement.getPrefixPath()).setIsTableModel(false); - try (final ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus tsStatus = client.deleteDatabases(req); - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { - LOGGER.warn( - "Failed to execute delete database {} in config node, status is {}.", - deleteDatabaseStatement.getPrefixPath(), - tsStatus); - if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - future.setException( - new BatchProcessException(tsStatus.subStatus.toArray(new TSStatus[0]))); + tsStatus = DataNodeInternalRPCService.getInstance().getImpl().clearCacheImpl(options); + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } else { - future.setException(new IoTDBException(tsStatus)); - } - } else { - future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); - } - } catch (final ClientManagerException | TException e) { - future.setException(e); - } - return future; - } - - @Override - public SettableFuture createFunction( - Model model, - String udfName, - String className, - Optional stringURI, - Class baseClazz) { - SettableFuture future = SettableFuture.create(); - if (UDFManagementService.getInstance().checkIsBuiltInFunctionName(model, udfName)) { - future.setException( - new IoTDBException( - String.format( - "Failed to create UDF [%s], the given function name conflicts with the built-in function name.", - udfName.toUpperCase()), - TSStatusCode.CREATE_UDF_ERROR.getStatusCode())); - return future; - } - try (ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TCreateFunctionReq tCreateFunctionReq = - new TCreateFunctionReq(udfName, className, false).setModel(model); - String libRoot = UDFExecutableManager.getInstance().getLibRoot(); - String jarFileName; - ByteBuffer jarFile; - String jarMd5; - if (stringURI.isPresent()) { - String uriString = stringURI.get(); - jarFileName = new File(uriString).getName(); + future.setException(new IoTDBException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture setConfiguration(TSetConfigurationReq req) { + SettableFuture future = SettableFuture.create(); + + TSStatus tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + List ignoredConfigItems = + ConfigurationFileUtils.filterInvalidConfigItems(req.getConfigs()); + TSStatus warningTsStatus = null; + if (!ignoredConfigItems.isEmpty()) { + warningTsStatus = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + warningTsStatus.setMessage( + "ignored config items: " + + ignoredConfigItems + + " because they are immutable or undefined."); + if (req.getConfigs().isEmpty()) { + future.setException(new IoTDBException(warningTsStatus)); + return future; + } + } + + boolean onLocal = IoTDBDescriptor.getInstance().getConfig().getDataNodeId() == req.getNodeId(); + if (onLocal) { + tsStatus = StorageEngine.getInstance().setConfiguration(req); + } else { + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = client.setConfiguration(req); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + } + if (warningTsStatus != null) { + tsStatus = warningTsStatus; + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new IoTDBException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture showAppliedConfigurations( + ShowConfigurationStatement showConfigurationStatement) { + SettableFuture future = SettableFuture.create(); + int nodeId = showConfigurationStatement.getNodeId(); + boolean showAllConfigurations = showConfigurationStatement.isShowAllConfigurations(); + try { + boolean onLocal = + nodeId == -1 || IoTDBDescriptor.getInstance().getConfig().getDataNodeId() == nodeId; + Map appliedProperties; + if (onLocal) { + appliedProperties = ConfigurationFileUtils.getAppliedProperties(); + } else { + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TShowAppliedConfigurationsResp resp = client.showAppliedConfigurations(nodeId); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException(new IoTDBException(resp.getStatus())); + return future; + } + appliedProperties = resp.getData(); + } + } + ShowConfigurationTask.buildTsBlock( + appliedProperties, + showAllConfigurations, + showConfigurationStatement.withDescription(), + showConfigurationStatement.getMissingPrivileges(), + future); + } catch (Exception e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture startRepairData(boolean onCluster) { + SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + if (onCluster) { + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = client.startRepairData(); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + } else { + if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { + future.setException( + new IoTDBException( + "not all sg is ready", TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); + return future; + } + if (!CompactionTaskManager.getInstance().isInit()) { + future.setException( + new IoTDBException( + "cannot start repair task because compaction is not enabled", + TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); + return future; + } + try { + if (StorageEngine.getInstance().repairData()) { + tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } else { + if (CompactionScheduleTaskManager.getRepairTaskManagerInstance().getRepairTaskStatus() + == RepairTaskStatus.STOPPING) { + tsStatus = + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, "previous repair task is still stopping"); + } else { + tsStatus = + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a running repair task"); + } + } + } catch (Exception e) { + tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new IoTDBException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture stopRepairData(boolean onCluster) { + SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + if (onCluster) { + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = client.stopRepairData(); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + } else { + try { + StorageEngine.getInstance().stopRepairData(); + tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } catch (StorageEngineException e) { + tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new IoTDBException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture loadConfiguration(boolean onCluster) { + SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + if (onCluster) { + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = client.submitLoadConfigurationTask(); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + } else { + try { + IoTDBDescriptor.getInstance().loadHotModifiedProps(); + tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } catch (Exception e) { + tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new StatementExecutionException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture setSystemStatus(boolean onCluster, NodeStatus status) { + SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + if (onCluster) { + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + tsStatus = client.setSystemStatus(status.getStatus()); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + } else { + try { + CommonDescriptor.getInstance().getConfig().setNodeStatus(status); + tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } catch (Exception e) { + tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + } + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new StatementExecutionException(tsStatus)); + } + return future; + } + + @Override + public SettableFuture killQuery(final KillQueryStatement killQueryStatement) { + int dataNodeId = -1; + String queryId = killQueryStatement.getQueryId(); + SettableFuture future = SettableFuture.create(); + if (!killQueryStatement.isKillAll()) { + String[] splits = queryId.split("_"); + try { + // We just judge the input queryId has three '_' and the DataNodeId from it is non-negative + // here + if (splits.length != 4 || ((dataNodeId = Integer.parseInt(splits[3])) < 0)) { + future.setException( + new IoTDBException( + "Please ensure your input is correct", + TSStatusCode.SEMANTIC_ERROR.getStatusCode(), + true)); + return future; + } + } catch (NumberFormatException e) { + future.setException( + new IoTDBException( + "Please ensure your input is correct", + TSStatusCode.SEMANTIC_ERROR.getStatusCode(), + true)); + return future; + } + } + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus executionStatus = + client.killQuery(queryId, dataNodeId, killQueryStatement.getAllowedUsername()); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { + future.setException(new IoTDBException(executionStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture showCluster( + final ShowClusterStatement showClusterStatement) { + final SettableFuture future = SettableFuture.create(); + TShowClusterResp showClusterResp = new TShowClusterResp(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + showClusterResp = client.showCluster(); + } catch (final ClientManagerException | TException e) { + if (showClusterResp.getConfigNodeList() == null) { + future.setException(new TException(MSG_RECONNECTION_FAIL)); + } else { + future.setException(e); + } + return future; + } + // build TSBlock + if (showClusterStatement.isDetails()) { + ShowClusterDetailsTask.buildTSBlock(showClusterResp, future); + } else { + ShowClusterTask.buildTsBlock(showClusterResp, future); + } + + return future; + } + + @Override + public SettableFuture showClusterParameters() { + final SettableFuture future = SettableFuture.create(); + TShowVariablesResp showVariablesResp = new TShowVariablesResp(); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + showVariablesResp = client.showVariables(); + } catch (ClientManagerException | TException e) { + if (showVariablesResp.getStatus().getCode() + == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + future.setException(new TException(MSG_RECONNECTION_FAIL)); + } else { + future.setException(e); + } + return future; + } + + // build TSBlock + ShowVariablesTask.buildTSBlock(showVariablesResp, future); + + return future; + } + + @Override + public SettableFuture showClusterId() { + final SettableFuture future = SettableFuture.create(); + ShowClusterIdTask.buildTSBlock( + IoTDBDescriptor.getInstance().getConfig().getClusterId(), future); + return future; + } + + @Override + public SettableFuture showVersion() { + final SettableFuture future = SettableFuture.create(); + ShowVersionTask.buildTsBlock(future); + return future; + } + + @Override + public SettableFuture showCurrentSqlDialect(final String sqlDialect) { + final SettableFuture future = SettableFuture.create(); + ShowCurrentSqlDialectTask.buildTsBlock(sqlDialect, future); + return future; + } + + @Override + public SettableFuture setSqlDialect(IClientSession.SqlDialect sqlDialect) { + final SettableFuture future = SettableFuture.create(); + try { + SessionManager.getInstance().getCurrSession().setSqlDialectAndClean(sqlDialect); + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } catch (Exception e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture showCurrentDatabase( + @Nullable final String currentDatabase) { + SettableFuture future = SettableFuture.create(); + ShowCurrentDatabaseTask.buildTsBlock(currentDatabase, future); + return future; + } + + @Override + public SettableFuture showCurrentUser(final String currentUser) { + final SettableFuture future = SettableFuture.create(); + ShowCurrentUserTask.buildTsBlock(currentUser, future); + return future; + } + + @Override + public SettableFuture showCurrentTimestamp() { + final SettableFuture future = SettableFuture.create(); + ShowCurrentTimestampTask.buildTsBlock(future); + return future; + } + + @Override + public SettableFuture testConnection(boolean needDetails) { + SettableFuture future = SettableFuture.create(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TTestConnectionResp result = client.submitTestConnectionTaskToLeader(); + int configNodeNum = 0, dataNodeNum = 0; + if (!needDetails) { + configNodeNum = client.showConfigNodes().getConfigNodesInfoListSize(); + dataNodeNum = client.showDataNodes().getDataNodesInfoListSize(); + } + TestConnectionTask.buildTSBlock(result, configNodeNum, dataNodeNum, needDetails, future); + } catch (Exception e) { + future.setException(e); + } + return future; + } + + @Override + public SettableFuture showTTL(ShowTTLStatement showTTLStatement) { + SettableFuture future = SettableFuture.create(); + Map databaseToTTL = new TreeMap<>(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // TODO: send all paths in one RPC + PathPatternTree authorityScope = showTTLStatement.getAuthorityScope(); + for (PartialPath pathPattern : showTTLStatement.getPaths()) { + if (!pathPattern.endWithMultiLevelWildcard()) { + TShowTTLReq req = new TShowTTLReq(Arrays.asList(pathPattern.getNodes())); + TShowTTLResp resp = client.showTTL(req); + databaseToTTL.putAll(resp.getPathTTLMap()); + continue; + } + for (PartialPath overlappedPathPattern : + authorityScope.getOverlappedPathPatterns(pathPattern)) { + List nodes = Arrays.asList(overlappedPathPattern.getNodes()); + TShowTTLReq req = new TShowTTLReq(nodes); + TShowTTLResp resp = client.showTTL(req); + databaseToTTL.putAll(resp.getPathTTLMap()); + if (showTTLStatement.isShowAllTTL()) { + req.setPathPattern(nodes.subList(0, nodes.size() - 1)); + resp = client.showTTL(req); + databaseToTTL.putAll(resp.getPathTTLMap()); + } + } + } + } catch (ClientManagerException | TException e) { + future.setException(e); + } + // build TSBlock + ShowTTLTask.buildTSBlock(databaseToTTL, future); + return future; + } + + @Override + public SettableFuture showRegion( + final ShowRegionStatement showRegionStatement, final boolean isTableModel) { + final SettableFuture future = SettableFuture.create(); + TShowRegionResp showRegionResp = new TShowRegionResp(); + final TShowRegionReq showRegionReq = new TShowRegionReq().setIsTableModel(isTableModel); + showRegionReq.setConsensusGroupType(showRegionStatement.getRegionType()); + if (showRegionStatement.getDatabases() == null) { + showRegionReq.setDatabases(null); + } else { + showRegionReq.setDatabases( + showRegionStatement.getDatabases().stream() + .map(PartialPath::getFullPath) + .collect(Collectors.toList())); + } + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + showRegionResp = client.showRegion(showRegionReq); + if (showRegionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + showRegionResp.getStatus().message, showRegionResp.getStatus().code)); + return future; + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + + // filter the regions by nodeId + if (showRegionStatement.getNodeIds() != null) { + List regionInfos = showRegionResp.getRegionInfoList(); + regionInfos = + regionInfos.stream() + .filter( + regionInfo -> + showRegionStatement.getNodeIds().contains(regionInfo.getDataNodeId())) + .collect(Collectors.toList()); + showRegionResp.setRegionInfoList(regionInfos); + } + + // build TSBlock + ShowRegionTask.buildTSBlock(showRegionResp, future, isTableModel); + return future; + } + + @Override + public SettableFuture showDataNodes() { + final SettableFuture future = SettableFuture.create(); + TShowDataNodesResp showDataNodesResp = new TShowDataNodesResp(); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + showDataNodesResp = client.showDataNodes(); + if (showDataNodesResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + showDataNodesResp.getStatus().message, showDataNodesResp.getStatus().code)); + return future; + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + // build TSBlock + ShowDataNodesTask.buildTSBlock(showDataNodesResp, future); + return future; + } + + @Override + public SettableFuture showConfigNodes() { + final SettableFuture future = SettableFuture.create(); + TShowConfigNodesResp showConfigNodesResp = new TShowConfigNodesResp(); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + showConfigNodesResp = client.showConfigNodes(); + if (showConfigNodesResp.getStatus().getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + showConfigNodesResp.getStatus().message, showConfigNodesResp.getStatus().code)); + return future; + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + // build TSBlock + ShowConfigNodesTask.buildTSBlock(showConfigNodesResp, future); + return future; + } + + @Override + public SettableFuture showAINodes() { + final SettableFuture future = SettableFuture.create(); + TShowAINodesResp resp = new TShowAINodesResp(); + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + resp = client.showAINodes(); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException(new IoTDBException(resp.getStatus())); + return future; + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + ShowAINodesTask.buildTsBlock(resp, future); + return future; + } + + @Override + public SettableFuture createSchemaTemplate( + final CreateSchemaTemplateStatement createSchemaTemplateStatement) { + final SettableFuture future = SettableFuture.create(); + // Construct request using statement + try { + // Send request to some API server + final TSStatus tsStatus = + ClusterTemplateManager.getInstance().createSchemaTemplate(createSchemaTemplateStatement); + // Get response or throw exception + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + future.setException(new IoTDBException(tsStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (final Exception e) { + future.setException(e.getCause()); + } + return future; + } + + @Override + public SettableFuture showSchemaTemplate( + final ShowSchemaTemplateStatement showSchemaTemplateStatement) { + final SettableFuture future = SettableFuture.create(); + try { + List