Skip to content

Commit 3e4cf34

Browse files
committed
done
Signed-off-by: Weihao Li <[email protected]>
1 parent 720c7c5 commit 3e4cf34

File tree

30 files changed

+606
-100
lines changed

30 files changed

+606
-100
lines changed

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ keyWords
115115
| FIRST
116116
| FLUSH
117117
| FOR
118+
| FORCEDLY
118119
| FROM
119120
| FULL
120121
| FUNCTION

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,8 @@ stopService
454454
;
455455

456456
dropService
457-
: DROP SERVICE serviceName=identifier
457+
: DROP SERVICE serviceName=identifier FORCEDLY?
458+
458459
;
459460

460461
showService

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,11 @@ FOR
382382
: F O R
383383
;
384384

385+
FORCEDLY
386+
: F O R C E D L Y
387+
;
388+
389+
385390
FROM
386391
: F R O M
387392
;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/exernalservice/ShowExternalServicePlan.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,20 @@
2222
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
2323
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
2424

25-
import java.util.List;
2625
import java.util.Objects;
26+
import java.util.Set;
2727

2828
/** Get infos of ExternalService by the DataNode's id. */
2929
public class ShowExternalServicePlan extends ConfigPhysicalReadPlan {
3030

31-
private final List<Integer> dataNodeIds;
31+
private final Set<Integer> dataNodeIds;
3232

33-
public ShowExternalServicePlan(List<Integer> dataNodeIds) {
34-
super(ConfigPhysicalPlanType.GetDataNodeConfiguration);
33+
public ShowExternalServicePlan(Set<Integer> dataNodeIds) {
34+
super(ConfigPhysicalPlanType.ShowExternalService);
3535
this.dataNodeIds = dataNodeIds;
3636
}
3737

38-
public List<Integer> getDataNodeIds() {
38+
public Set<Integer> getDataNodeIds() {
3939
return dataNodeIds;
4040
}
4141

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/externalservice/ShowExternalServiceResp.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,33 +22,32 @@
2222
import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
2323
import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
2424
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25-
import org.apache.iotdb.commons.externalservice.ServiceInfo;
2625
import org.apache.iotdb.consensus.common.DataSet;
26+
import org.apache.iotdb.rpc.TSStatusCode;
2727

2828
import javax.validation.constraints.NotNull;
2929

30+
import java.util.Comparator;
3031
import java.util.List;
31-
import java.util.stream.Collectors;
3232

3333
public class ShowExternalServiceResp implements DataSet {
3434

35-
private final TSStatus status;
36-
private final List<ServiceInfo> serviceInfos;
35+
private final List<TExternalServiceEntry> serviceInfoEntryList;
3736

38-
public ShowExternalServiceResp(
39-
@NotNull TSStatus status, @NotNull List<ServiceInfo> serviceInfos) {
40-
this.status = status;
41-
this.serviceInfos = serviceInfos;
37+
public ShowExternalServiceResp(@NotNull List<TExternalServiceEntry> serviceInfoEntryList) {
38+
this.serviceInfoEntryList = serviceInfoEntryList;
39+
}
40+
41+
public List<TExternalServiceEntry> getServiceInfoEntryList() {
42+
return serviceInfoEntryList;
4243
}
4344

4445
public TExternalServiceListResp convertToRpcShowExternalServiceResp() {
46+
serviceInfoEntryList.sort(
47+
Comparator.comparingInt(TExternalServiceEntry::getDataNodId)
48+
.thenComparing(TExternalServiceEntry::getServiceType)
49+
.thenComparing(TExternalServiceEntry::getServiceName));
4550
return new TExternalServiceListResp(
46-
status,
47-
serviceInfos.stream()
48-
.map(
49-
entry ->
50-
new TExternalServiceEntry(
51-
entry.getServiceName(), entry.getClassName(), entry.getState().getValue()))
52-
.collect(Collectors.toList()));
51+
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), serviceInfoEntryList);
5352
}
5453
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachine;
111111
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
112112
import org.apache.iotdb.confignode.manager.cq.CQManager;
113+
import org.apache.iotdb.confignode.manager.externalservice.ExternalServiceInfo;
113114
import org.apache.iotdb.confignode.manager.externalservice.ExternalServiceManager;
114115
import org.apache.iotdb.confignode.manager.load.LoadManager;
115116
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
@@ -360,6 +361,7 @@ public ConfigManager() throws IOException {
360361
UDFInfo udfInfo = new UDFInfo();
361362
TriggerInfo triggerInfo = new TriggerInfo();
362363
CQInfo cqInfo = new CQInfo();
364+
ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo();
363365
PipeInfo pipeInfo = new PipeInfo();
364366
QuotaInfo quotaInfo = new QuotaInfo();
365367
TTLInfo ttlInfo = new TTLInfo();
@@ -377,6 +379,7 @@ public ConfigManager() throws IOException {
377379
udfInfo,
378380
triggerInfo,
379381
cqInfo,
382+
externalServiceInfo,
380383
pipeInfo,
381384
subscriptionInfo,
382385
quotaInfo,

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.confignode.manager.externalservice;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
2223
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2324
import org.apache.iotdb.commons.externalservice.ServiceInfo;
2425
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
@@ -37,6 +38,7 @@
3738
import java.io.FileOutputStream;
3839
import java.io.IOException;
3940
import java.util.Map;
41+
import java.util.Set;
4042
import java.util.concurrent.ConcurrentHashMap;
4143
import java.util.stream.Collectors;
4244

@@ -160,11 +162,21 @@ public TSStatus stopService(StopExternalServicePlan plan) {
160162
return res;
161163
}
162164

163-
public ShowExternalServiceResp showService() {
165+
public ShowExternalServiceResp showService(Set<Integer> dataNodes) {
164166
return new ShowExternalServiceResp(
165-
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
166-
datanodeToServiceInfos.values().stream()
167-
.flatMap(stringServiceInfoMap -> stringServiceInfoMap.values().stream())
167+
datanodeToServiceInfos.entrySet().stream()
168+
.filter(entry -> dataNodes.contains(entry.getKey()))
169+
.flatMap(
170+
entry ->
171+
entry.getValue().values().stream()
172+
.map(
173+
serviceInfo ->
174+
new TExternalServiceEntry(
175+
serviceInfo.getServiceName(),
176+
serviceInfo.getClassName(),
177+
serviceInfo.getState().getValue(),
178+
entry.getKey(),
179+
ServiceInfo.ServiceType.USER_DEFINED.getValue())))
168180
.collect(Collectors.toList()));
169181
}
170182

@@ -180,6 +192,7 @@ public boolean processTakeSnapshot(File snapshotDir) throws IOException {
180192

181193
try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
182194

195+
// TODO
183196
// serializeExistedJarToMD5(fileOutputStream);
184197

185198
// udfTable.serializeUDFTable(fileOutputStream);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceManager.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,13 @@
3434
import org.apache.iotdb.confignode.consensus.response.externalservice.ShowExternalServiceResp;
3535
import org.apache.iotdb.confignode.manager.ConfigManager;
3636
import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
37-
import org.apache.iotdb.consensus.common.DataSet;
3837
import org.apache.iotdb.consensus.exception.ConsensusException;
3938
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
4039
import org.apache.iotdb.rpc.TSStatusCode;
4140

4241
import org.slf4j.Logger;
4342
import org.slf4j.LoggerFactory;
4443

45-
import java.util.ArrayList;
4644
import java.util.Collections;
4745
import java.util.Map;
4846

@@ -164,20 +162,19 @@ public TExternalServiceListResp showService(int dataNodeId) {
164162

165163
try {
166164
// 2. get user-defined services info from CN consensus
167-
DataSet response =
168-
configManager
169-
.getConsensusManager()
170-
.read(new ShowExternalServicePlan(new ArrayList<>(builtInServiceInfos.keySet())));
171-
TExternalServiceListResp resp =
172-
((ShowExternalServiceResp) response).convertToRpcShowExternalServiceResp();
165+
ShowExternalServiceResp response =
166+
(ShowExternalServiceResp)
167+
configManager
168+
.getConsensusManager()
169+
.read(new ShowExternalServicePlan(builtInServiceInfos.keySet()));
173170

174171
// 3. combined built-in services info and user-defined services info
175172
builtInServiceInfos
176173
.values()
177174
.forEach(
178175
builtInResp ->
179-
resp.externalServiceInfos.addAll(builtInResp.getExternalServiceInfos()));
180-
return resp;
176+
response.getServiceInfoEntryList().addAll(builtInResp.getExternalServiceInfos()));
177+
return response.convertToRpcShowExternalServiceResp();
181178
} catch (ConsensusException e) {
182179
LOGGER.warn("Unexpected error happened while showing Service: ", e);
183180
// consensus layer related errors

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
3434
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
3535
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
36+
import org.apache.iotdb.confignode.consensus.request.read.exernalservice.ShowExternalServicePlan;
3637
import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan;
3738
import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan;
3839
import org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan;
@@ -78,6 +79,10 @@
7879
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
7980
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
8081
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
82+
import org.apache.iotdb.confignode.consensus.request.write.externalservice.CreateExternalServicePlan;
83+
import org.apache.iotdb.confignode.consensus.request.write.externalservice.DropExternalServicePlan;
84+
import org.apache.iotdb.confignode.consensus.request.write.externalservice.StartExternalServicePlan;
85+
import org.apache.iotdb.confignode.consensus.request.write.externalservice.StopExternalServicePlan;
8186
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
8287
import org.apache.iotdb.confignode.consensus.request.write.function.DropTableModelFunctionPlan;
8388
import org.apache.iotdb.confignode.consensus.request.write.function.DropTreeModelFunctionPlan;
@@ -143,6 +148,7 @@
143148
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
144149
import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp;
145150
import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
151+
import org.apache.iotdb.confignode.manager.externalservice.ExternalServiceInfo;
146152
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
147153
import org.apache.iotdb.confignode.persistence.ClusterInfo;
148154
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
@@ -204,6 +210,8 @@ public class ConfigPlanExecutor {
204210

205211
private final CQInfo cqInfo;
206212

213+
private final ExternalServiceInfo externalServiceInfo;
214+
207215
private final PipeInfo pipeInfo;
208216

209217
private final SubscriptionInfo subscriptionInfo;
@@ -222,6 +230,7 @@ public ConfigPlanExecutor(
222230
UDFInfo udfInfo,
223231
TriggerInfo triggerInfo,
224232
CQInfo cqInfo,
233+
ExternalServiceInfo externalServiceInfo,
225234
PipeInfo pipeInfo,
226235
SubscriptionInfo subscriptionInfo,
227236
QuotaInfo quotaInfo,
@@ -253,6 +262,9 @@ public ConfigPlanExecutor(
253262
this.cqInfo = cqInfo;
254263
this.snapshotProcessorList.add(cqInfo);
255264

265+
this.externalServiceInfo = externalServiceInfo;
266+
this.snapshotProcessorList.add(externalServiceInfo);
267+
256268
this.pipeInfo = pipeInfo;
257269
this.snapshotProcessorList.add(pipeInfo);
258270

@@ -344,6 +356,8 @@ public DataSet executeQueryPlan(final ConfigPhysicalReadPlan req)
344356
return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
345357
case SHOW_CQ:
346358
return cqInfo.showCQ();
359+
case ShowExternalService:
360+
return externalServiceInfo.showService(((ShowExternalServicePlan) req).getDataNodeIds());
347361
case GetFunctionTable:
348362
return udfInfo.getUDFTable((GetFunctionTablePlan) req);
349363
case GetFunctionJar:
@@ -635,6 +649,14 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
635649
return cqInfo.activeCQ((ActiveCQPlan) physicalPlan);
636650
case UPDATE_CQ_LAST_EXEC_TIME:
637651
return cqInfo.updateCQLastExecutionTime((UpdateCQLastExecTimePlan) physicalPlan);
652+
case CreateExternalService:
653+
return externalServiceInfo.addService((CreateExternalServicePlan) physicalPlan);
654+
case StartExternalService:
655+
return externalServiceInfo.startService((StartExternalServicePlan) physicalPlan);
656+
case StopExternalService:
657+
return externalServiceInfo.stopService((StopExternalServicePlan) physicalPlan);
658+
case DropExternalService:
659+
return externalServiceInfo.dropService((DropExternalServicePlan) physicalPlan);
638660
case CreatePipePlugin:
639661
return pipeInfo.getPipePluginInfo().createPipePlugin((CreatePipePluginPlan) physicalPlan);
640662
case DropPipePlugin:

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,27 +1346,35 @@ public TShowCQResp showCQ() throws TException {
13461346

13471347
@Override
13481348
public TSStatus createExternalService(TCreateExternalServiceReq req) throws TException {
1349-
return null;
1349+
return executeRemoteCallWithRetry(
1350+
() -> client.createExternalService(req), resp -> !updateConfigNodeLeader(resp));
13501351
}
13511352

13521353
@Override
13531354
public TSStatus startExternalService(int dataNodeId, String serviceName) throws TException {
1354-
return null;
1355+
return executeRemoteCallWithRetry(
1356+
() -> client.startExternalService(dataNodeId, serviceName),
1357+
resp -> !updateConfigNodeLeader(resp));
13551358
}
13561359

13571360
@Override
13581361
public TSStatus stopExternalService(int dataNodeId, String serviceName) throws TException {
1359-
return null;
1362+
return executeRemoteCallWithRetry(
1363+
() -> client.stopExternalService(dataNodeId, serviceName),
1364+
resp -> !updateConfigNodeLeader(resp));
13601365
}
13611366

13621367
@Override
13631368
public TSStatus dropExternalService(int dataNodeId, String serviceName) throws TException {
1364-
return null;
1369+
return executeRemoteCallWithRetry(
1370+
() -> client.dropExternalService(dataNodeId, serviceName),
1371+
resp -> !updateConfigNodeLeader(resp));
13651372
}
13661373

13671374
@Override
13681375
public TExternalServiceListResp showExternalService(int dataNodeId) throws TException {
1369-
return null;
1376+
return executeRemoteCallWithRetry(
1377+
() -> client.showExternalService(dataNodeId), resp -> !updateConfigNodeLeader(resp.status));
13701378
}
13711379

13721380
@Override

0 commit comments

Comments
 (0)