Skip to content

Commit 85f85ce

Browse files
Pipe: Optimize the data logic of distinguishing table model and tree model (apache#14803)
Co-authored-by: Steve Yurong Su <[email protected]>
1 parent adba099 commit 85f85ce

File tree

16 files changed

+322
-95
lines changed

16 files changed

+322
-95
lines changed

example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public void process(final Event event, final EventCollector eventCollector) thro
7676
tablet.addTimestamp(0, System.currentTimeMillis());
7777
tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get());
7878
eventCollector.collect(
79-
new PipeRawTabletInsertionEvent(null, null, tablet, false, null, 0, null, null, false));
79+
new PipeRawTabletInsertionEvent(
80+
false, null, null, null, tablet, false, null, 0, null, null, false));
8081
}
8182
}
8283

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,38 @@
2424
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
2525
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2626
import org.apache.iotdb.commons.utils.PathUtils;
27+
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
2728

2829
import javax.validation.constraints.NotNull;
2930

31+
/**
32+
* The data model used to record the Event and the data model of the DataRegion corresponding to the
33+
* source data, so this type requires some specifications .
34+
*
35+
* <p>1. {@code sourceDatabaseNameFromDataRegion} is immutable, coming from the source data or the
36+
* DataBaseName corresponding to the Processor that generates this Event.
37+
*
38+
* <p>2. {@code isTableModelEvent} is mutable, because it may be necessary to support the conversion
39+
* of the table model to the tree model or the tree model to the table model, leaving it to the user
40+
* to decide what model the data is. If it is not defined, the default is the data model
41+
* corresponding to {@code sourceDatabaseNameFromDataRegion}.
42+
*
43+
* <p>3. {@code treeModelDatabaseName} and {@code tableModelDatabaseName} are mutable, and the user
44+
* can change the name of the world, but it must correspond to the {@code isTableModelEvent} field.
45+
* The default is determined by {@code sourceDatabaseNameFromDataRegion}.
46+
*
47+
* <p>4. The corresponding {@link PipeTsFileInsertionEvent} cannot convert the data model at will,
48+
* and TSFile does not support this.
49+
*/
3050
public abstract class PipeInsertionEvent extends EnrichedEvent {
3151

32-
private Boolean isTableModelEvent; // lazy initialization
52+
// Record the database name of the DataRegion corresponding to the SourceEvent
53+
private final String sourceDatabaseNameFromDataRegion;
54+
55+
protected Boolean isTableModelEvent; // lazy initialization
3356

34-
private String treeModelDatabaseName;
35-
private String tableModelDatabaseName; // lazy initialization
57+
protected String treeModelDatabaseName; // lazy initialization
58+
protected String tableModelDatabaseName; // lazy initialization
3659

3760
protected PipeInsertionEvent(
3861
final String pipeName,
@@ -43,10 +66,12 @@ protected PipeInsertionEvent(
4366
final long startTime,
4467
final long endTime,
4568
final Boolean isTableModelEvent,
46-
final String treeModelDatabaseName,
47-
final String tableModelDatabaseName) {
69+
final String databaseNameFromDataRegion,
70+
final String tableModelDatabaseName,
71+
final String treeModelDatabaseName) {
4872
super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime);
4973
this.isTableModelEvent = isTableModelEvent;
74+
this.sourceDatabaseNameFromDataRegion = databaseNameFromDataRegion;
5075
this.treeModelDatabaseName = treeModelDatabaseName;
5176
if (tableModelDatabaseName != null) {
5277
this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
@@ -73,6 +98,7 @@ protected PipeInsertionEvent(
7398
endTime,
7499
isTableModelEvent,
75100
databaseNameFromDataRegion,
101+
null,
76102
null);
77103
}
78104

@@ -86,30 +112,46 @@ public void markAsTreeModelEvent() {
86112

87113
public boolean isTableModelEvent() {
88114
if (isTableModelEvent == null) {
89-
throw new IllegalStateException("isTableModelEvent is not initialized");
115+
if (sourceDatabaseNameFromDataRegion == null) {
116+
throw new IllegalStateException("databaseNameFromDataRegion is null");
117+
}
118+
return isTableModelEvent = PathUtils.isTableModelDatabase(sourceDatabaseNameFromDataRegion);
90119
}
91120
return isTableModelEvent;
92121
}
93122

94-
/** Only for internal use. */
95-
protected Boolean getRawIsTableModelEvent() {
123+
public Boolean getRawIsTableModelEvent() {
96124
return isTableModelEvent;
97125
}
98126

99-
public String getTreeModelDatabaseName() {
127+
public String getSourceDatabaseNameFromDataRegion() {
128+
return sourceDatabaseNameFromDataRegion;
129+
}
130+
131+
public String getRawTableModelDataBase() {
132+
return tableModelDatabaseName;
133+
}
134+
135+
public String getRawTreeModelDataBase() {
100136
return treeModelDatabaseName;
101137
}
102138

139+
public String getTreeModelDatabaseName() {
140+
return treeModelDatabaseName == null
141+
? treeModelDatabaseName = PathUtils.qualifyDatabaseName(sourceDatabaseNameFromDataRegion)
142+
: treeModelDatabaseName;
143+
}
144+
103145
public String getTableModelDatabaseName() {
104146
return tableModelDatabaseName == null
105-
? tableModelDatabaseName = PathUtils.unQualifyDatabaseName(treeModelDatabaseName)
147+
? tableModelDatabaseName = PathUtils.unQualifyDatabaseName(sourceDatabaseNameFromDataRegion)
106148
: tableModelDatabaseName;
107149
}
108150

109151
public void renameTableModelDatabase(@NotNull final String tableModelDatabaseName) {
110152
// Please note that if you parse TsFile, you need to use TreeModelDatabaseName, so you need to
111153
// rename TreeModelDatabaseName as well.
112154
this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
113-
this.treeModelDatabaseName = "root." + tableModelDatabaseName;
155+
this.treeModelDatabaseName = PathUtils.qualifyDatabaseName(tableModelDatabaseName);
114156
}
115157
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,31 @@ public class PipeRowCollector implements RowCollector {
4545
private boolean isAligned = false;
4646
private final PipeTaskMeta pipeTaskMeta; // Used to report progress
4747
private final EnrichedEvent sourceEvent; // Used to report progress
48+
private final String sourceEventDataBaseName;
49+
private final Boolean isTableModel;
4850

4951
public PipeRowCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) {
5052
this.pipeTaskMeta = pipeTaskMeta;
5153
this.sourceEvent = sourceEvent;
54+
if (sourceEvent instanceof PipeInsertionEvent) {
55+
sourceEventDataBaseName =
56+
((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion();
57+
isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent();
58+
} else {
59+
sourceEventDataBaseName = null;
60+
isTableModel = null;
61+
}
62+
}
63+
64+
public PipeRowCollector(
65+
PipeTaskMeta pipeTaskMeta,
66+
EnrichedEvent sourceEvent,
67+
String sourceEventDataBase,
68+
Boolean isTableModel) {
69+
this.pipeTaskMeta = pipeTaskMeta;
70+
this.sourceEvent = sourceEvent;
71+
this.sourceEventDataBaseName = sourceEventDataBase;
72+
this.isTableModel = isTableModel;
5273
}
5374

5475
@Override
@@ -106,8 +127,10 @@ private void collectTabletInsertionEvent() {
106127
sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) sourceEvent) : null;
107128
tabletInsertionEventList.add(
108129
new PipeRawTabletInsertionEvent(
109-
pipeInsertionEvent == null ? null : pipeInsertionEvent.isTableModelEvent(),
110-
pipeInsertionEvent == null ? null : pipeInsertionEvent.getTreeModelDatabaseName(),
130+
isTableModel,
131+
sourceEventDataBaseName,
132+
pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTableModelDataBase(),
133+
pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTreeModelDataBase(),
111134
tablet,
112135
isAligned,
113136
sourceEvent == null ? null : sourceEvent.getPipeName(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,16 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
9393
private ProgressIndex progressIndex;
9494

9595
public PipeInsertNodeTabletInsertionEvent(
96-
final String databaseName,
96+
final Boolean isTableModel,
97+
final String databaseNameFromDataRegion,
9798
final WALEntryHandler walEntryHandler,
9899
final PartialPath devicePath,
99100
final ProgressIndex progressIndex,
100101
final boolean isAligned,
101102
final boolean isGeneratedByPipe) {
102103
this(
103-
null,
104-
databaseName,
104+
isTableModel,
105+
databaseNameFromDataRegion,
105106
walEntryHandler,
106107
devicePath,
107108
progressIndex,
@@ -118,7 +119,7 @@ public PipeInsertNodeTabletInsertionEvent(
118119

119120
private PipeInsertNodeTabletInsertionEvent(
120121
final Boolean isTableModelEvent,
121-
final String databaseName,
122+
final String databaseNameFromDataRegion,
122123
final WALEntryHandler walEntryHandler,
123124
final PartialPath devicePath,
124125
final ProgressIndex progressIndex,
@@ -140,7 +141,7 @@ private PipeInsertNodeTabletInsertionEvent(
140141
startTime,
141142
endTime,
142143
isTableModelEvent,
143-
databaseName);
144+
databaseNameFromDataRegion);
144145
this.walEntryHandler = walEntryHandler;
145146
// Record device path here so there's no need to get it from InsertNode cache later.
146147
this.devicePath = devicePath;
@@ -243,7 +244,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
243244
final long endTime) {
244245
return new PipeInsertNodeTabletInsertionEvent(
245246
getRawIsTableModelEvent(),
246-
getTreeModelDatabaseName(),
247+
getSourceDatabaseNameFromDataRegion(),
247248
walEntryHandler,
248249
devicePath,
249250
progressIndex,
@@ -459,7 +460,9 @@ public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
459460
container ->
460461
new PipeRawTabletInsertionEvent(
461462
getRawIsTableModelEvent(),
462-
getTreeModelDatabaseName(),
463+
getSourceDatabaseNameFromDataRegion(),
464+
getRawTableModelDataBase(),
465+
getRawTreeModelDataBase(),
463466
container.convertToTablet(),
464467
container.isAligned(),
465468
pipeName,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
7171
private PipeRawTabletInsertionEvent(
7272
final Boolean isTableModelEvent,
7373
final String databaseName,
74+
final String tableModelDataBaseName,
75+
final String treeModelDataBaseName,
7476
final Tablet tablet,
7577
final boolean isAligned,
7678
final EnrichedEvent sourceEvent,
@@ -91,7 +93,9 @@ private PipeRawTabletInsertionEvent(
9193
startTime,
9294
endTime,
9395
isTableModelEvent,
94-
databaseName);
96+
databaseName,
97+
tableModelDataBaseName,
98+
treeModelDataBaseName);
9599
this.tablet = Objects.requireNonNull(tablet);
96100
this.isAligned = isAligned;
97101
this.sourceEvent = sourceEvent;
@@ -101,6 +105,8 @@ private PipeRawTabletInsertionEvent(
101105
public PipeRawTabletInsertionEvent(
102106
final Boolean isTableModelEvent,
103107
final String databaseName,
108+
final String tableModelDataBaseName,
109+
final String treeModelDataBaseName,
104110
final Tablet tablet,
105111
final boolean isAligned,
106112
final String pipeName,
@@ -111,6 +117,8 @@ public PipeRawTabletInsertionEvent(
111117
this(
112118
isTableModelEvent,
113119
databaseName,
120+
tableModelDataBaseName,
121+
treeModelDataBaseName,
114122
tablet,
115123
isAligned,
116124
sourceEvent,
@@ -127,6 +135,8 @@ public PipeRawTabletInsertionEvent(
127135
@TestOnly
128136
public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) {
129137
this(
138+
null,
139+
null,
130140
null,
131141
null,
132142
tablet,
@@ -146,6 +156,8 @@ public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned)
146156
public PipeRawTabletInsertionEvent(
147157
final Tablet tablet, final boolean isAligned, final TreePattern treePattern) {
148158
this(
159+
null,
160+
null,
149161
null,
150162
null,
151163
tablet,
@@ -164,7 +176,9 @@ public PipeRawTabletInsertionEvent(
164176
@TestOnly
165177
public PipeRawTabletInsertionEvent(
166178
final Tablet tablet, final long startTime, final long endTime) {
167-
this(null, null, tablet, false, null, false, null, 0, null, null, null, startTime, endTime);
179+
this(
180+
null, null, null, null, tablet, false, null, false, null, 0, null, null, null, startTime,
181+
endTime);
168182
}
169183

170184
@Override
@@ -240,7 +254,9 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
240254
final long endTime) {
241255
return new PipeRawTabletInsertionEvent(
242256
getRawIsTableModelEvent(),
243-
getTreeModelDatabaseName(),
257+
getSourceDatabaseNameFromDataRegion(),
258+
getRawTableModelDataBase(),
259+
getRawTreeModelDataBase(),
244260
tablet,
245261
isAligned,
246262
sourceEvent,
@@ -338,7 +354,9 @@ public long count() {
338354
public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
339355
return new PipeRawTabletInsertionEvent(
340356
getRawIsTableModelEvent(),
341-
getTreeModelDatabaseName(),
357+
getSourceDatabaseNameFromDataRegion(),
358+
getRawTableModelDataBase(),
359+
getRawTreeModelDataBase(),
342360
convertToTablet(),
343361
isAligned,
344362
pipeName,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,16 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
9191
private volatile ProgressIndex overridingProgressIndex;
9292

9393
public PipeTsFileInsertionEvent(
94-
final String databaseName,
94+
final Boolean isTableModelEvent,
95+
final String databaseNameFromDataRegion,
9596
final TsFileResource resource,
9697
final boolean isLoaded,
9798
final boolean isGeneratedByPipe,
9899
final boolean isGeneratedByHistoricalExtractor) {
99100
// The modFile must be copied before the event is assigned to the listening pipes
100101
this(
101-
null,
102-
databaseName,
102+
isTableModelEvent,
103+
databaseNameFromDataRegion,
103104
resource,
104105
true,
105106
isLoaded,
@@ -116,7 +117,7 @@ public PipeTsFileInsertionEvent(
116117

117118
public PipeTsFileInsertionEvent(
118119
final Boolean isTableModelEvent,
119-
final String databaseName,
120+
final String databaseNameFromDataRegion,
120121
final TsFileResource resource,
121122
final boolean isWithMod,
122123
final boolean isLoaded,
@@ -138,7 +139,7 @@ public PipeTsFileInsertionEvent(
138139
startTime,
139140
endTime,
140141
isTableModelEvent,
141-
databaseName);
142+
databaseNameFromDataRegion);
142143

143144
this.resource = resource;
144145
tsFile = resource.getTsFile();
@@ -377,7 +378,7 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
377378
final long endTime) {
378379
return new PipeTsFileInsertionEvent(
379380
getRawIsTableModelEvent(),
380-
getTreeModelDatabaseName(),
381+
getSourceDatabaseNameFromDataRegion(),
381382
resource,
382383
isWithMod,
383384
isLoaded,
@@ -424,7 +425,8 @@ public boolean mayEventPathsOverlappedWithPattern() {
424425
.anyMatch(
425426
deviceID -> {
426427
// Tree model
427-
if (deviceID instanceof PlainDeviceID
428+
if (Boolean.FALSE.equals(getRawIsTableModelEvent())
429+
|| deviceID instanceof PlainDeviceID
428430
|| deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
429431
|| deviceID.getTableName().equals(PATH_ROOT)) {
430432
markAsTreeModelEvent();
@@ -450,6 +452,10 @@ public boolean mayEventPathsOverlappedWithPattern() {
450452
@Override
451453
public boolean isTableModelEvent() {
452454
if (getRawIsTableModelEvent() == null) {
455+
if (getSourceDatabaseNameFromDataRegion() != null) {
456+
return super.isTableModelEvent();
457+
}
458+
453459
try {
454460
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
455461
PipeDataNodeResourceManager.tsfile()

0 commit comments

Comments
 (0)