Skip to content

Commit 9b3bdc4

Browse files
CaideyipiCRZbulabulahongzhi-gao
authored
Pipe: Implemented tree auth check for source + write-back-sink (#16531)
* remove internal auditor * stash 4 collaborate * Update PermissionManager.java * Merge new pipe privilege (#16476) * fix * fix * fix * fix * fix * fix * fix * fix * refactor * fix * fix * partial * grass * Update PipeConfigTreePrivilegeParseVisitor.java * Feature/client hide password (#16468) * session hide password * fix start-cli.sh * fix start-cli.bat * client hide password * fix client warning * echo cmd line * fix * Update PipeConfigTreePrivilegeParseVisitor.java * partial * partial * fix * fix * partial --------- Co-authored-by: Hongzhi Gao <[email protected]> * entity * FIX COMPILE BUGS * fix * fix * fix * fix * fix * little * fix * remove internal auditor * stash 4 collaborate * FIX COMPILE BUGS * Update PermissionManager.java * Merge new pipe privilege (#16476) * fix * fix * fix * fix * fix * fix * fix * fix * refactor * fix * fix * partial * grass * Update PipeConfigTreePrivilegeParseVisitor.java * Feature/client hide password (#16468) * session hide password * fix start-cli.sh * fix start-cli.bat * client hide password * fix client warning * echo cmd line * fix * Update PipeConfigTreePrivilegeParseVisitor.java * partial * partial * fix * fix * partial --------- Co-authored-by: Hongzhi Gao <[email protected]> * entity * fix * fix * fix * fix * fix * little * fix * partial * append log 4 tree * Update PermissionManager.java * fix pipe bugs Co-Authored-By: Hongzhi Gao <[email protected]> * Bug fix (#16481) * refactor * fix-ut * Audit CI 4 table (#16483) move password his to __audit * spotless * Update DNAuditLogger.java * add more logs * fix ci * Update IoTDBAuditLogBasicIT.java * Update WriteBackSink.java * FIx an audit version * fix * Update IAuthorPlanExecutor.java * fix * fix * fix * fix * try-fix * fix * fix * Move password history under __audit (#16496) * Audit log patch for both tree and table models (#16497) * Pipe: Reduced the conversion logger & Fixed the illegal formats of PipeLogger (#16503) * fix-grass * fix * rest * refactor * fix * fix * fix * fix * fix * fix * refactor * fix * fix * fix * unwebbed-fish * refactor * fix * fix * add-IT * fix * fix * fix * user-null-fix * fix-some * user * refactor * ap * fix * fix * fix * apply * codecov * fix * codecov * fix * apply * fix * ut-fix * split-full * partial * partial-coverage * fix * fix * refactor * fix * with-pro * sptls * fix-temp * pattern-priv * missing-fix * comment * add-IT * fix * fix * remove-sit * fix * test-fix * last * finally * useroot * fix --------- Co-authored-by: Yongzao <[email protected]> Co-authored-by: Hongzhi Gao <[email protected]> Co-authored-by: Yongzao <[email protected]>
1 parent 19a4bef commit 9b3bdc4

File tree

188 files changed

+4448
-1996
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

188 files changed

+4448
-1996
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -976,11 +976,28 @@ public static void executeNonQueriesWithRetry(
976976
}
977977
}
978978

979-
public static void executeNonQuery(BaseEnv env, String sql, Connection defaultConnection) {
979+
public static void executeNonQuery(final BaseEnv env, final String sql) {
980+
executeNonQuery(env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, null);
981+
}
982+
983+
public static void executeNonQuery(
984+
final BaseEnv env, final String sql, final Connection defaultConnection) {
980985
executeNonQuery(
981986
env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, defaultConnection);
982987
}
983988

989+
public static void executeNonQuery(
990+
final String dataBaseName, final String sqlDialect, final BaseEnv env, final String sql) {
991+
executeNonQuery(
992+
env,
993+
sql,
994+
SessionConfig.DEFAULT_USER,
995+
SessionConfig.DEFAULT_PASSWORD,
996+
dataBaseName,
997+
sqlDialect,
998+
null);
999+
}
1000+
9841001
public static void executeNonQuery(
9851002
String dataBaseName,
9861003
String sqlDialect,
@@ -1020,6 +1037,17 @@ public static void executeNonQuery(
10201037
defaultConnection);
10211038
}
10221039

1040+
public static void executeNonQueries(BaseEnv env, List<String> sqlList) {
1041+
executeNonQueries(
1042+
env,
1043+
sqlList,
1044+
SessionConfig.DEFAULT_USER,
1045+
SessionConfig.DEFAULT_PASSWORD,
1046+
null,
1047+
TREE_SQL_DIALECT,
1048+
null);
1049+
}
1050+
10231051
public static void executeNonQueries(
10241052
BaseEnv env, List<String> sqlList, Connection defaultConnection) {
10251053
executeNonQueries(

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeLifeCycleIT.java

Lines changed: 115 additions & 115 deletions
Large diffs are not rendered by default.

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,11 @@ public void testSourcePermission() {
184184

185185
// Grant some privilege
186186
TestUtils.executeNonQuery(
187-
"test", BaseEnv.TABLE_SQL_DIALECT, senderEnv, "grant INSERT on any to user thulab", null);
187+
"test", BaseEnv.TABLE_SQL_DIALECT, senderEnv, "grant INSERT on any to user thulab");
188188

189189
TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
190190

191-
// Shall not be transferred
191+
// Shall be transferred
192192
TestUtils.assertDataEventuallyOnEnv(
193193
receiverEnv,
194194
"show tables from test1",

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeIdempotentIT.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -215,28 +215,28 @@ private void testTableConfigIdempotent(final List<String> beforeSqlList, final S
215215

216216
try (final SyncConfigNodeIServiceClient client =
217217
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
218-
final Map<String, String> extractorAttributes = new HashMap<>();
218+
final Map<String, String> sourceAttributes = new HashMap<>();
219219
final Map<String, String> processorAttributes = new HashMap<>();
220-
final Map<String, String> connectorAttributes = new HashMap<>();
221-
222-
extractorAttributes.put("extractor.inclusion", "all");
223-
extractorAttributes.put("extractor.inclusion.exclusion", "");
224-
extractorAttributes.put("extractor.forwarding-pipe-requests", "false");
225-
extractorAttributes.put("extractor.capture.table", "true");
226-
extractorAttributes.put("extractor.capture.tree", "false");
227-
extractorAttributes.put("user", "root");
228-
229-
connectorAttributes.put("connector", "iotdb-thrift-connector");
230-
connectorAttributes.put("connector.ip", receiverIp);
231-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
232-
connectorAttributes.put("connector.batch.enable", "false");
233-
connectorAttributes.put("connector.exception.conflict.resolve-strategy", "retry");
234-
connectorAttributes.put("connector.exception.conflict.retry-max-time-seconds", "-1");
220+
final Map<String, String> sinkAttributes = new HashMap<>();
221+
222+
sourceAttributes.put("source.inclusion", "all");
223+
sourceAttributes.put("source.inclusion.exclusion", "");
224+
sourceAttributes.put("source.forwarding-pipe-requests", "false");
225+
sourceAttributes.put("source.capture.table", "true");
226+
sourceAttributes.put("source.capture.tree", "false");
227+
sourceAttributes.put("user", "root");
228+
229+
sinkAttributes.put("sink", "iotdb-thrift-sink");
230+
sinkAttributes.put("sink.ip", receiverIp);
231+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
232+
sinkAttributes.put("sink.batch.enable", "false");
233+
sinkAttributes.put("sink.exception.conflict.resolve-strategy", "retry");
234+
sinkAttributes.put("sink.exception.conflict.retry-max-time-seconds", "-1");
235235

236236
final TSStatus status =
237237
client.createPipe(
238-
new TCreatePipeReq("testPipe", connectorAttributes)
239-
.setExtractorAttributes(extractorAttributes)
238+
new TCreatePipeReq("testPipe", sinkAttributes)
239+
.setExtractorAttributes(sourceAttributes)
240240
.setProcessorAttributes(processorAttributes));
241241

242242
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java

Lines changed: 72 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,23 @@ public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
6969
Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", "flush"),
7070
null);
7171

72-
final Map<String, String> extractorAttributes = new HashMap<>();
72+
final Map<String, String> sourceAttributes = new HashMap<>();
7373
final Map<String, String> processorAttributes = new HashMap<>();
74-
final Map<String, String> connectorAttributes = new HashMap<>();
74+
final Map<String, String> sinkAttributes = new HashMap<>();
7575

76-
extractorAttributes.put("extractor.realtime.mode", "log");
76+
sourceAttributes.put("source.realtime.mode", "log");
77+
sourceAttributes.put("user", "root");
7778

78-
connectorAttributes.put("connector", "iotdb-thrift-connector");
79-
connectorAttributes.put("connector.batch.enable", "false");
80-
connectorAttributes.put("connector.ip", receiverIp);
81-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
82-
connectorAttributes.put("connector.realtime-first", "false");
79+
sinkAttributes.put("sink", "iotdb-thrift-sink");
80+
sinkAttributes.put("sink.batch.enable", "false");
81+
sinkAttributes.put("sink.ip", receiverIp);
82+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
83+
sinkAttributes.put("sink.realtime-first", "false");
8384

8485
final TSStatus status =
8586
client.createPipe(
86-
new TCreatePipeReq("testPipe", connectorAttributes)
87-
.setExtractorAttributes(extractorAttributes)
87+
new TCreatePipeReq("testPipe", sinkAttributes)
88+
.setExtractorAttributes(sourceAttributes)
8889
.setProcessorAttributes(processorAttributes));
8990

9091
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
@@ -133,25 +134,26 @@ private void testSinkFormat(final String format) throws Exception {
133134
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"),
134135
null);
135136

136-
final Map<String, String> extractorAttributes = new HashMap<>();
137+
final Map<String, String> sourceAttributes = new HashMap<>();
137138
final Map<String, String> processorAttributes = new HashMap<>();
138-
final Map<String, String> connectorAttributes = new HashMap<>();
139+
final Map<String, String> sinkAttributes = new HashMap<>();
139140

140-
extractorAttributes.put("extractor.realtime.mode", "forced-log");
141+
sourceAttributes.put("source.realtime.mode", "forced-log");
142+
sourceAttributes.put("user", "root");
141143

142-
connectorAttributes.put("connector", "iotdb-thrift-connector");
143-
connectorAttributes.put("connector.batch.enable", "false");
144-
connectorAttributes.put("connector.ip", receiverIp);
145-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
146-
connectorAttributes.put("connector.format", format);
147-
connectorAttributes.put("connector.realtime-first", "false");
144+
sinkAttributes.put("sink", "iotdb-thrift-sink");
145+
sinkAttributes.put("sink.batch.enable", "false");
146+
sinkAttributes.put("sink.ip", receiverIp);
147+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
148+
sinkAttributes.put("sink.format", format);
149+
sinkAttributes.put("sink.realtime-first", "false");
148150

149151
Assert.assertEquals(
150152
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
151153
client
152154
.createPipe(
153-
new TCreatePipeReq("testPipe", connectorAttributes)
154-
.setExtractorAttributes(extractorAttributes)
155+
new TCreatePipeReq("testPipe", sinkAttributes)
156+
.setExtractorAttributes(sourceAttributes)
155157
.setProcessorAttributes(processorAttributes))
156158
.getCode());
157159

@@ -178,8 +180,8 @@ private void testSinkFormat(final String format) throws Exception {
178180
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
179181
client
180182
.createPipe(
181-
new TCreatePipeReq("testPipe", connectorAttributes)
182-
.setExtractorAttributes(extractorAttributes)
183+
new TCreatePipeReq("testPipe", sinkAttributes)
184+
.setExtractorAttributes(sourceAttributes)
183185
.setProcessorAttributes(processorAttributes))
184186
.getCode());
185187

@@ -211,24 +213,25 @@ public void testLegacyConnector() throws Exception {
211213

212214
try (final SyncConfigNodeIServiceClient client =
213215
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
214-
final Map<String, String> extractorAttributes = new HashMap<>();
216+
final Map<String, String> sourceAttributes = new HashMap<>();
215217
final Map<String, String> processorAttributes = new HashMap<>();
216-
final Map<String, String> connectorAttributes = new HashMap<>();
218+
final Map<String, String> sinkAttributes = new HashMap<>();
217219

218-
extractorAttributes.put("source.realtime.mode", "log");
220+
sourceAttributes.put("source.realtime.mode", "log");
221+
sourceAttributes.put("user", "root");
219222

220-
connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
221-
connectorAttributes.put("sink.batch.enable", "false");
222-
connectorAttributes.put("sink.ip", receiverIp);
223-
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
223+
sinkAttributes.put("sink", "iotdb-legacy-pipe-sink");
224+
sinkAttributes.put("sink.batch.enable", "false");
225+
sinkAttributes.put("sink.ip", receiverIp);
226+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
224227

225228
// This version does not matter since it's no longer checked by the legacy receiver
226-
connectorAttributes.put("sink.version", "1.3");
229+
sinkAttributes.put("sink.version", "1.3");
227230

228231
final TSStatus status =
229232
client.createPipe(
230-
new TCreatePipeReq("testPipe", connectorAttributes)
231-
.setExtractorAttributes(extractorAttributes)
233+
new TCreatePipeReq("testPipe", sinkAttributes)
234+
.setExtractorAttributes(sourceAttributes)
232235
.setProcessorAttributes(processorAttributes));
233236

234237
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
@@ -257,6 +260,7 @@ public void testReceiverAutoCreateByLog() throws Exception {
257260
new HashMap<String, String>() {
258261
{
259262
put("source.realtime.mode", "forced-log");
263+
put("user", "root");
260264
}
261265
});
262266
}
@@ -267,6 +271,7 @@ public void testReceiverAutoCreateByFile() throws Exception {
267271
new HashMap<String, String>() {
268272
{
269273
put("source.realtime.mode", "batch");
274+
put("user", "root");
270275
}
271276
});
272277
}
@@ -278,12 +283,12 @@ public void testReceiverAutoCreateWithPattern() throws Exception {
278283
{
279284
put("source.realtime.mode", "batch");
280285
put("source.path", "root.ln.wf01.wt0*.*");
286+
put("user", "root");
281287
}
282288
});
283289
}
284290

285-
private void testReceiverAutoCreate(final Map<String, String> extractorAttributes)
286-
throws Exception {
291+
private void testReceiverAutoCreate(final Map<String, String> sourceAttributes) throws Exception {
287292
final Consumer<String> handleFailure =
288293
o -> {
289294
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
@@ -298,17 +303,17 @@ private void testReceiverAutoCreate(final Map<String, String> extractorAttribute
298303
try (final SyncConfigNodeIServiceClient client =
299304
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
300305
final Map<String, String> processorAttributes = new HashMap<>();
301-
final Map<String, String> connectorAttributes = new HashMap<>();
306+
final Map<String, String> sinkAttributes = new HashMap<>();
302307

303-
connectorAttributes.put("sink", "iotdb-thrift-sink");
304-
connectorAttributes.put("sink.batch.enable", "false");
305-
connectorAttributes.put("sink.ip", receiverIp);
306-
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
308+
sinkAttributes.put("sink", "iotdb-thrift-sink");
309+
sinkAttributes.put("sink.batch.enable", "false");
310+
sinkAttributes.put("sink.ip", receiverIp);
311+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
307312

308313
final TSStatus status =
309314
client.createPipe(
310-
new TCreatePipeReq("testPipe", connectorAttributes)
311-
.setExtractorAttributes(extractorAttributes)
315+
new TCreatePipeReq("testPipe", sinkAttributes)
316+
.setExtractorAttributes(sourceAttributes)
312317
.setProcessorAttributes(processorAttributes));
313318

314319
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
@@ -389,24 +394,25 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
389394
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"),
390395
null);
391396

392-
final Map<String, String> extractorAttributes = new HashMap<>();
397+
final Map<String, String> sourceAttributes = new HashMap<>();
393398
final Map<String, String> processorAttributes = new HashMap<>();
394-
final Map<String, String> connectorAttributes = new HashMap<>();
399+
final Map<String, String> sinkAttributes = new HashMap<>();
395400

396-
extractorAttributes.put("extractor.realtime.mode", "forced-log");
401+
sourceAttributes.put("source.realtime.mode", "forced-log");
402+
sourceAttributes.put("user", "root");
397403

398-
connectorAttributes.put("sink", "iotdb-thrift-sink");
399-
connectorAttributes.put("sink.batch.enable", "false");
400-
connectorAttributes.put("sink.ip", receiverIp);
401-
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
402-
connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
404+
sinkAttributes.put("sink", "iotdb-thrift-sink");
405+
sinkAttributes.put("sink.batch.enable", "false");
406+
sinkAttributes.put("sink.ip", receiverIp);
407+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
408+
sinkAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
403409

404410
Assert.assertEquals(
405411
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
406412
client
407413
.createPipe(
408-
new TCreatePipeReq("testPipe", connectorAttributes)
409-
.setExtractorAttributes(extractorAttributes)
414+
new TCreatePipeReq("testPipe", sinkAttributes)
415+
.setExtractorAttributes(sourceAttributes)
410416
.setProcessorAttributes(processorAttributes))
411417
.getCode());
412418

@@ -433,8 +439,8 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
433439
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
434440
client
435441
.createPipe(
436-
new TCreatePipeReq("testPipe", connectorAttributes)
437-
.setExtractorAttributes(extractorAttributes)
442+
new TCreatePipeReq("testPipe", sinkAttributes)
443+
.setExtractorAttributes(sourceAttributes)
438444
.setProcessorAttributes(processorAttributes))
439445
.getCode());
440446

@@ -483,25 +489,26 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
483489
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"),
484490
null);
485491

486-
final Map<String, String> extractorAttributes = new HashMap<>();
492+
final Map<String, String> sourceAttributes = new HashMap<>();
487493
final Map<String, String> processorAttributes = new HashMap<>();
488-
final Map<String, String> connectorAttributes = new HashMap<>();
494+
final Map<String, String> sinkAttributes = new HashMap<>();
489495

490-
extractorAttributes.put("extractor.realtime.mode", "batch");
496+
sourceAttributes.put("source.realtime.mode", "batch");
497+
sourceAttributes.put("user", "root");
491498

492-
connectorAttributes.put("sink", "iotdb-thrift-sink");
493-
connectorAttributes.put("sink.batch.enable", "false");
494-
connectorAttributes.put("sink.ip", receiverIp);
495-
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
496-
connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
497-
connectorAttributes.put("sink.tsfile.validation", "false");
499+
sinkAttributes.put("sink", "iotdb-thrift-sink");
500+
sinkAttributes.put("sink.batch.enable", "false");
501+
sinkAttributes.put("sink.ip", receiverIp);
502+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
503+
sinkAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
504+
sinkAttributes.put("sink.tsfile.validation", "false");
498505

499506
Assert.assertEquals(
500507
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
501508
client
502509
.createPipe(
503-
new TCreatePipeReq("testPipe", connectorAttributes)
504-
.setExtractorAttributes(extractorAttributes)
510+
new TCreatePipeReq("testPipe", sinkAttributes)
511+
.setExtractorAttributes(sourceAttributes)
505512
.setProcessorAttributes(processorAttributes))
506513
.getCode());
507514

0 commit comments

Comments
 (0)