Skip to content

Commit 720c7c5

Browse files
committed
draft
Signed-off-by: Weihao Li <[email protected]>
1 parent 8f93984 commit 720c7c5

File tree

43 files changed

+1468
-62
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1468
-62
lines changed

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,8 @@ public enum TSStatusCode {
341341

342342
// ExternalService
343343
NO_SUCH_EXTERNAL_SERVICE(2300),
344-
EXTERNAL_SERVICE_ALREADY_ACTIVE(2301),
345-
EXTERNAL_SERVICE_ALREADY_EXIST(2302),
344+
EXTERNAL_SERVICE_ALREADY_EXIST(2301),
345+
GET_BUILTIN_EXTERNAL_SERVICE_ERROR(2302),
346346
;
347347

348348
private final int statusCode;

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ keyWords
214214
| SECURITY
215215
| SELECT
216216
| SERIESSLOTID
217+
| SERVICE
218+
| SERVICES
217219
| SESSION
218220
| SET
219221
| SETTLE

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ ddlStatement
5252
| createFunction | dropFunction | showFunctions
5353
// Trigger
5454
| createTrigger | dropTrigger | showTriggers | startTrigger | stopTrigger
55+
// ExternalService
56+
| createService | startService | stopService | dropService | showService
5557
// Pipe Task
5658
| createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes
5759
// Pipe Plugin
@@ -437,6 +439,27 @@ stopTrigger
437439
: STOP TRIGGER triggerName=identifier
438440
;
439441

442+
// ExternalService =========================================================================================
443+
createService
444+
: CREATE SERVICE serviceName=identifier
445+
AS className=STRING_LITERAL
446+
;
447+
448+
startService
449+
: START SERVICE serviceName=identifier
450+
;
451+
452+
stopService
453+
: STOP SERVICE serviceName=identifier
454+
;
455+
456+
dropService
457+
: DROP SERVICE serviceName=identifier
458+
;
459+
460+
showService
461+
: SHOW SERVICES (ON targetDataNodeId=INTEGER_LITERAL)?
462+
;
440463

441464
// CQ ==============================================================================================
442465
// ---- Create Continuous Query

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,14 @@ SERIESSLOTID
786786
: S E R I E S S L O T I D
787787
;
788788

789+
SERVICE
790+
: S E R V I C E
791+
;
792+
793+
SERVICES
794+
: S E R V I C E S
795+
;
796+
789797
SESSION
790798
: S E S S I O N
791799
;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public enum CnToDnAsyncRequestType {
6060
INACTIVE_TRIGGER_INSTANCE,
6161
UPDATE_TRIGGER_LOCATION,
6262

63+
// ExternalService
64+
GET_BUILTIN_SERVICE,
65+
6366
// Pipe Plugin
6467
CREATE_PIPE_PLUGIN,
6568
DROP_PIPE_PLUGIN,

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.iotdb.confignode.client.async.handlers.rpc.DataNodeAsyncRequestRPCHandler;
3939
import org.apache.iotdb.confignode.client.async.handlers.rpc.DataNodeTSStatusRPCHandler;
4040
import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
41+
import org.apache.iotdb.confignode.client.async.handlers.rpc.GetBuiltInExternalServiceRPCHandler;
4142
import org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler;
4243
import org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler;
4344
import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
@@ -486,6 +487,10 @@ protected void initActionMapBuilder() {
486487
CnToDnAsyncRequestType.ENABLE_SEPARATION_OF_ADMIN_POWERS,
487488
(req, client, handler) ->
488489
client.enableSeparationOfAdminPower((DataNodeTSStatusRPCHandler) handler));
490+
actionMapBuilder.put(
491+
CnToDnAsyncRequestType.GET_BUILTIN_SERVICE,
492+
(req, client, handler) ->
493+
client.getBuiltInService((GetBuiltInExternalServiceRPCHandler) handler));
489494
}
490495

491496
@Override

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.client.async.handlers.rpc;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
2324
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2425
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2526
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
@@ -193,6 +194,14 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
193194
dataNodeLocationMap,
194195
(Map<Integer, TDeviceViewResp>) responseMap,
195196
countDownLatch);
197+
case GET_BUILTIN_SERVICE:
198+
return new GetBuiltInExternalServiceRPCHandler(
199+
requestType,
200+
requestId,
201+
targetDataNode,
202+
dataNodeLocationMap,
203+
(Map<Integer, TExternalServiceListResp>) responseMap,
204+
countDownLatch);
196205
case SET_TTL:
197206
case CREATE_DATA_REGION:
198207
case CREATE_SCHEMA_REGION:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.client.async.handlers.rpc;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
24+
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
25+
import org.apache.iotdb.rpc.TSStatusCode;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.Map;
31+
import java.util.concurrent.CountDownLatch;
32+
33+
public class GetBuiltInExternalServiceRPCHandler
34+
extends DataNodeAsyncRequestRPCHandler<TExternalServiceListResp> {
35+
private static final Logger LOGGER =
36+
LoggerFactory.getLogger(GetBuiltInExternalServiceRPCHandler.class);
37+
38+
public GetBuiltInExternalServiceRPCHandler(
39+
CnToDnAsyncRequestType requestType,
40+
int requestId,
41+
TDataNodeLocation targetDataNode,
42+
Map<Integer, TDataNodeLocation> dataNodeLocationMap,
43+
Map<Integer, TExternalServiceListResp> responseMap,
44+
CountDownLatch countDownLatch) {
45+
super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
46+
}
47+
48+
@Override
49+
public void onComplete(TExternalServiceListResp response) {
50+
// Put response only when success
51+
if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
52+
responseMap.put(requestId, response);
53+
} else {
54+
LOGGER.error(
55+
"Failed to {} on DataNode: {}, response: {}",
56+
requestType,
57+
formattedTargetLocation,
58+
response);
59+
}
60+
61+
// Always remove to avoid retrying
62+
nodeLocationMap.remove(requestId);
63+
64+
// Always CountDown
65+
countDownLatch.countDown();
66+
}
67+
68+
@Override
69+
public void onError(Exception e) {
70+
String errorMsg =
71+
"Failed to "
72+
+ requestType
73+
+ " on DataNode: "
74+
+ formattedTargetLocation
75+
+ ", exception: "
76+
+ e.getMessage();
77+
LOGGER.error(errorMsg, e);
78+
79+
// Always CountDown
80+
countDownLatch.countDown();
81+
}
82+
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
4949
import org.apache.iotdb.confignode.consensus.request.write.externalservice.CreateExternalServicePlan;
5050
import org.apache.iotdb.confignode.consensus.request.write.externalservice.DropExternalServicePlan;
51+
import org.apache.iotdb.confignode.consensus.request.write.externalservice.StartExternalServicePlan;
52+
import org.apache.iotdb.confignode.consensus.request.write.externalservice.StopExternalServicePlan;
5153
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
5254
import org.apache.iotdb.confignode.consensus.request.write.function.DropTableModelFunctionPlan;
5355
import org.apache.iotdb.confignode.consensus.request.write.function.DropTreeModelFunctionPlan;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/exernalservice/ShowExternalServicePlan.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,21 @@
2222
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
2323
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
2424

