2424import org .apache .iotdb .db .pipe .event .common .PipeInsertionEvent ;
2525import org .apache .iotdb .db .pipe .event .common .tablet .PipeRawTabletInsertionEvent ;
2626import org .apache .iotdb .db .pipe .event .common .tsfile .parser .TsFileInsertionEventParser ;
27+ import org .apache .iotdb .db .pipe .resource .PipeDataNodeResourceManager ;
28+ import org .apache .iotdb .db .pipe .resource .memory .PipeMemoryBlock ;
2729import org .apache .iotdb .pipe .api .event .dml .insertion .TabletInsertionEvent ;
2830import org .apache .iotdb .pipe .api .exception .PipeException ;
2931
30- import org .apache .tsfile .file .metadata .TableSchema ;
3132import org .apache .tsfile .read .TsFileSequenceReader ;
32- import org .apache .tsfile .read .controller .CachedChunkLoaderImpl ;
33- import org .apache .tsfile .read .controller .MetadataQuerierByFileImpl ;
34- import org .apache .tsfile .read .query .executor .TableQueryExecutor ;
3533import org .apache .tsfile .write .record .Tablet ;
3634
3735import java .io .File ;
3836import java .io .IOException ;
3937import java .util .Iterator ;
40- import java .util .Map ;
4138import java .util .NoSuchElementException ;
4239import java .util .Objects ;
4340
4441public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser {
4542
46- private final TableQueryExecutor tableQueryExecutor ;
43+ private final long startTime ;
44+ private final long endTime ;
45+ private final TablePattern tablePattern ;
4746
48- private final Iterator <Map .Entry <String , TableSchema >> filteredTableSchemaIterator ;
47+ private final PipeMemoryBlock allocatedMemoryBlockForBatchData ;
48+ private final PipeMemoryBlock allocatedMemoryBlockForChunk ;
49+ private final PipeMemoryBlock allocatedMemoryBlockForChunkMeta ;
50+ private final PipeMemoryBlock allocatedMemoryBlockForTableSchemas ;
4951
5052 public TsFileInsertionEventTableParser (
5153 final File tsFile ,
@@ -58,16 +60,20 @@ public TsFileInsertionEventTableParser(
5860 super (null , pattern , startTime , endTime , pipeTaskMeta , sourceEvent );
5961
6062 try {
63+ this .allocatedMemoryBlockForChunk =
64+ PipeDataNodeResourceManager .memory ().forceAllocateForTabletWithRetry (0 );
65+ this .allocatedMemoryBlockForBatchData =
66+ PipeDataNodeResourceManager .memory ().forceAllocateForTabletWithRetry (0 );
67+ this .allocatedMemoryBlockForChunkMeta =
68+ PipeDataNodeResourceManager .memory ().forceAllocateForTabletWithRetry (0 );
69+ this .allocatedMemoryBlockForTableSchemas =
70+ PipeDataNodeResourceManager .memory ().forceAllocateForTabletWithRetry (0 );
71+
72+ this .startTime = startTime ;
73+ this .endTime = endTime ;
74+ this .tablePattern = pattern ;
75+
6176 tsFileSequenceReader = new TsFileSequenceReader (tsFile .getPath (), true , true );
62- filteredTableSchemaIterator =
63- tsFileSequenceReader .getTableSchemaMap ().entrySet ().stream ()
64- .filter (entry -> Objects .isNull (pattern ) || pattern .matchesTable (entry .getKey ()))
65- .iterator ();
66- tableQueryExecutor =
67- new TableQueryExecutor (
68- new MetadataQuerierByFileImpl (tsFileSequenceReader ),
69- new CachedChunkLoaderImpl (tsFileSequenceReader ),
70- TableQueryExecutor .TableQueryOrdering .DEVICE );
7177 } catch (final Exception e ) {
7278 close ();
7379 throw e ;
@@ -79,29 +85,35 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
7985 return () ->
8086 new Iterator <TabletInsertionEvent >() {
8187
82- private TsFileInsertionEventTableParserTabletIterator tabletIterator = null ;
88+ private TsFileInsertionEventTableParserTabletIterator tabletIterator ;
8389
8490 @ Override
8591 public boolean hasNext () {
86- while (tabletIterator == null || !tabletIterator .hasNext ()) {
87- if (!filteredTableSchemaIterator .hasNext ()) {
88- close ();
89- return false ;
90- }
91-
92- final Map .Entry <String , TableSchema > entry = filteredTableSchemaIterator .next ();
93-
94- try {
92+ try {
93+ if (tabletIterator == null ) {
9594 tabletIterator =
9695 new TsFileInsertionEventTableParserTabletIterator (
97- tableQueryExecutor , entry .getKey (), entry .getValue (), startTime , endTime );
98- } catch (final Exception e ) {
96+ tsFileSequenceReader ,
97+ entry ->
98+ Objects .isNull (tablePattern )
99+ || tablePattern .matchesTable (entry .getKey ()),
100+ allocatedMemoryBlockForTablet ,
101+ allocatedMemoryBlockForBatchData ,
102+ allocatedMemoryBlockForChunk ,
103+ allocatedMemoryBlockForChunkMeta ,
104+ allocatedMemoryBlockForTableSchemas ,
105+ startTime ,
106+ endTime );
107+ }
108+ if (!tabletIterator .hasNext ()) {
99109 close ();
100- throw new PipeException ( "failed to create TsFileInsertionDataTabletIterator" , e ) ;
110+ return false ;
101111 }
112+ return true ;
113+ } catch (Exception e ) {
114+ close ();
115+ throw new PipeException ("Error while parsing tsfile insertion event" , e );
102116 }
103-
104- return true ;
105117 }
106118
107119 @ Override
@@ -174,4 +186,25 @@ public TabletInsertionEvent next() {
174186 }
175187 };
176188 }
189+
190+ @ Override
191+ public void close () {
192+ super .close ();
193+
194+ if (allocatedMemoryBlockForBatchData != null ) {
195+ allocatedMemoryBlockForBatchData .close ();
196+ }
197+
198+ if (allocatedMemoryBlockForChunk != null ) {
199+ allocatedMemoryBlockForChunk .close ();
200+ }
201+
202+ if (allocatedMemoryBlockForChunkMeta != null ) {
203+ allocatedMemoryBlockForChunkMeta .close ();
204+ }
205+
206+ if (allocatedMemoryBlockForTableSchemas != null ) {
207+ allocatedMemoryBlockForTableSchemas .close ();
208+ }
209+ }
177210}
0 commit comments