Skip to content

Commit 425a746

Browse files
committed
may-complete-n
1 parent 9a9d6fb commit 425a746

File tree

8 files changed

+55
-3
lines changed

8 files changed

+55
-3
lines changed

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
/**
2929
* Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)},
30-
* {@link TabletInsertionEvent#processTablet(BiConsumer)}.
30+
* {@link TabletInsertionEvent#processTabletWithTabletCollection(BiConsumer)}.
3131
*/
3232
public interface RowCollector {
3333

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.pipe.api.access.Row;
2323
import org.apache.iotdb.pipe.api.collector.RowCollector;
24+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
2425
import org.apache.iotdb.pipe.api.event.Event;
2526

2627
import org.apache.tsfile.write.record.Tablet;
@@ -45,4 +46,14 @@ public interface TabletInsertionEvent extends Event {
4546
* contains the results collected by the {@link RowCollector}
4647
*/
4748
Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer);
49+
50+
/**
51+
* The consumer processes the Tablet directly and collects the results by {@link
52+
* org.apache.iotdb.pipe.api.collector.TabletCollector}.
53+
*
54+
* @return {@code Iterable<TabletInsertionEvent>} a list of new {@link TabletInsertionEvent}
55+
* contains the results collected by the {@link RowCollector}
56+
*/
57+
Iterable<TabletInsertionEvent> processTabletWithCollect(
58+
BiConsumer<Tablet, TabletCollector> consumer);
4859
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
5353
import org.apache.iotdb.pipe.api.access.Row;
5454
import org.apache.iotdb.pipe.api.collector.RowCollector;
55+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
5556
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
5657
import org.apache.iotdb.pipe.api.exception.PipeException;
5758

@@ -425,6 +426,17 @@ public Iterable<TabletInsertionEvent> processTablet(
425426
.collect(Collectors.toList());
426427
}
427428

429+
@Override
430+
public Iterable<TabletInsertionEvent> processTabletWithCollect(
431+
BiConsumer<Tablet, TabletCollector> consumer) {
432+
return initEventParsers().stream()
433+
.map(
434+
tabletInsertionEventParser ->
435+
tabletInsertionEventParser.processTabletWithCollect(consumer))
436+
.flatMap(Collection::stream)
437+
.collect(Collectors.toList());
438+
}
439+
428440
/////////////////////////// convertToTablet ///////////////////////////
429441

430442
public boolean isAligned(final int i) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
4040
import org.apache.iotdb.pipe.api.access.Row;
4141
import org.apache.iotdb.pipe.api.collector.RowCollector;
42+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
4243
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4344

4445
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -415,6 +416,12 @@ public Iterable<TabletInsertionEvent> processTablet(
415416
return initEventParser().processTablet(consumer);
416417
}
417418

419+
@Override
420+
public Iterable<TabletInsertionEvent> processTabletWithCollect(
421+
BiConsumer<Tablet, TabletCollector> consumer) {
422+
return initEventParser().processTabletWithCollect(consumer);
423+
}
424+
418425
/////////////////////////// convertToTablet ///////////////////////////
419426

420427
public boolean isAligned() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626

2727
import org.apache.tsfile.write.record.Tablet;
2828

29-
import java.io.IOException;
30-
3129
public class PipeTabletCollector extends PipeRawTabletEventConverter implements TabletCollector {
3230

3331
public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
2727
import org.apache.iotdb.pipe.api.access.Row;
2828
import org.apache.iotdb.pipe.api.collector.RowCollector;
29+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
2930
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3031

3132
import org.apache.tsfile.enums.ColumnCategory;
@@ -636,5 +637,8 @@ public abstract List<TabletInsertionEvent> processRowByRow(
636637
public abstract List<TabletInsertionEvent> processTablet(
637638
final BiConsumer<Tablet, RowCollector> consumer);
638639

640+
public abstract List<TabletInsertionEvent> processTabletWithCollect(
641+
final BiConsumer<Tablet, TabletCollector> consumer);
642+
639643
public abstract Tablet convertToTablet();
640644
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
2828
import org.apache.iotdb.pipe.api.access.Row;
2929
import org.apache.iotdb.pipe.api.collector.RowCollector;
30+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
3031
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3132

3233
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -113,6 +114,15 @@ public List<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector>
113114
return Collections.emptyList();
114115
}
115116

117+
@Override
118+
public List<TabletInsertionEvent> processTabletWithCollect(
119+
BiConsumer<Tablet, TabletCollector> consumer) {
120+
if (LOGGER.isDebugEnabled()) {
121+
LOGGER.warn("TablePatternParser does not support tablet processing");
122+
}
123+
return Collections.emptyList();
124+
}
125+
116126
//////////////////////////// convertToTablet ////////////////////////////
117127

118128
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import org.apache.iotdb.commons.utils.TestOnly;
2626
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
2727
import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
28+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletCollector;
2829
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
2930
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
3031
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
3132
import org.apache.iotdb.pipe.api.access.Row;
3233
import org.apache.iotdb.pipe.api.collector.RowCollector;
34+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
3335
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3436

3537
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -154,6 +156,14 @@ public List<TabletInsertionEvent> processTablet(final BiConsumer<Tablet, RowColl
154156
return rowCollector.convertToTabletInsertionEvents(shouldReport);
155157
}
156158

159+
@Override
160+
public List<TabletInsertionEvent> processTabletWithCollect(
161+
BiConsumer<Tablet, TabletCollector> consumer) {
162+
final PipeTabletCollector tabletCollector = new PipeTabletCollector(pipeTaskMeta, sourceEvent);
163+
consumer.accept(convertToTablet(), tabletCollector);
164+
return tabletCollector.convertToTabletInsertionEvents(shouldReport);
165+
}
166+
157167
//////////////////////////// convertToTablet ////////////////////////////
158168

159169
@Override

0 commit comments

Comments
 (0)