25+
import java.util.List;
2526
import java.util.Objects;
2627

27-
/**
28-
* Get infos of ExternalService by the specific DataNode's id. And return all when dataNodeID is set
29-
* to -1.
30-
*/
28+
/** Get infos of ExternalService by the DataNode's id. */
3129
public class ShowExternalServicePlan extends ConfigPhysicalReadPlan {
3230

33-
private final int dataNodeId;
31+
private final List<Integer> dataNodeIds;
3432

35-
public ShowExternalServicePlan(final int dataNodeId) {
33+
public ShowExternalServicePlan(List<Integer> dataNodeIds) {
3634
super(ConfigPhysicalPlanType.GetDataNodeConfiguration);
37-
this.dataNodeId = dataNodeId;
35+
this.dataNodeIds = dataNodeIds;
3836
}
3937

40-
public Integer getDataNodeId() {
41-
return dataNodeId;
38+
public List<Integer> getDataNodeIds() {
39+
return dataNodeIds;
4240
}
4341

4442
@Override
@@ -50,11 +48,11 @@ public boolean equals(final Object o) {
5048
return false;
5149
}
5250
final ShowExternalServicePlan that = (ShowExternalServicePlan) o;
53-
return dataNodeId == that.dataNodeId;
51+
return dataNodeIds.equals(that.dataNodeIds);
5452
}
5553

5654
@Override
5755
public int hashCode() {
58-
return Objects.hash(dataNodeId);
56+
return Objects.hash(dataNodeIds);
5957
}
6058
}

0 commit comments

Comments
 (0)