Skip to content

Commit c72f67e

Browse files
authored
Pipe: Banned the audit / system DB from configNode sync (apache#16592)
1 parent 2a591bd commit c72f67e

File tree

8 files changed

+76
-42
lines changed

8 files changed

+76
-42
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,10 @@ public static void executeNonQueries(
11041104
}
11051105
}
11061106

1107+
public static boolean tryExecuteNonQuery(BaseEnv env, String sql) {
1108+
return tryExecuteNonQuery(env, sql, null);
1109+
}
1110+
11071111
public static boolean tryExecuteNonQuery(BaseEnv env, String sql, Connection defaultConnection) {
11081112
return tryExecuteNonQuery(
11091113
env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, defaultConnection);
@@ -1115,6 +1119,11 @@ public static boolean tryExecuteNonQuery(
11151119
env, Collections.singletonList(sql), userName, password, defaultConnection);
11161120
}
11171121

1122+
public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env, List<String> sqlList) {
1123+
return tryExecuteNonQueriesWithRetry(
1124+
env, sqlList, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, null);
1125+
}
1126+
11181127
public static boolean tryExecuteNonQueriesWithRetry(
11191128
BaseEnv env,
11201129
List<String> sqlList,

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,20 @@ public void testPureSchemaInclusion() throws Exception {
5151

5252
try (final SyncConfigNodeIServiceClient client =
5353
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
54-
final Map<String, String> extractorAttributes = new HashMap<>();
54+
final Map<String, String> sourceAttributes = new HashMap<>();
5555
final Map<String, String> processorAttributes = new HashMap<>();
56-
final Map<String, String> connectorAttributes = new HashMap<>();
56+
final Map<String, String> sinkAttributes = new HashMap<>();
5757

58-
extractorAttributes.put("extractor.inclusion", "schema");
58+
sourceAttributes.put("source.inclusion", "schema");
5959

60-
connectorAttributes.put("connector", "iotdb-thrift-connector");
61-
connectorAttributes.put("connector.ip", receiverIp);
62-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
60+
sinkAttributes.put("sink", "iotdb-thrift-sink");
61+
sinkAttributes.put("sink.ip", receiverIp);
62+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
6363

6464
final TSStatus status =
6565
client.createPipe(
66-
new TCreatePipeReq("testPipe", connectorAttributes)
67-
.setExtractorAttributes(extractorAttributes)
66+
new TCreatePipeReq("testPipe", sinkAttributes)
67+
.setExtractorAttributes(sourceAttributes)
6868
.setProcessorAttributes(processorAttributes));
6969

7070
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
@@ -113,18 +113,18 @@ public void testAuthExclusion() throws Exception {
113113
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
114114
final Map<String, String> extractorAttributes = new HashMap<>();
115115
final Map<String, String> processorAttributes = new HashMap<>();
116-
final Map<String, String> connectorAttributes = new HashMap<>();
116+
final Map<String, String> sinkAttributes = new HashMap<>();
117117

118118
extractorAttributes.put("extractor.inclusion", "all");
119119
extractorAttributes.put("extractor.inclusion.exclusion", "auth");
120120

121-
connectorAttributes.put("connector", "iotdb-thrift-connector");
122-
connectorAttributes.put("connector.ip", receiverIp);
123-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
121+
sinkAttributes.put("sink", "iotdb-thrift-sink");
122+
sinkAttributes.put("sink.ip", receiverIp);
123+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
124124

125125
final TSStatus status =
126126
client.createPipe(
127-
new TCreatePipeReq("testPipe", connectorAttributes)
127+
new TCreatePipeReq("testPipe", sinkAttributes)
128128
.setExtractorAttributes(extractorAttributes)
129129
.setProcessorAttributes(processorAttributes));
130130

@@ -151,18 +151,18 @@ public void testAuthInclusionWithPattern() throws Exception {
151151
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
152152
final Map<String, String> extractorAttributes = new HashMap<>();
153153
final Map<String, String> processorAttributes = new HashMap<>();
154-
final Map<String, String> connectorAttributes = new HashMap<>();
154+
final Map<String, String> sinkAttributes = new HashMap<>();
155155

156156
extractorAttributes.put("extractor.inclusion", "auth");
157157
extractorAttributes.put("path", "root.ln.**");
158158

159-
connectorAttributes.put("connector", "iotdb-thrift-connector");
160-
connectorAttributes.put("connector.ip", receiverIp);
161-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
159+
sinkAttributes.put("sink", "iotdb-thrift-sink");
160+
sinkAttributes.put("sink.ip", receiverIp);
161+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
162162

163163
final TSStatus status =
164164
client.createPipe(
165-
new TCreatePipeReq("testPipe", connectorAttributes)
165+
new TCreatePipeReq("testPipe", sinkAttributes)
166166
.setExtractorAttributes(extractorAttributes)
167167
.setProcessorAttributes(processorAttributes));
168168

@@ -203,17 +203,17 @@ public void testPureDeleteInclusion() throws Exception {
203203
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
204204
final Map<String, String> extractorAttributes = new HashMap<>();
205205
final Map<String, String> processorAttributes = new HashMap<>();
206-
final Map<String, String> connectorAttributes = new HashMap<>();
206+
final Map<String, String> sinkAttributes = new HashMap<>();
207207

208208
extractorAttributes.put("extractor.inclusion", "data.delete");
209209

210-
connectorAttributes.put("connector", "iotdb-thrift-connector");
211-
connectorAttributes.put("connector.ip", receiverIp);
212-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
210+
sinkAttributes.put("sink", "iotdb-thrift-sink");
211+
sinkAttributes.put("sink.ip", receiverIp);
212+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
213213

214214
final TSStatus status =
215215
client.createPipe(
216-
new TCreatePipeReq("testPipe", connectorAttributes)
216+
new TCreatePipeReq("testPipe", sinkAttributes)
217217
.setExtractorAttributes(extractorAttributes)
218218
.setProcessorAttributes(processorAttributes));
219219

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.apache.iotdb.commons.path.PartialPath;
2424
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
2525
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
26+
import org.apache.iotdb.commons.schema.SchemaConstant;
2627
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
2728
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
29+
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
30+
import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
2831
import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
2932
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
3033

@@ -248,6 +251,23 @@ static boolean shouldPlanBeListened(final ConfigPhysicalPlan plan) {
248251
return false;
249252
}
250253

254+
// system / audit DB
255+
if (type.equals(ConfigPhysicalPlanType.DeleteDatabase)
256+
&& (((DeleteDatabasePlan) plan).getName().equals(SchemaConstant.AUDIT_DATABASE)
257+
|| ((DeleteDatabasePlan) plan).getName().equals(SchemaConstant.SYSTEM_DATABASE))
258+
|| (type.equals(ConfigPhysicalPlanType.CreateDatabase)
259+
|| type.equals(ConfigPhysicalPlanType.AlterDatabase))
260+
&& (((DatabaseSchemaPlan) plan)
261+
.getSchema()
262+
.getName()
263+
.equals(SchemaConstant.SYSTEM_DATABASE)
264+
|| ((DatabaseSchemaPlan) plan)
265+
.getSchema()
266+
.getName()
267+
.equals(SchemaConstant.AUDIT_DATABASE))) {
268+
return false;
269+
}
270+
251271
// PipeEnriched & UnsetTemplate are not listened directly,
252272
// but their inner plan or converted plan are listened.
253273
return type.equals(ConfigPhysicalPlanType.PipeEnriched)

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
2424
import org.apache.iotdb.commons.exception.IllegalPathException;
2525
import org.apache.iotdb.commons.path.PartialPath;
26+
import org.apache.iotdb.commons.schema.SchemaConstant;
2627
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
2728
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
2829
import org.apache.iotdb.commons.schema.table.TsTable;
@@ -41,6 +42,7 @@
4142
import org.apache.iotdb.confignode.persistence.schema.mnode.IConfigMNode;
4243
import org.apache.iotdb.confignode.persistence.schema.mnode.factory.ConfigMNodeFactory;
4344
import org.apache.iotdb.confignode.persistence.schema.mnode.impl.ConfigTableNode;
45+
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
4446
import org.apache.iotdb.db.schemaengine.template.Template;
4547

4648
import org.apache.commons.io.IOUtils;
@@ -72,8 +74,8 @@
7274
import java.util.Stack;
7375

7476
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
77+
import static org.apache.iotdb.commons.schema.SchemaConstant.DATABASE_MNODE_TYPE;
7578
import static org.apache.iotdb.commons.schema.SchemaConstant.INTERNAL_MNODE_TYPE;
76-
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_MNODE_TYPE;
7779
import static org.apache.iotdb.commons.schema.SchemaConstant.TABLE_MNODE_TYPE;
7880
import static org.apache.iotdb.commons.utils.IOUtils.readString;
7981

@@ -440,7 +442,7 @@ private void generateDatabasePhysicalPlan() {
440442

441443
final Set<TsTable> tableSet = new HashSet<>();
442444

443-
if (type == STORAGE_GROUP_MNODE_TYPE) {
445+
if (type == DATABASE_MNODE_TYPE) {
444446
databaseMNode = deserializeDatabaseMNode(bufferedInputStream);
445447
name = databaseMNode.getName();
446448
stack.push(new Pair<>(databaseMNode, true));
@@ -472,7 +474,7 @@ private void generateDatabasePhysicalPlan() {
472474
stack.push(new Pair<>(internalMNode, hasDB));
473475
name = internalMNode.getName();
474476
break;
475-
case STORAGE_GROUP_MNODE_TYPE:
477+
case DATABASE_MNODE_TYPE:
476478
databaseMNode = deserializeDatabaseMNode(bufferedInputStream).getAsMNode();
477479
while (!stack.isEmpty() && !stack.peek().right) {
478480
databaseMNode.addChild(stack.pop().left);
@@ -548,10 +550,13 @@ private IConfigMNode deserializeDatabaseMNode(final InputStream inputStream) thr
548550
templateNodeList.add((IConfigMNode) databaseMNode);
549551
}
550552

551-
final DatabaseSchemaPlan createDBPlan =
552-
new DatabaseSchemaPlan(
553-
ConfigPhysicalPlanType.CreateDatabase, databaseMNode.getAsMNode().getDatabaseSchema());
554-
planDeque.add(createDBPlan);
553+
final TDatabaseSchema schema = databaseMNode.getAsMNode().getDatabaseSchema();
554+
if (!schema.getName().equals(SchemaConstant.AUDIT_DATABASE)
555+
&& !schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)) {
556+
final DatabaseSchemaPlan createDBPlan =
557+
new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, schema);
558+
planDeque.add(createDBPlan);
559+
}
555560
return databaseMNode.getAsMNode();
556561
}
557562

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,10 @@
9191
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE;
9292
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_RESULT_NODES;
9393
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_TEMPLATE;
94+
import static org.apache.iotdb.commons.schema.SchemaConstant.DATABASE_MNODE_TYPE;
9495
import static org.apache.iotdb.commons.schema.SchemaConstant.INTERNAL_MNODE_TYPE;
9596
import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
9697
import static org.apache.iotdb.commons.schema.SchemaConstant.ROOT;
97-
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_MNODE_TYPE;
9898
import static org.apache.iotdb.commons.schema.SchemaConstant.TABLE_MNODE_TYPE;
9999
import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME;
100100

@@ -1070,7 +1070,7 @@ private void serializeDatabaseNode(
10701070
throws IOException {
10711071
serializeChildren(storageGroupNode.getAsMNode(), outputStream);
10721072

1073-
ReadWriteIOUtils.write(STORAGE_GROUP_MNODE_TYPE, outputStream);
1073+
ReadWriteIOUtils.write(DATABASE_MNODE_TYPE, outputStream);
10741074
ReadWriteIOUtils.write(storageGroupNode.getName(), outputStream);
10751075
ReadWriteIOUtils.write(storageGroupNode.getAsMNode().getSchemaTemplateId(), outputStream);
10761076
ThriftConfigNodeSerDeUtils.serializeTDatabaseSchema(
@@ -1100,7 +1100,7 @@ public void deserialize(final InputStream inputStream) throws IOException {
11001100
IConfigMNode internalMNode;
11011101
IConfigMNode tableNode;
11021102

1103-
if (type == STORAGE_GROUP_MNODE_TYPE) {
1103+
if (type == DATABASE_MNODE_TYPE) {
11041104
databaseMNode = deserializeDatabaseMNode(inputStream);
11051105
name = databaseMNode.getName();
11061106
stack.push(new Pair<>(databaseMNode, true));
@@ -1132,7 +1132,7 @@ public void deserialize(final InputStream inputStream) throws IOException {
11321132
stack.push(new Pair<>(internalMNode, hasDB));
11331133
name = internalMNode.getName();
11341134
break;
1135-
case STORAGE_GROUP_MNODE_TYPE:
1135+
case DATABASE_MNODE_TYPE:
11361136
databaseMNode = deserializeDatabaseMNode(inputStream).getAsMNode();
11371137
while (!stack.isEmpty() && Boolean.FALSE.equals(stack.peek().right)) {
11381138
databaseMNode.addChild(stack.pop().left);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/snapshot/MemMTreeSnapshotUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@
5656
import java.util.function.BiConsumer;
5757
import java.util.function.Consumer;
5858

59+
import static org.apache.iotdb.commons.schema.SchemaConstant.DATABASE_MNODE_TYPE;
5960
import static org.apache.iotdb.commons.schema.SchemaConstant.ENTITY_MNODE_TYPE;
6061
import static org.apache.iotdb.commons.schema.SchemaConstant.INTERNAL_MNODE_TYPE;
6162
import static org.apache.iotdb.commons.schema.SchemaConstant.LOGICAL_VIEW_MNODE_TYPE;
6263
import static org.apache.iotdb.commons.schema.SchemaConstant.MEASUREMENT_MNODE_TYPE;
6364
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_ENTITY_MNODE_TYPE;
64-
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_MNODE_TYPE;
6565
import static org.apache.iotdb.commons.schema.SchemaConstant.TABLE_MNODE_TYPE;
6666
import static org.apache.iotdb.commons.schema.SchemaConstant.isStorageGroupType;
6767

@@ -249,7 +249,7 @@ private static void deserializeMNode(
249249
currentTableName.set(node.getName());
250250
}
251251
break;
252-
case STORAGE_GROUP_MNODE_TYPE:
252+
case DATABASE_MNODE_TYPE:
253253
childrenNum = ReadWriteIOUtils.readInt(inputStream);
254254
node = deserializer.deserializeStorageGroupMNode(inputStream);
255255
break;
@@ -353,7 +353,7 @@ public Boolean visitDatabaseMNode(
353353
// database node in schemaRegion doesn't store any database schema
354354
return true;
355355
} else {
356-
ReadWriteIOUtils.write(STORAGE_GROUP_MNODE_TYPE, outputStream);
356+
ReadWriteIOUtils.write(DATABASE_MNODE_TYPE, outputStream);
357357
serializeBasicMNode(node.getBasicMNode(), outputStream);
358358
ReadWriteIOUtils.write(0, outputStream); // for compatibly
359359
ReadWriteIOUtils.write(false, outputStream); // for compatibly

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,12 @@
7171
import java.util.Objects;
7272
import java.util.stream.Collectors;
7373

74+
import static org.apache.iotdb.commons.schema.SchemaConstant.DATABASE_MNODE_TYPE;
7475
import static org.apache.iotdb.commons.schema.SchemaConstant.ENTITY_MNODE_TYPE;
7576
import static org.apache.iotdb.commons.schema.SchemaConstant.INTERNAL_MNODE_TYPE;
7677
import static org.apache.iotdb.commons.schema.SchemaConstant.LOGICAL_VIEW_MNODE_TYPE;
7778
import static org.apache.iotdb.commons.schema.SchemaConstant.MEASUREMENT_MNODE_TYPE;
7879
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_ENTITY_MNODE_TYPE;
79-
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_MNODE_TYPE;
8080
import static org.apache.iotdb.commons.schema.SchemaConstant.TABLE_MNODE_TYPE;
8181
import static org.apache.iotdb.commons.schema.SchemaConstant.isStorageGroupType;
8282
import static org.apache.iotdb.db.schemaengine.schemaregion.tag.TagLogFile.parseByteBuffer;
@@ -287,7 +287,7 @@ private IMemMNode deserializeMNode(
287287
this.tableName = node.getName();
288288
}
289289
break;
290-
case STORAGE_GROUP_MNODE_TYPE:
290+
case DATABASE_MNODE_TYPE:
291291
childrenNum = ReadWriteIOUtils.readInt(inputStream);
292292
node = deserializer.deserializeStorageGroupMNode(inputStream);
293293
break;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/SchemaConstant.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private SchemaConstant() {
9191
public static final int ALL_TEMPLATE = -2;
9292

9393
public static final byte INTERNAL_MNODE_TYPE = 0;
94-
public static final byte STORAGE_GROUP_MNODE_TYPE = 1;
94+
public static final byte DATABASE_MNODE_TYPE = 1;
9595
public static final byte MEASUREMENT_MNODE_TYPE = 2;
9696
public static final byte ENTITY_MNODE_TYPE = 3;
9797
public static final byte STORAGE_GROUP_ENTITY_MNODE_TYPE = 4;
@@ -120,7 +120,7 @@ public static String getMNodeTypeName(byte type) {
120120
switch (type) {
121121
case INTERNAL_MNODE_TYPE:
122122
return INTERNAL_MNODE_TYPE_NAME;
123-
case STORAGE_GROUP_MNODE_TYPE:
123+
case DATABASE_MNODE_TYPE:
124124
return STORAGE_GROUP_MNODE_TYPE_NAME;
125125
case MEASUREMENT_MNODE_TYPE:
126126
return MEASUREMENT_MNODE_TYPE_NAME;
@@ -138,6 +138,6 @@ public static String getMNodeTypeName(byte type) {
138138
}
139139

140140
public static boolean isStorageGroupType(byte type) {
141-
return type == STORAGE_GROUP_MNODE_TYPE || type == STORAGE_GROUP_ENTITY_MNODE_TYPE;
141+
return type == DATABASE_MNODE_TYPE || type == STORAGE_GROUP_ENTITY_MNODE_TYPE;
142142
}
143143
}

0 commit comments

Comments
 (0)