Skip to content

Commit 2bdbd41

Browse files
authored
Pipe: Implemented the processTabletWithCollect api with tablet collector (#16930)
1 parent 16965de commit 2bdbd41

File tree

12 files changed

+297
-44
lines changed

12 files changed

+297
-44
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.collector;
21+
22+
import org.apache.iotdb.pipe.api.access.Row;
23+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
24+
25+
import java.util.List;
26+
27+
/** Transform data to {@link TabletInsertionEvent}. */
28+
public interface DataCollector {
29+
30+
/**
31+
* Transform data to {@link TabletInsertionEvent}.
32+
*
33+
* @param shouldReport Whether to report progress for generated events
34+
* @see Row
35+
*/
36+
List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport);
37+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.collector;
21+
22+
import org.apache.iotdb.pipe.api.access.Row;
23+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
24+
25+
import org.apache.tsfile.write.record.Tablet;
26+
27+
import java.io.IOException;
28+
import java.util.function.BiConsumer;
29+
30+
/**
31+
* Used to collect rows generated by {@link
32+
* TabletInsertionEvent#processTabletWithCollect(BiConsumer)}.
33+
*/
34+
public interface TabletCollector {
35+
36+
/**
37+
* Collects a tablet.
38+
*
39+
* @param tablet Tablet to be collected
40+
* @throws IOException if any I/O errors occur
41+
* @see Row
42+
*/
43+
void collectTablet(Tablet tablet) throws IOException;
44+
}

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.pipe.api.access.Row;
2323
import org.apache.iotdb.pipe.api.collector.RowCollector;
24+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
2425
import org.apache.iotdb.pipe.api.event.Event;
2526

2627
import org.apache.tsfile.write.record.Tablet;
@@ -45,4 +46,14 @@ public interface TabletInsertionEvent extends Event {
4546
* contains the results collected by the {@link RowCollector}
4647
*/
4748
Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer);
49+
50+
/**
51+
* The consumer processes the Tablet directly and collects the results by {@link
52+
* org.apache.iotdb.pipe.api.collector.TabletCollector}.
53+
*
54+
* @return {@code Iterable<TabletInsertionEvent>} a list of new {@link TabletInsertionEvent}
55+
* contains the results collected by the {@link TabletCollector}
56+
*/
57+
Iterable<TabletInsertionEvent> processTabletWithCollect(
58+
BiConsumer<Tablet, TabletCollector> consumer);
4859
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2323
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2424
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
25+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
2526
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2627
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2728
import org.apache.iotdb.pipe.api.access.Row;
@@ -38,38 +39,19 @@
3839
import java.util.Arrays;
3940
import java.util.List;
4041

41-
public class PipeRowCollector implements RowCollector {
42-
43-
private final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>();
42+
public class PipeRowCollector extends PipeRawTabletEventConverter implements RowCollector {
4443
private Tablet tablet = null;
45-
private boolean isAligned = false;
46-
private final PipeTaskMeta pipeTaskMeta; // Used to report progress
47-
private final EnrichedEvent sourceEvent; // Used to report progress
48-
private final String sourceEventDataBaseName;
49-
private final Boolean isTableModel;
5044

5145
public PipeRowCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
52-
this.pipeTaskMeta = pipeTaskMeta;
53-
this.sourceEvent = sourceEvent;
54-
if (sourceEvent instanceof PipeInsertionEvent) {
55-
sourceEventDataBaseName =
56-
((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion();
57-
isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent();
58-
} else {
59-
sourceEventDataBaseName = null;
60-
isTableModel = null;
61-
}
46+
super(pipeTaskMeta, sourceEvent);
6247
}
6348

6449
public PipeRowCollector(
6550
PipeTaskMeta pipeTaskMeta,
6651
EnrichedEvent sourceEvent,
6752
String sourceEventDataBase,
6853
Boolean isTableModel) {
69-
this.pipeTaskMeta = pipeTaskMeta;
70-
this.sourceEvent = sourceEvent;
71-
this.sourceEventDataBaseName = sourceEventDataBase;
72-
this.isTableModel = isTableModel;
54+
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
7355
}
7456

7557
@Override
@@ -142,14 +124,9 @@ private void collectTabletInsertionEvent() {
142124
this.tablet = null;
143125
}
144126

127+
@Override
145128
public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) {
146129
collectTabletInsertionEvent();
147-
148-
final int eventListSize = tabletInsertionEventList.size();
149-
if (eventListSize > 0 && shouldReport) { // The last event should report progress
150-
((PipeRawTabletInsertionEvent) tabletInsertionEventList.get(eventListSize - 1))
151-
.markAsNeedToReport();
152-
}
153-
return tabletInsertionEventList;
130+
return super.convertToTabletInsertionEvents(shouldReport);
154131
}
155132
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
5353
import org.apache.iotdb.pipe.api.access.Row;
5454
import org.apache.iotdb.pipe.api.collector.RowCollector;
55+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
5556
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
5657
import org.apache.iotdb.pipe.api.exception.PipeException;
5758

@@ -425,6 +426,17 @@ public Iterable<TabletInsertionEvent> processTablet(
425426
.collect(Collectors.toList());
426427
}
427428

429+
@Override
430+
public Iterable<TabletInsertionEvent> processTabletWithCollect(
431+
BiConsumer<Tablet, TabletCollector> consumer) {
432+
return initEventParsers().stream()
433+
.map(
434+
tabletInsertionEventParser ->
435+
tabletInsertionEventParser.processTabletWithCollect(consumer))
436+
.flatMap(Collection::stream)
437+
.collect(Collectors.toList());
438+
}
439+
428440
/////////////////////////// convertToTablet ///////////////////////////
429441

430442
public boolean isAligned(final int i) {
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
23+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
25+
import org.apache.iotdb.pipe.api.collector.DataCollector;
26+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
public abstract class PipeRawTabletEventConverter implements DataCollector {
32+
protected final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>();
33+
protected boolean isAligned = false;
34+
protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
35+
protected final EnrichedEvent sourceEvent; // Used to report progress
36+
protected final String sourceEventDataBaseName;
37+
protected final Boolean isTableModel;
38+
39+
public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
40+
this.pipeTaskMeta = pipeTaskMeta;
41+
this.sourceEvent = sourceEvent;
42+
if (sourceEvent instanceof PipeInsertionEvent) {
43+
sourceEventDataBaseName =
44+
((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion();
45+
isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent();
46+
} else {
47+
sourceEventDataBaseName = null;
48+
isTableModel = null;
49+
}
50+
}
51+
52+
public PipeRawTabletEventConverter(
53+
PipeTaskMeta pipeTaskMeta,
54+
EnrichedEvent sourceEvent,
55+
String sourceEventDataBase,
56+
Boolean isTableModel) {
57+
this.pipeTaskMeta = pipeTaskMeta;
58+
this.sourceEvent = sourceEvent;
59+
this.sourceEventDataBaseName = sourceEventDataBase;
60+
this.isTableModel = isTableModel;
61+
}
62+
63+
@Override
64+
public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) {
65+
final int eventListSize = tabletInsertionEventList.size();
66+
if (eventListSize > 0 && shouldReport) { // The last event should report progress
67+
((PipeRawTabletInsertionEvent) tabletInsertionEventList.get(eventListSize - 1))
68+
.markAsNeedToReport();
69+
}
70+
return tabletInsertionEventList;
71+
}
72+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
4040
import org.apache.iotdb.pipe.api.access.Row;
4141
import org.apache.iotdb.pipe.api.collector.RowCollector;
42+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
4243
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4344

4445
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -415,6 +416,12 @@ public Iterable<TabletInsertionEvent> processTablet(
415416
return initEventParser().processTablet(consumer);
416417
}
417418

419+
@Override
420+
public Iterable<TabletInsertionEvent> processTabletWithCollect(
421+
BiConsumer<Tablet, TabletCollector> consumer) {
422+
return initEventParser().processTabletWithCollect(consumer);
423+
}
424+
418425
/////////////////////////// convertToTablet ///////////////////////////
419426

420427
public boolean isAligned() {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
23+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
25+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
26+
27+
import org.apache.tsfile.write.record.Tablet;
28+
29+
public class PipeTabletCollector extends PipeRawTabletEventConverter implements TabletCollector {
30+
31+
public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
32+
super(pipeTaskMeta, sourceEvent);
33+
}
34+
35+
public PipeTabletCollector(
36+
PipeTaskMeta pipeTaskMeta,
37+
EnrichedEvent sourceEvent,
38+
String sourceEventDataBase,
39+
Boolean isTableModel) {
40+
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
41+
}
42+
43+
@Override
44+
public void collectTablet(final Tablet tablet) {
45+
final PipeInsertionEvent pipeInsertionEvent =
46+
sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) sourceEvent) : null;
47+
tabletInsertionEventList.add(
48+
new PipeRawTabletInsertionEvent(
49+
isTableModel,
50+
sourceEventDataBaseName,
51+
pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTableModelDataBase(),
52+
pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTreeModelDataBase(),
53+
tablet,
54+
isAligned,
55+
sourceEvent == null ? null : sourceEvent.getPipeName(),
56+
sourceEvent == null ? 0 : sourceEvent.getCreationTime(),
57+
pipeTaskMeta,
58+
sourceEvent,
59+
false));
60+
}
61+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
2727
import org.apache.iotdb.pipe.api.access.Row;
2828
import org.apache.iotdb.pipe.api.collector.RowCollector;
29+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
2930
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3031

3132
import org.apache.tsfile.enums.ColumnCategory;
@@ -636,5 +637,8 @@ public abstract List<TabletInsertionEvent> processRowByRow(
636637
public abstract List<TabletInsertionEvent> processTablet(
637638
final BiConsumer<Tablet, RowCollector> consumer);
638639

640+
public abstract List<TabletInsertionEvent> processTabletWithCollect(
641+
final BiConsumer<Tablet, TabletCollector> consumer);
642+
639643
public abstract Tablet convertToTablet();
640644
}

0 commit comments

Comments
 (0)