@@ -215,7 +215,7 @@ public void testIoTDBPatternWithLegacySyntax() throws Exception {
215215 //////////////////////////// Multiple & Exclusion ////////////////////////////
216216
217217 private void testPipeWithMultiplePatterns (
218- final Map <String , String > extractorAttributes ,
218+ final Map <String , String > sourceAttributes ,
219219 final List <String > insertQueries ,
220220 final boolean isHistorical ,
221221 final String validationSelectQuery ,
@@ -249,7 +249,7 @@ private void testPipeWithMultiplePatterns(
249249 final TSStatus createStatus =
250250 client .createPipe (
251251 new TCreatePipeReq ("p1" , connectorAttributes )
252- .setExtractorAttributes (extractorAttributes )
252+ .setExtractorAttributes (sourceAttributes )
253253 .setProcessorAttributes (processorAttributes ));
254254 Assert .assertEquals (TSStatusCode .SUCCESS_STATUS .getStatusCode (), createStatus .getCode ());
255255
@@ -271,10 +271,11 @@ private void testPipeWithMultiplePatterns(
271271
272272 @ Test
273273 public void testMultiplePrefixPatternHistoricalData () throws Exception {
274- // Define extractor attributes
275- final Map <String , String > extractorAttributes = new HashMap <>();
276- extractorAttributes .put ("extractor.pattern" , "root.db.d1.s, root.db2.d1.s" );
277- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
274+ // Define source attributes
275+ final Map <String , String > sourceAttributes = new HashMap <>();
276+ sourceAttributes .put ("source.pattern" , "root.db.d1.s, root.db2.d1.s" );
277+ sourceAttributes .put ("source.inclusion" , "data.insert" );
278+ sourceAttributes .put ("user" , "root" );
278279
279280 // Define data to be inserted
280281 final List <String > insertQueries =
@@ -290,7 +291,7 @@ public void testMultiplePrefixPatternHistoricalData() throws Exception {
290291
291292 // Execute the common test logic
292293 testPipeWithMultiplePatterns (
293- extractorAttributes ,
294+ sourceAttributes ,
294295 insertQueries ,
295296 true , // isHistorical = true
296297 "select * from root.db2.**,root.db.**" ,
@@ -300,9 +301,10 @@ public void testMultiplePrefixPatternHistoricalData() throws Exception {
300301
301302 @ Test
302303 public void testMultiplePrefixPatternRealtimeData () throws Exception {
303- final Map <String , String > extractorAttributes = new HashMap <>();
304- extractorAttributes .put ("extractor.pattern" , "root.db.d1.s, root.db2.d1.s" );
305- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
304+ final Map <String , String > sourceAttributes = new HashMap <>();
305+ sourceAttributes .put ("source.pattern" , "root.db.d1.s, root.db2.d1.s" );
306+ sourceAttributes .put ("source.inclusion" , "data.insert" );
307+ sourceAttributes .put ("user" , "root" );
306308
307309 final List <String > insertQueries =
308310 Arrays .asList (
@@ -315,7 +317,7 @@ public void testMultiplePrefixPatternRealtimeData() throws Exception {
315317 expectedResSet .add ("3,3.0,null,null," );
316318
317319 testPipeWithMultiplePatterns (
318- extractorAttributes ,
320+ sourceAttributes ,
319321 insertQueries ,
320322 false , // isHistorical = false
321323 "select * from root.db2.**,root.db.**" ,
@@ -325,9 +327,10 @@ public void testMultiplePrefixPatternRealtimeData() throws Exception {
325327
326328 @ Test
327329 public void testMultipleIoTDBPatternHistoricalData () throws Exception {
328- final Map <String , String > extractorAttributes = new HashMap <>();
329- extractorAttributes .put ("extractor.path" , "root.db.**, root.db2.d1.*" );
330- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
330+ final Map <String , String > sourceAttributes = new HashMap <>();
331+ sourceAttributes .put ("source.path" , "root.db.**, root.db2.d1.*" );
332+ sourceAttributes .put ("source.inclusion" , "data.insert" );
333+ sourceAttributes .put ("user" , "root" );
331334
332335 final List <String > insertQueries =
333336 Arrays .asList (
@@ -342,7 +345,7 @@ public void testMultipleIoTDBPatternHistoricalData() throws Exception {
342345 expectedResSet .add ("3,3.0,3.0,null,null,null," );
343346
344347 testPipeWithMultiplePatterns (
345- extractorAttributes ,
348+ sourceAttributes ,
346349 insertQueries ,
347350 true , // isHistorical = true
348351 "select * from root.db2.**,root.db.**" ,
@@ -352,9 +355,10 @@ public void testMultipleIoTDBPatternHistoricalData() throws Exception {
352355
353356 @ Test
354357 public void testMultipleIoTDBPatternRealtimeData () throws Exception {
355- final Map <String , String > extractorAttributes = new HashMap <>();
356- extractorAttributes .put ("extractor.path" , "root.db.**, root.db2.d1.*" );
357- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
358+ final Map <String , String > sourceAttributes = new HashMap <>();
359+ sourceAttributes .put ("source.path" , "root.db.**, root.db2.d1.*" );
360+ sourceAttributes .put ("source.inclusion" , "data.insert" );
361+ sourceAttributes .put ("user" , "root" );
358362
359363 final List <String > insertQueries =
360364 Arrays .asList (
@@ -369,7 +373,7 @@ public void testMultipleIoTDBPatternRealtimeData() throws Exception {
369373 expectedResSet .add ("3,3.0,3.0,null,null,null," );
370374
371375 testPipeWithMultiplePatterns (
372- extractorAttributes ,
376+ sourceAttributes ,
373377 insertQueries ,
374378 false , // isHistorical = false
375379 "select * from root.db2.**,root.db.**" ,
@@ -379,10 +383,11 @@ public void testMultipleIoTDBPatternRealtimeData() throws Exception {
379383
380384 @ Test
381385 public void testMultipleHybridPatternHistoricalData () throws Exception {
382- final Map <String , String > extractorAttributes = new HashMap <>();
383- extractorAttributes .put ("extractor.path" , "root.db.d1.*" );
384- extractorAttributes .put ("extractor.pattern" , "root.db2.d1.s" );
385- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
386+ final Map <String , String > sourceAttributes = new HashMap <>();
387+ sourceAttributes .put ("source.path" , "root.db.d1.*" );
388+ sourceAttributes .put ("source.pattern" , "root.db2.d1.s" );
389+ sourceAttributes .put ("source.inclusion" , "data.insert" );
390+ sourceAttributes .put ("user" , "root" );
386391
387392 final List <String > insertQueries =
388393 Arrays .asList (
@@ -395,7 +400,7 @@ public void testMultipleHybridPatternHistoricalData() throws Exception {
395400 expectedResSet .add ("2,null,null,2.0," );
396401
397402 testPipeWithMultiplePatterns (
398- extractorAttributes ,
403+ sourceAttributes ,
399404 insertQueries ,
400405 true , // isHistorical = true
401406 "select * from root.db.**,root.db2.**" ,
@@ -405,10 +410,11 @@ public void testMultipleHybridPatternHistoricalData() throws Exception {
405410
406411 @ Test
407412 public void testMultipleHybridPatternRealtimeData () throws Exception {
408- final Map <String , String > extractorAttributes = new HashMap <>();
409- extractorAttributes .put ("extractor.path" , "root.db.d1.*" );
410- extractorAttributes .put ("extractor.pattern" , "root.db2.d1.s" );
411- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
413+ final Map <String , String > sourceAttributes = new HashMap <>();
414+ sourceAttributes .put ("source.path" , "root.db.d1.*" );
415+ sourceAttributes .put ("source.pattern" , "root.db2.d1.s" );
416+ sourceAttributes .put ("source.inclusion" , "data.insert" );
417+ sourceAttributes .put ("user" , "root" );
412418
413419 final List <String > insertQueries =
414420 Arrays .asList (
@@ -421,7 +427,7 @@ public void testMultipleHybridPatternRealtimeData() throws Exception {
421427 expectedResSet .add ("2,null,null,2.0," );
422428
423429 testPipeWithMultiplePatterns (
424- extractorAttributes ,
430+ sourceAttributes ,
425431 insertQueries ,
426432 false , // isHistorical = false
427433 "select * from root.db.**,root.db2.**" ,
@@ -431,12 +437,13 @@ public void testMultipleHybridPatternRealtimeData() throws Exception {
431437
432438 @ Test
433439 public void testPrefixPatternWithExclusionHistoricalData () throws Exception {
434- final Map <String , String > extractorAttributes = new HashMap <>();
440+ final Map <String , String > sourceAttributes = new HashMap <>();
435441 // Inclusion: Match everything under root.db.d1 and root.db.d2
436- extractorAttributes .put ("extractor .pattern" , "root.db.d1, root.db.d2" );
442+ sourceAttributes .put ("source .pattern" , "root.db.d1, root.db.d2" );
437443 // Exclusion: Exclude anything with the prefix root.db.d1.s1
438- extractorAttributes .put ("extractor.pattern.exclusion" , "root.db.d1.s1" );
439- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
444+ sourceAttributes .put ("source.pattern.exclusion" , "root.db.d1.s1" );
445+ sourceAttributes .put ("source.inclusion" , "data.insert" );
446+ sourceAttributes .put ("user" , "root" );
440447
441448 final List <String > insertQueries =
442449 Arrays .asList (
@@ -451,7 +458,7 @@ public void testPrefixPatternWithExclusionHistoricalData() throws Exception {
451458 expectedResSet .add ("2,null,2.0," );
452459
453460 testPipeWithMultiplePatterns (
454- extractorAttributes ,
461+ sourceAttributes ,
455462 insertQueries ,
456463 true , // isHistorical = true
457464 "select * from root.db.**" ,
@@ -461,10 +468,11 @@ public void testPrefixPatternWithExclusionHistoricalData() throws Exception {
461468
462469 @ Test
463470 public void testPrefixPatternWithExclusionRealtimeData () throws Exception {
464- final Map <String , String > extractorAttributes = new HashMap <>();
465- extractorAttributes .put ("extractor.pattern" , "root.db.d1, root.db.d2" );
466- extractorAttributes .put ("extractor.pattern.exclusion" , "root.db.d1.s1" );
467- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
471+ final Map <String , String > sourceAttributes = new HashMap <>();
472+ sourceAttributes .put ("source.pattern" , "root.db.d1, root.db.d2" );
473+ sourceAttributes .put ("source.pattern.exclusion" , "root.db.d1.s1" );
474+ sourceAttributes .put ("source.inclusion" , "data.insert" );
475+ sourceAttributes .put ("user" , "root" );
468476
469477 final List <String > insertQueries =
470478 Arrays .asList (
@@ -477,7 +485,7 @@ public void testPrefixPatternWithExclusionRealtimeData() throws Exception {
477485 expectedResSet .add ("2,null,2.0," );
478486
479487 testPipeWithMultiplePatterns (
480- extractorAttributes ,
488+ sourceAttributes ,
481489 insertQueries ,
482490 false , // isHistorical = false
483491 "select * from root.db.**" ,
@@ -487,12 +495,13 @@ public void testPrefixPatternWithExclusionRealtimeData() throws Exception {
487495
488496 @ Test
489497 public void testIoTDBPatternWithExclusionHistoricalData () throws Exception {
490- final Map <String , String > extractorAttributes = new HashMap <>();
498+ final Map <String , String > sourceAttributes = new HashMap <>();
491499 // Inclusion: Match everything under root.db
492- extractorAttributes .put ("extractor .path" , "root.db.**" );
500+ sourceAttributes .put ("source .path" , "root.db.**" );
493501 // Exclusion: Exclude root.db.d1.s* and root.db.d3.*
494- extractorAttributes .put ("extractor.path.exclusion" , "root.db.d1.s*, root.db.d3.*" );
495- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
502+ sourceAttributes .put ("source.path.exclusion" , "root.db.d1.s*, root.db.d3.*" );
503+ sourceAttributes .put ("source.inclusion" , "data.insert" );
504+ sourceAttributes .put ("user" , "root" );
496505
497506 final List <String > insertQueries =
498507 Arrays .asList (
@@ -509,7 +518,7 @@ public void testIoTDBPatternWithExclusionHistoricalData() throws Exception {
509518 expectedResSet .add ("2,null,2.0," );
510519
511520 testPipeWithMultiplePatterns (
512- extractorAttributes ,
521+ sourceAttributes ,
513522 insertQueries ,
514523 true , // isHistorical = true
515524 "select * from root.db.**" ,
@@ -519,10 +528,11 @@ public void testIoTDBPatternWithExclusionHistoricalData() throws Exception {
519528
520529 @ Test
521530 public void testIoTDBPatternWithExclusionRealtimeData () throws Exception {
522- final Map <String , String > extractorAttributes = new HashMap <>();
523- extractorAttributes .put ("extractor.path" , "root.db.**" );
524- extractorAttributes .put ("extractor.path.exclusion" , "root.db.d1.s*, root.db.d3.*" );
525- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
531+ final Map <String , String > sourceAttributes = new HashMap <>();
532+ sourceAttributes .put ("source.path" , "root.db.**" );
533+ sourceAttributes .put ("source.path.exclusion" , "root.db.d1.s*, root.db.d3.*" );
534+ sourceAttributes .put ("source.inclusion" , "data.insert" );
535+ sourceAttributes .put ("user" , "root" );
526536
527537 final List <String > insertQueries =
528538 Arrays .asList (
@@ -536,7 +546,7 @@ public void testIoTDBPatternWithExclusionRealtimeData() throws Exception {
536546 expectedResSet .add ("2,null,2.0," );
537547
538548 testPipeWithMultiplePatterns (
539- extractorAttributes ,
549+ sourceAttributes ,
540550 insertQueries ,
541551 false , // isHistorical = false
542552 "select * from root.db.**" ,
@@ -546,14 +556,15 @@ public void testIoTDBPatternWithExclusionRealtimeData() throws Exception {
546556
547557 @ Test
548558 public void testHybridPatternWithHybridExclusionHistoricalData () throws Exception {
549- final Map <String , String > extractorAttributes = new HashMap <>();
559+ final Map <String , String > sourceAttributes = new HashMap <>();
550560 // Inclusion: Match root.db.** (IoTDB) AND root.db2.d1 (Prefix)
551- extractorAttributes .put ("extractor .path" , "root.db.**" );
552- extractorAttributes .put ("extractor .pattern" , "root.db2.d1" );
561+ sourceAttributes .put ("source .path" , "root.db.**" );
562+ sourceAttributes .put ("source .pattern" , "root.db2.d1" );
553563 // Exclusion: Exclude root.db.d1.* (IoTDB) AND root.db2.d1.s (Prefix)
554- extractorAttributes .put ("extractor.path.exclusion" , "root.db.d1.*" );
555- extractorAttributes .put ("extractor.pattern.exclusion" , "root.db2.d1.s" );
556- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
564+ sourceAttributes .put ("source.path.exclusion" , "root.db.d1.*" );
565+ sourceAttributes .put ("source.pattern.exclusion" , "root.db2.d1.s" );
566+ sourceAttributes .put ("source.inclusion" , "data.insert" );
567+ sourceAttributes .put ("user" , "root" );
557568
558569 final List <String > insertQueries =
559570 Arrays .asList (
@@ -570,7 +581,7 @@ public void testHybridPatternWithHybridExclusionHistoricalData() throws Exceptio
570581 expectedResSet .add ("3,null,3.0," );
571582
572583 testPipeWithMultiplePatterns (
573- extractorAttributes ,
584+ sourceAttributes ,
574585 insertQueries ,
575586 true , // isHistorical = true
576587 "select * from root.db.**,root.db2.**" ,
@@ -580,12 +591,13 @@ public void testHybridPatternWithHybridExclusionHistoricalData() throws Exceptio
580591
581592 @ Test
582593 public void testHybridPatternWithHybridExclusionRealtimeData () throws Exception {
583- final Map <String , String > extractorAttributes = new HashMap <>();
584- extractorAttributes .put ("extractor.path" , "root.db.**" );
585- extractorAttributes .put ("extractor.pattern" , "root.db2.d1" );
586- extractorAttributes .put ("extractor.path.exclusion" , "root.db.d1.*" );
587- extractorAttributes .put ("extractor.pattern.exclusion" , "root.db2.d1.s" );
588- extractorAttributes .put ("extractor.inclusion" , "data.insert" );
594+ final Map <String , String > sourceAttributes = new HashMap <>();
595+ sourceAttributes .put ("source.path" , "root.db.**" );
596+ sourceAttributes .put ("source.pattern" , "root.db2.d1" );
597+ sourceAttributes .put ("source.path.exclusion" , "root.db.d1.*" );
598+ sourceAttributes .put ("source.pattern.exclusion" , "root.db2.d1.s" );
599+ sourceAttributes .put ("source.inclusion" , "data.insert" );
600+ sourceAttributes .put ("user" , "root" );
589601
590602 final List <String > insertQueries =
591603 Arrays .asList (
@@ -599,7 +611,7 @@ public void testHybridPatternWithHybridExclusionRealtimeData() throws Exception
599611 expectedResSet .add ("3,null,3.0," );
600612
601613 testPipeWithMultiplePatterns (
602- extractorAttributes ,
614+ sourceAttributes ,
603615 insertQueries ,
604616 false , // isHistorical = false
605617 "select * from root.db.**,root.db2.**" ,
0 commit comments