4949import org .apache .iotdb .pipe .api .exception .PipeException ;
5050
5151import org .apache .tsfile .file .metadata .IDeviceID ;
52- import org .apache .tsfile .file .metadata .PlainDeviceID ;
5352import org .slf4j .Logger ;
5453import org .slf4j .LoggerFactory ;
5554
6564import java .util .concurrent .atomic .AtomicLong ;
6665import java .util .concurrent .atomic .AtomicReference ;
6766
68- import static org .apache .tsfile .common .constant .TsFileConstant .PATH_ROOT ;
69- import static org .apache .tsfile .common .constant .TsFileConstant .PATH_SEPARATOR ;
70-
7167public class PipeTsFileInsertionEvent extends PipeInsertionEvent
7268 implements TsFileInsertionEvent , ReferenceTrackableEvent {
7369
7470 private static final Logger LOGGER = LoggerFactory .getLogger (PipeTsFileInsertionEvent .class );
75- private static final String TREE_MODEL_EVENT_TABLE_NAME_PREFIX = PATH_ROOT + PATH_SEPARATOR ;
7671
7772 private final TsFileResource resource ;
7873 private File tsFile ;
@@ -96,6 +91,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
9691 protected long flushPointCount = TsFileProcessor .FLUSH_POINT_COUNT_NOT_SET ;
9792
9893 protected volatile ProgressIndex overridingProgressIndex ;
94+ private Set <String > tableNames ;
9995
10096 public PipeTsFileInsertionEvent (
10197 final Boolean isTableModelEvent ,
@@ -112,6 +108,7 @@ public PipeTsFileInsertionEvent(
112108 isLoaded ,
113109 false ,
114110 null ,
111+ null ,
115112 0 ,
116113 null ,
117114 null ,
@@ -130,6 +127,7 @@ public PipeTsFileInsertionEvent(
130127 final boolean isWithMod ,
131128 final boolean isLoaded ,
132129 final boolean isGeneratedByHistoricalExtractor ,
130+ final Set <String > tableNames ,
133131 final String pipeName ,
134132 final long creationTime ,
135133 final PipeTaskMeta pipeTaskMeta ,
@@ -170,6 +168,7 @@ public PipeTsFileInsertionEvent(
170168 this .isGeneratedByPipe = resource .isGeneratedByPipe ();
171169 this .isGeneratedByPipeConsensus = resource .isGeneratedByPipeConsensus ();
172170 this .isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor ;
171+ this .tableNames = tableNames ;
173172
174173 isClosed = new AtomicBoolean (resource .isClosed ());
175174 // Register close listener if TsFile is not closed
@@ -412,6 +411,7 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
412411 isWithMod ,
413412 isLoaded ,
414413 isGeneratedByHistoricalExtractor ,
414+ tableNames ,
415415 pipeName ,
416416 creationTime ,
417417 pipeTaskMeta ,
@@ -438,23 +438,22 @@ public void throwIfNoPrivilege() {
438438 LOGGER .info ("Temporary tsFile {} detected, will skip its transfer." , tsFile );
439439 return ;
440440 }
441- for (final IDeviceID deviceID : getDeviceSet () ) {
441+ for (final String table : tableNames ) {
442442 if (!tablePattern .matchesDatabase (getTableModelDatabaseName ())
443- || !tablePattern .matchesTable (deviceID . getTableName () )) {
443+ || !tablePattern .matchesTable (table )) {
444444 continue ;
445445 }
446446 if (!Coordinator .getInstance ()
447447 .getAccessControl ()
448448 .checkCanSelectFromTable4Pipe (
449- userName ,
450- new QualifiedObjectName (getTableModelDatabaseName (), deviceID .getTableName ()))) {
449+ userName , new QualifiedObjectName (getTableModelDatabaseName (), table ))) {
451450 if (skipIfNoPrivileges ) {
452451 shouldParse4Privilege = true ;
453452 } else {
454453 throw new AccessDeniedException (
455454 String .format (
456455 "No privilege for SELECT for user %s at table %s.%s" ,
457- userName , tableModelDatabaseName , deviceID . getTableName () ));
456+ userName , tableModelDatabaseName , table ));
458457 }
459458 }
460459 }
@@ -488,29 +487,14 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
488487
489488 @ Override
490489 public boolean mayEventPathsOverlappedWithPattern () {
491- if (Objects .isNull (resource ) || !resource .isClosed ()) {
490+ if (Objects .isNull (resource ) || !resource .isClosed () || isTableModelEvent () ) {
492491 return true ;
493492 }
494493
495494 try {
496- return getDeviceSet ().stream ()
497- .anyMatch (
498- deviceID -> {
499- // Tree model
500- if (Boolean .FALSE .equals (getRawIsTableModelEvent ())
501- || deviceID instanceof PlainDeviceID
502- || deviceID .getTableName ().startsWith (TREE_MODEL_EVENT_TABLE_NAME_PREFIX )
503- || deviceID .getTableName ().equals (PATH_ROOT )) {
504- markAsTreeModelEvent ();
505- return treePattern .mayOverlapWithDevice (deviceID );
506- }
507-
508- // Table model
509- markAsTableModelEvent ();
510- return true ;
511- });
495+ return getDeviceSet ().stream ().anyMatch (treePattern ::mayOverlapWithDevice );
512496 } catch (final Exception e ) {
513- LOGGER .warn (
497+ LOGGER .info (
514498 "Pipe {}: failed to get devices from TsFile {}, extract it anyway" ,
515499 pipeName ,
516500 resource .getTsFilePath (),
@@ -526,9 +510,14 @@ private Set<IDeviceID> getDeviceSet() throws IOException {
526510 PipeTsFileResourceManager .getHardlinkOrCopiedFileInPipeDir (
527511 resource .getTsFile (), pipeName ),
528512 false );
529- return Objects .nonNull (deviceIsAlignedMap )
530- ? deviceIsAlignedMap .keySet ()
531- : resource .getDevices ();
513+ if (Objects .nonNull (deviceIsAlignedMap )) {
514+ return deviceIsAlignedMap .keySet ();
515+ }
516+ return resource .getDevices ();
517+ }
518+
519+ public void setTableNames (final Set <String > tableNames ) {
520+ this .tableNames = tableNames ;
532521 }
533522
534523 /////////////////////////// PipeInsertionEvent ///////////////////////////
@@ -539,25 +528,6 @@ public boolean isTableModelEvent() {
539528 if (getSourceDatabaseNameFromDataRegion () != null ) {
540529 return super .isTableModelEvent ();
541530 }
542-
543- try {
544- for (final IDeviceID deviceID : getDeviceSet ()) {
545- if (deviceID instanceof PlainDeviceID
546- || deviceID .getTableName ().startsWith (TREE_MODEL_EVENT_TABLE_NAME_PREFIX )
547- || deviceID .getTableName ().equals (PATH_ROOT )) {
548- markAsTreeModelEvent ();
549- } else {
550- markAsTableModelEvent ();
551- }
552- break ;
553- }
554- } catch (final Exception e ) {
555- throw new PipeException (
556- String .format (
557- "Pipe %s: failed to judge whether TsFile %s is table model or tree model" ,
558- pipeName , resource .getTsFilePath ()),
559- e );
560- }
561531 }
562532
563533 return getRawIsTableModelEvent ();
0 commit comments