Skip to content

Commit 9a9d6fb

Browse files
committed
may-comp
1 parent 3ae0390 commit 9a9d6fb

File tree

6 files changed

+122
-36
lines changed

6 files changed

+122
-36
lines changed

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/DataCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@
2424
import java.util.List;
2525

2626
public interface DataCollector {
27-
List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport);
27+
List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport);
2828
}

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222
import org.apache.iotdb.pipe.api.access.Row;
2323
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
2424

25-
import javax.xml.crypto.Data;
2625
import java.io.IOException;
2726
import java.util.function.BiConsumer;
2827

2928
/**
3029
* Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)},
3130
* {@link TabletInsertionEvent#processTablet(BiConsumer)}.
3231
*/
33-
public interface RowCollector extends DataCollector {
32+
public interface RowCollector {
3433

3534
/**
3635
* Collects a row.

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/TabletCollector.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@
1919

2020
package org.apache.iotdb.pipe.api.collector;
2121

22-
import org.apache.iotdb.pipe.api.access.Row;
2322
import org.apache.tsfile.write.record.Tablet;
2423

2524
import java.io.IOException;
2625

27-
public interface TabletCollector extends DataCollector {
28-
void collectTablet(Tablet tablet) throws IOException;
26+
public interface TabletCollector {
27+
void collectTablet(Tablet tablet) throws IOException;
2928
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2323
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2424
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
25+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
2526
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2627
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2728
import org.apache.iotdb.pipe.api.access.Row;
@@ -38,38 +39,19 @@
3839
import java.util.Arrays;
3940
import java.util.List;
4041

41-
public class PipeRowCollector implements RowCollector {
42-
43-
private final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>();
42+
public class PipeRowCollector extends PipeRawTabletEventConverter implements RowCollector {
4443
private Tablet tablet = null;
45-
private boolean isAligned = false;
46-
private final PipeTaskMeta pipeTaskMeta; // Used to report progress
47-
private final EnrichedEvent sourceEvent; // Used to report progress
48-
private final String sourceEventDataBaseName;
49-
private final Boolean isTableModel;
5044

5145
public PipeRowCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
52-
this.pipeTaskMeta = pipeTaskMeta;
53-
this.sourceEvent = sourceEvent;
54-
if (sourceEvent instanceof PipeInsertionEvent) {
55-
sourceEventDataBaseName =
56-
((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion();
57-
isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent();
58-
} else {
59-
sourceEventDataBaseName = null;
60-
isTableModel = null;
61-
}
46+
super(pipeTaskMeta, sourceEvent);
6247
}
6348

6449
public PipeRowCollector(
6550
PipeTaskMeta pipeTaskMeta,
6651
EnrichedEvent sourceEvent,
6752
String sourceEventDataBase,
6853
Boolean isTableModel) {
69-
this.pipeTaskMeta = pipeTaskMeta;
70-
this.sourceEvent = sourceEvent;
71-
this.sourceEventDataBaseName = sourceEventDataBase;
72-
this.isTableModel = isTableModel;
54+
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
7355
}
7456

7557
@Override
@@ -145,12 +127,6 @@ private void collectTabletInsertionEvent() {
145127
@Override
146128
public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) {
147129
collectTabletInsertionEvent();
148-
149-
final int eventListSize = tabletInsertionEventList.size();
150-
if (eventListSize > 0 && shouldReport) { // The last event should report progress
151-
((PipeRawTabletInsertionEvent) tabletInsertionEventList.get(eventListSize - 1))
152-
.markAsNeedToReport();
153-
}
154-
return tabletInsertionEventList;
130+
return super.convertToTabletInsertionEvents(shouldReport);
155131
}
156132
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
23+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
25+
import org.apache.iotdb.pipe.api.collector.DataCollector;
26+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
public abstract class PipeRawTabletEventConverter implements DataCollector {
32+
protected final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>();
33+
protected boolean isAligned = false;
34+
protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
35+
protected final EnrichedEvent sourceEvent; // Used to report progress
36+
protected final String sourceEventDataBaseName;
37+
protected final Boolean isTableModel;
38+
39+
public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
40+
this.pipeTaskMeta = pipeTaskMeta;
41+
this.sourceEvent = sourceEvent;
42+
if (sourceEvent instanceof PipeInsertionEvent) {
43+
sourceEventDataBaseName =
44+
((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion();
45+
isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent();
46+
} else {
47+
sourceEventDataBaseName = null;
48+
isTableModel = null;
49+
}
50+
}
51+
52+
public PipeRawTabletEventConverter(
53+
PipeTaskMeta pipeTaskMeta,
54+
EnrichedEvent sourceEvent,
55+
String sourceEventDataBase,
56+
Boolean isTableModel) {
57+
this.pipeTaskMeta = pipeTaskMeta;
58+
this.sourceEvent = sourceEvent;
59+
this.sourceEventDataBaseName = sourceEventDataBase;
60+
this.isTableModel = isTableModel;
61+
}
62+
63+
@Override
64+
public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) {
65+
final int eventListSize = tabletInsertionEventList.size();
66+
if (eventListSize > 0 && shouldReport) { // The last event should report progress
67+
((PipeRawTabletInsertionEvent) tabletInsertionEventList.get(eventListSize - 1))
68+
.markAsNeedToReport();
69+
}
70+
return tabletInsertionEventList;
71+
}
72+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,45 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.tablet;
2121

22-
public class PipeTabletCollector {
22+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
23+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
25+
import org.apache.iotdb.pipe.api.collector.TabletCollector;
26+
27+
import org.apache.tsfile.write.record.Tablet;
28+
29+
import java.io.IOException;
30+
31+
public class PipeTabletCollector extends PipeRawTabletEventConverter implements TabletCollector {
32+
33+
public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
34+
super(pipeTaskMeta, sourceEvent);
35+
}
36+
37+
public PipeTabletCollector(
38+
PipeTaskMeta pipeTaskMeta,
39+
EnrichedEvent sourceEvent,
40+
String sourceEventDataBase,
41+
Boolean isTableModel) {
42+
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
43+
}
44+
45+
@Override
46+
public void collectTablet(final Tablet tablet) {
47+
final PipeInsertionEvent pipeInsertionEvent =
48+
sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) sourceEvent) : null;
49+
tabletInsertionEventList.add(
50+
new PipeRawTabletInsertionEvent(
51+
isTableModel,
52+
sourceEventDataBaseName,
53+
pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTableModelDataBase(),
54+
pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTreeModelDataBase(),
55+
tablet,
56+
isAligned,
57+
sourceEvent == null ? null : sourceEvent.getPipeName(),
58+
sourceEvent == null ? 0 : sourceEvent.getCreationTime(),
59+
pipeTaskMeta,
60+
sourceEvent,
61+
false));
62+
}
2363
}

0 commit comments

Comments
 (0)