Skip to content

Commit 9f8b4cc

Browse files
authored
[To dev/1.3] Pipe: Implemented the processTabletWithCollect api with tablet collector (#16941)
* Pipe: Implemented the processTabletWithCollect api with tablet collector (#16930) * patch * fix * fix * fix
1 parent 4f979dc commit 9f8b4cc

File tree

10 files changed

+245
-28
lines changed

10 files changed

+245
-28
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.collector;
21+
22+
import org.apache.iotdb.pipe.api.access.Row;
23+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
24+
25+
import java.util.List;
26+
27+
/** Transform data to {@link TabletInsertionEvent}. */
28+
public interface DataCollector {
29+
30+
/**
31+
* Transform data to {@link TabletInsertionEvent}.
32+
*
33+
* @param shouldReport Whether to report progress for generated events
34+
* @see Row
35+
*/
36+
List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport);
37+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.collector;
21+
22+
import org.apache.iotdb.pipe.api.access.Row;
23+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
24+
25+
import org.apache.tsfile.write.record.Tablet;
26+
27+
import java.io.IOException;
28+
import java.util.function.BiConsumer;
29+
30+
/**
31+
* Used to collect rows generated by {@link
32+
* TabletInsertionEvent#processTabletWithCollect(BiConsumer)}.
33+
*/
34+
public interface TabletCollector {
35+
36+
/**
37+
* Collects a tablet.
38+
*
39+
* @param tablet Tablet to be collected
40+
* @throws IOException if any I/O errors occur
41+
* @see Row
42+
*/
43+
void collectTablet(Tablet tablet) throws IOException;
44+
}

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.pipe.api.access.Row;
2323
import org.apache.iotdb.pipe.api.collector.RowCollector;
24+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
2425
import org.apache.iotdb.pipe.api.event.Event;
2526

2627
import org.apache.tsfile.write.record.Tablet;
@@ -45,4 +46,14 @@ public interface TabletInsertionEvent extends Event {
4546
* contains the results collected by the {@link RowCollector}
4647
*/
4748
Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer);
49+
50+
/**
51+
* The consumer processes the Tablet directly and collects the results by {@link
52+
* org.apache.iotdb.pipe.api.collector.TabletCollector}.
53+
*
54+
* @return {@code Iterable<TabletInsertionEvent>} a list of new {@link TabletInsertionEvent}
55+
* contains the results collected by the {@link TabletCollector}
56+
*/
57+
Iterable<TabletInsertionEvent> processTabletWithCollect(
58+
BiConsumer<Tablet, TabletCollector> consumer);
4859
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2323
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
2425
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2526
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2627
import org.apache.iotdb.pipe.api.access.Row;
@@ -36,17 +37,11 @@
3637
import java.util.Arrays;
3738
import java.util.List;
3839

39-
public class PipeRowCollector implements RowCollector {
40-
41-
private final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>();
40+
public class PipeRowCollector extends PipeRawTabletEventConverter implements RowCollector {
4241
private Tablet tablet = null;
43-
private boolean isAligned = false;
44-
private final PipeTaskMeta pipeTaskMeta; // Used to report progress
45-
private final EnrichedEvent sourceEvent; // Used to report progress
4642

4743
public PipeRowCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
48-
this.pipeTaskMeta = pipeTaskMeta;
49-
this.sourceEvent = sourceEvent;
44+
super(pipeTaskMeta, sourceEvent);
5045
}
5146

5247
@Override
@@ -113,14 +108,9 @@ private void collectTabletInsertionEvent() {
113108
this.tablet = null;
114109
}
115110

111+
@Override
116112
public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) {
117113
collectTabletInsertionEvent();
118-
119-
final int eventListSize = tabletInsertionEventList.size();
120-
if (eventListSize > 0 && shouldReport) { // The last event should report progress
121-
((PipeRawTabletInsertionEvent) tabletInsertionEventList.get(eventListSize - 1))
122-
.markAsNeedToReport();
123-
}
124-
return tabletInsertionEventList;
114+
return super.convertToTabletInsertionEvents(shouldReport);
125115
}
126116
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
4141
import org.apache.iotdb.pipe.api.access.Row;
4242
import org.apache.iotdb.pipe.api.collector.RowCollector;
43+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
4344
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4445
import org.apache.iotdb.pipe.api.exception.PipeException;
4546

@@ -309,6 +310,17 @@ public Iterable<TabletInsertionEvent> processTablet(
309310
.collect(Collectors.toList());
310311
}
311312

313+
@Override
314+
public Iterable<TabletInsertionEvent> processTabletWithCollect(
315+
BiConsumer<Tablet, TabletCollector> consumer) {
316+
return initDataContainers().stream()
317+
.map(
318+
tabletInsertionEventParser ->
319+
tabletInsertionEventParser.processTabletWithCollect(consumer))
320+
.flatMap(Collection::stream)
321+
.collect(Collectors.toList());
322+
}
323+
312324
/////////////////////////// convertToTablet ///////////////////////////
313325

314326
public boolean isAligned(final int i) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
23+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.pipe.api.collector.DataCollector;
25+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
26+
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
30+
public abstract class PipeRawTabletEventConverter implements DataCollector {
31+
protected final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>();
32+
protected boolean isAligned = false;
33+
protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
34+
protected final EnrichedEvent sourceEvent; // Used to report progress
35+
36+
public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
37+
this.pipeTaskMeta = pipeTaskMeta;
38+
this.sourceEvent = sourceEvent;
39+
}
40+
41+
@Override
42+
public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) {
43+
final int eventListSize = tabletInsertionEventList.size();
44+
if (eventListSize > 0 && shouldReport) { // The last event should report progress
45+
((PipeRawTabletInsertionEvent) tabletInsertionEventList.get(eventListSize - 1))
46+
.markAsNeedToReport();
47+
}
48+
return tabletInsertionEventList;
49+
}
50+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
3535
import org.apache.iotdb.pipe.api.access.Row;
3636
import org.apache.iotdb.pipe.api.collector.RowCollector;
37+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
3738
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3839

3940
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -291,6 +292,16 @@ public Iterable<TabletInsertionEvent> processTablet(
291292
return dataContainer.processTablet(consumer);
292293
}
293294

295+
@Override
296+
public Iterable<TabletInsertionEvent> processTabletWithCollect(
297+
BiConsumer<Tablet, TabletCollector> consumer) {
298+
if (dataContainer == null) {
299+
dataContainer =
300+
new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern);
301+
}
302+
return dataContainer.processTabletWithCollect(consumer);
303+
}
304+
294305
/////////////////////////// convertToTablet ///////////////////////////
295306

