Skip to content

Commit af1eb36

Browse files
committed
patch
1 parent 2c8649f commit af1eb36

File tree

5 files changed

+13
-31
lines changed

5 files changed

+13
-31
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public Iterable<TabletInsertionEvent> processTablet(
313313
@Override
314314
public Iterable<TabletInsertionEvent> processTabletWithCollect(
315315
BiConsumer<Tablet, TabletCollector> consumer) {
316-
return initEventParsers().stream()
316+
return initDataContainers().stream()
317317
.map(
318318
tabletInsertionEventParser ->
319319
tabletInsertionEventParser.processTabletWithCollect(consumer))

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2323
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24-
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
2524
import org.apache.iotdb.pipe.api.collector.DataCollector;
2625
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
2726

@@ -33,31 +32,10 @@ public abstract class PipeRawTabletEventConverter implements DataCollector {
3332
protected boolean isAligned = false;
3433
protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
3534
protected final EnrichedEvent sourceEvent; // Used to report progress
36-
protected final String sourceEventDataBaseName;
37-
protected final Boolean isTableModel;
3835

3936
public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
4037
this.pipeTaskMeta = pipeTaskMeta;
4138
this.sourceEvent = sourceEvent;
42-
if (sourceEvent instanceof PipeInsertionEvent) {
43-
sourceEventDataBaseName =
44-
((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion();
45-
isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent();
46-
} else {
47-
sourceEventDataBaseName = null;
48-
isTableModel = null;
49-
}
50-
}
51-
52-
public PipeRawTabletEventConverter(
53-
PipeTaskMeta pipeTaskMeta,
54-
EnrichedEvent sourceEvent,
55-
String sourceEventDataBase,
56-
Boolean isTableModel) {
57-
this.pipeTaskMeta = pipeTaskMeta;
58-
this.sourceEvent = sourceEvent;
59-
this.sourceEventDataBaseName = sourceEventDataBase;
60-
this.isTableModel = isTableModel;
6139
}
6240

6341
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,11 @@ public Iterable<TabletInsertionEvent> processTablet(
295295
@Override
296296
public Iterable<TabletInsertionEvent> processTabletWithCollect(
297297
BiConsumer<Tablet, TabletCollector> consumer) {
298-
return initEventParser().processTabletWithCollect(consumer);
298+
if (dataContainer == null) {
299+
dataContainer =
300+
new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern);
301+
}
302+
return dataContainer.processTabletWithCollect(consumer);
299303
}
300304

301305
/////////////////////////// convertToTablet ///////////////////////////

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,13 @@ public List<TabletInsertionEvent> processTablet(final BiConsumer<Tablet, RowColl
690690
return rowCollector.convertToTabletInsertionEvents(shouldReport);
691691
}
692692

693+
public List<TabletInsertionEvent> processTabletWithCollect(
694+
BiConsumer<Tablet, TabletCollector> consumer) {
695+
final PipeTabletCollector tabletCollector = new PipeTabletCollector(pipeTaskMeta, sourceEvent);
696+
consumer.accept(convertToTablet(), tabletCollector);
697+
return tabletCollector.convertToTabletInsertionEvents(shouldReport);
698+
}
699+
693700
//////////////////////////// convertToTablet ////////////////////////////
694701

695702
public Tablet convertToTablet() {
@@ -707,7 +714,4 @@ public Tablet convertToTablet() {
707714

708715
return tablet;
709716
}
710-
711-
public abstract List<TabletInsertionEvent> processTabletWithCollect(
712-
final BiConsumer<Tablet, TabletCollector> consumer);
713717
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,9 @@
2222
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
2323
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2424
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
25-
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2625
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
2726
import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
2827
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
29-
import org.apache.iotdb.commons.exception.IllegalPathException;
30-
import org.apache.iotdb.commons.path.PartialPath;
31-
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3228
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
3329
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3430
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;

0 commit comments

Comments
 (0)