296307
public boolean isAligned() {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
23+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
25+
26+
import org.apache.tsfile.write.record.Tablet;
27+
28+
public class PipeTabletCollector extends PipeRawTabletEventConverter implements TabletCollector {
29+
30+
public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
31+
super(pipeTaskMeta, sourceEvent);
32+
}
33+
34+
@Override
35+
public void collectTablet(final Tablet tablet) {
36+
tabletInsertionEventList.add(
37+
new PipeRawTabletInsertionEvent(
38+
tablet,
39+
isAligned,
40+
sourceEvent == null ? null : sourceEvent.getPipeName(),
41+
sourceEvent == null ? 0 : sourceEvent.getCreationTime(),
42+
pipeTaskMeta,
43+
sourceEvent,
44+
false));
45+
}
46+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
3131
import org.apache.iotdb.pipe.api.access.Row;
3232
import org.apache.iotdb.pipe.api.collector.RowCollector;
33+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
3334
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3435

3536
import org.apache.tsfile.enums.TSDataType;
@@ -689,6 +690,13 @@ public List<TabletInsertionEvent> processTablet(final BiConsumer<Tablet, RowColl
689690
return rowCollector.convertToTabletInsertionEvents(shouldReport);
690691
}
691692

693+
public List<TabletInsertionEvent> processTabletWithCollect(
694+
BiConsumer<Tablet, TabletCollector> consumer) {
695+
final PipeTabletCollector tabletCollector = new PipeTabletCollector(pipeTaskMeta, sourceEvent);
696+
consumer.accept(convertToTablet(), tabletCollector);
697+
return tabletCollector.convertToTabletInsertionEvents(shouldReport);
698+
}
699+
692700
//////////////////////////// convertToTablet ////////////////////////////
693701

694702
public Tablet convertToTablet() {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
2323
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2424
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
25-
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2625
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
2726
import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
2827
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
@@ -593,18 +592,15 @@ private void testTsFilePointNum(
593592
})
594593
.forEach(
595594
tabletInsertionEvent2 ->
596-
tabletInsertionEvent2.processTablet(
597-
(tablet, rowCollector) ->
598-
new PipeRawTabletInsertionEvent(tablet, false)
599-
.processRowByRow(
600-
(row, collector) -> {
601-
try {
602-
rowCollector.collectRow(row);
603-
count3.addAndGet(getNonNullSize(row));
604-
} catch (final IOException e) {
605-
throw new RuntimeException(e);
606-
}
607-
})))));
595+
tabletInsertionEvent2.processTabletWithCollect(
596+
(tablet, collector) -> {
597+
try {
598+
collector.collectTablet(tablet);
599+
count3.addAndGet(getNonNullSize(tablet));
600+
} catch (final IOException e) {
601+
throw new RuntimeException(e);
602+
}
603+
}))));
608604

609605
Assert.assertEquals(expectedCount, count1.get());
610606
Assert.assertEquals(expectedCount, count2.get());
@@ -624,4 +620,16 @@ private int getNonNullSize(final Row row) {
624620
}
625621
return count;
626622
}
623+
624+
private int getNonNullSize(final Tablet tablet) {
625+
int count = 0;
626+
for (int i = 0; i < tablet.rowSize; ++i) {
627+
for (int j = 0; j < tablet.getSchemas().size(); ++j) {
628+
if (tablet.bitMaps == null || tablet.bitMaps[j] == null || !tablet.bitMaps[j].isMarked(i)) {
629+
++count;
630+
}
631+
}
632+
}
633+
return count;
634+
}
627635
}

0 commit comments

Comments
 (0)