Skip to content

Commit e981d88

Browse files
committed
add TsFile scan object data function
1 parent 112568c commit e981d88

11 files changed

+343
-27
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,19 @@
3333
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3434
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
3535
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
36+
import org.apache.iotdb.pipe.api.exception.PipeException;
3637

3738
import org.apache.tsfile.read.TsFileSequenceReader;
3839
import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
3940
import org.apache.tsfile.read.filter.factory.TimeFilterApi;
41+
import org.apache.tsfile.utils.Binary;
42+
import org.apache.tsfile.utils.BitMap;
4043
import org.apache.tsfile.write.record.Tablet;
4144
import org.slf4j.Logger;
4245
import org.slf4j.LoggerFactory;
4346

4447
import java.io.IOException;
48+
import java.util.Iterator;
4549

4650
public abstract class TsFileInsertionEventParser implements AutoCloseable {
4751

@@ -79,6 +83,8 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
7983

8084
protected Iterable<TabletInsertionEvent> tabletInsertionIterable;
8185

86+
protected final boolean notOnlyNeedObject;
87+
8288
protected TsFileInsertionEventParser(
8389
final String pipeName,
8490
final long creationTime,
@@ -88,7 +94,8 @@ protected TsFileInsertionEventParser(
8894
final long endTime,
8995
final PipeTaskMeta pipeTaskMeta,
9096
final PipeInsertionEvent sourceEvent,
91-
final TsFileResource tsFileResource) {
97+
final TsFileResource tsFileResource,
98+
final boolean notOnlyNeedObject) {
9299
this.pipeName = pipeName;
93100
this.creationTime = creationTime;
94101

@@ -115,13 +122,142 @@ protected TsFileInsertionEventParser(
115122
PipeDataNodeResourceManager.memory()
116123
.forceAllocateForTabletWithRetry(
117124
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
125+
126+
this.notOnlyNeedObject = notOnlyNeedObject;
118127
}
119128

120129
/**
121130
* @return {@link TabletInsertionEvent} in a streaming way
122131
*/
123132
public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
124133

134+
public abstract Iterable<Binary> getObjectTypeData();
135+
136+
/**
137+
* Template method for creating ObjectTypeData iterator. Subclasses should implement
138+
* createObjectTypeDataIteratorState() to provide data source specific state management.
139+
*/
140+
protected Iterable<Binary> createObjectTypeDataIterator() {
141+
return () -> {
142+
final ObjectTypeDataIteratorState state = createObjectTypeDataIteratorState();
143+
return new Iterator<Binary>() {
144+
private Tablet tablet = null;
145+
private int rowIndex = 0;
146+
private int columnIndex = 0;
147+
148+
@Override
149+
public boolean hasNext() {
150+
while (true) {
151+
// Process current Tablet
152+
if (tablet != null) {
153+
final Object[] values = tablet.getValues();
154+
if (values == null || columnIndex >= values.length) {
155+
resetTablet();
156+
continue;
157+
}
158+
159+
// Check if current column is Binary[] type
160+
if (!(values[columnIndex] instanceof Binary[])) {
161+
columnIndex++;
162+
continue;
163+
}
164+
165+
final int rowSize = tablet.getRowSize();
166+
167+
while (rowIndex < rowSize) {
168+
if (!isRowNull(tablet, columnIndex, rowIndex)) {
169+
return true;
170+
}
171+
rowIndex++;
172+
}
173+
174+
columnIndex++;
175+
rowIndex = 0;
176+
continue;
177+
}
178+
179+
// Check if there are more data sources
180+
if (!state.hasMoreDataSources()) {
181+
return false;
182+
}
183+
184+
// Get next Tablet from data source
185+
try {
186+
tablet = state.getNextTablet();
187+
} catch (final Exception e) {
188+
close();
189+
throw new PipeException("failed to read next Tablet", e);
190+
}
191+
192+
rowIndex = 0;
193+
columnIndex = 0;
194+
195+
// If the fetched Tablet has no data, continue to fetch the next one
196+
if (tablet == null || tablet.getRowSize() == 0) {
197+
tablet = null;
198+
continue;
199+
}
200+
201+
final Object[] values = tablet.getValues();
202+
if (values == null || values.length == 0) {
203+
tablet = null;
204+
continue;
205+
}
206+
}
207+
}
208+
209+
private void resetTablet() {
210+
tablet = null;
211+
columnIndex = 0;
212+
rowIndex = 0;
213+
}
214+
215+
private boolean isRowNull(Tablet tablet, int colIndex, int rowIdx) {
216+
final BitMap[] bitMaps = tablet.getBitMaps();
217+
return bitMaps != null
218+
&& colIndex < bitMaps.length
219+
&& bitMaps[colIndex] != null
220+
&& bitMaps[colIndex].isMarked(rowIdx);
221+
}
222+
223+
@Override
224+
public Binary next() {
225+
final Binary[] column = (Binary[]) tablet.getValues()[columnIndex];
226+
return column[rowIndex++];
227+
}
228+
};
229+
};
230+
}
231+
232+
/**
233+
* Create state object for ObjectTypeData iterator. Should be implemented by subclasses to provide
234+
* data source specific state management.
235+
*
236+
* @return the state object for managing data source iteration
237+
*/
238+
protected abstract ObjectTypeDataIteratorState createObjectTypeDataIteratorState();
239+
240+
/**
241+
* State interface for ObjectTypeData iterator. Subclasses should implement this to manage their
242+
* specific data source iteration state.
243+
*/
244+
protected interface ObjectTypeDataIteratorState {
245+
/**
246+
* Check if there are more data sources to read from.
247+
*
248+
* @return true if there are more data sources, false otherwise
249+
*/
250+
boolean hasMoreDataSources();
251+
252+
/**
253+
* Get the next Tablet from the data source.
254+
*
255+
* @return the next Tablet, or null if no more data
256+
* @throws Exception if an error occurs while reading
257+
*/
258+
Tablet getNextTablet() throws Exception;
259+
}
260+
125261
/**
126262
* Record parse start time when hasNext() is called for the first time and returns true. Should be
127263
* called in Iterator.hasNext() when it's the first call.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce
9595
pipeTaskMeta,
9696
userName,
9797
sourceEvent,
98-
isWithMod);
98+
isWithMod,
99+
true);
99100
}
100101

101102
// Use scan container to save memory
@@ -111,7 +112,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce
111112
endTime,
112113
pipeTaskMeta,
113114
sourceEvent,
114-
isWithMod);
115+
isWithMod,
116+
true);
115117
}
116118

117119
if (treePattern instanceof IoTDBTreePatternOperations
@@ -131,7 +133,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce
131133
endTime,
132134
pipeTaskMeta,
133135
sourceEvent,
134-
isWithMod);
136+
isWithMod,
137+
true);
135138
}
136139

137140
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -148,7 +151,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce
148151
endTime,
149152
pipeTaskMeta,
150153
sourceEvent,
151-
isWithMod);
154+
isWithMod,
155+
true);
152156
}
153157

154158
final int originalSize = deviceIsAlignedMap.size();
@@ -166,7 +170,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce
166170
endTime,
167171
pipeTaskMeta,
168172
sourceEvent,
169-
isWithMod)
173+
isWithMod,
174+
true)
170175
: new TsFileInsertionEventQueryParser(
171176
pipeName,
172177
creationTime,
@@ -177,7 +182,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce
177182
pipeTaskMeta,
178183
sourceEvent,
179184
filteredDeviceIsAlignedMap,
180-
isWithMod);
185+
isWithMod,
186+
true);
181187
}
182188

183189
private Map<IDeviceID, Boolean> filterDeviceIsAlignedMapByPattern(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.tsfile.read.TsFileDeviceIterator;
4242
import org.apache.tsfile.read.TsFileReader;
4343
import org.apache.tsfile.read.TsFileSequenceReader;
44+
import org.apache.tsfile.utils.Binary;
4445
import org.apache.tsfile.utils.Pair;
4546
import org.apache.tsfile.write.record.Tablet;
4647
import org.slf4j.Logger;
@@ -78,7 +79,7 @@ public TsFileInsertionEventQueryParser(
7879
final long endTime,
7980
final PipeInsertionEvent sourceEvent)
8081
throws IOException {
81-
this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent, false);
82+
this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent, false, true);
8283
}
8384

8485
public TsFileInsertionEventQueryParser(
@@ -90,7 +91,8 @@ public TsFileInsertionEventQueryParser(
9091
final long endTime,
9192
final PipeTaskMeta pipeTaskMeta,
9293
final PipeInsertionEvent sourceEvent,
93-
final boolean isWithMod)
94+
final boolean isWithMod,
95+
final boolean notOnlyNeedObject)
9496
throws IOException {
9597
this(
9698
pipeName,
@@ -102,7 +104,8 @@ public TsFileInsertionEventQueryParser(
102104
pipeTaskMeta,
103105
sourceEvent,
104106
null,
105-
isWithMod);
107+
isWithMod,
108+
notOnlyNeedObject);
106109
}
107110

108111
public TsFileInsertionEventQueryParser(
@@ -115,7 +118,8 @@ public TsFileInsertionEventQueryParser(
115118
final PipeTaskMeta pipeTaskMeta,
116119
final PipeInsertionEvent sourceEvent,
117120
final Map<IDeviceID, Boolean> deviceIsAlignedMap,
118-
final boolean isWithMod)
121+
final boolean isWithMod,
122+
final boolean notOnlyNeedObject)
119123
throws IOException {
120124
super(
121125
pipeName,
@@ -126,7 +130,8 @@ public TsFileInsertionEventQueryParser(
126130
endTime,
127131
pipeTaskMeta,
128132
sourceEvent,
129-
null); // tsFileResource will be obtained from sourceEvent
133+
null,
134+
notOnlyNeedObject); // tsFileResource will be obtained from sourceEvent
130135

131136
try {
132137
currentModifications =
@@ -339,9 +344,10 @@ private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
339344
final Map<IDeviceID, List<String>> result = new HashMap<>();
340345

341346
for (final IDeviceID device : devices) {
342-
tsFileSequenceReader
343-
.readDeviceMetadata(device)
344-
.values()
347+
tsFileSequenceReader.readDeviceMetadata(device).values().stream()
348+
.filter(
349+
timeseriesMetadata ->
350+
notOnlyNeedObject || timeseriesMetadata.getTsDataType() == TSDataType.OBJECT)
345351
.forEach(
346352
timeseriesMetadata ->
347353
result
@@ -491,6 +497,50 @@ public TabletInsertionEvent next() {
491497
return tabletInsertionIterable;
492498
}
493499

500+
@Override
501+
public Iterable<Binary> getObjectTypeData() {
502+
return createObjectTypeDataIterator();
503+
}
504+
505+
@Override
506+
protected ObjectTypeDataIteratorState createObjectTypeDataIteratorState() {
507+
return new ObjectTypeDataIteratorState() {
508+
private TsFileInsertionEventQueryParserTabletIterator tabletIterator = null;
509+
510+
@Override
511+
public boolean hasMoreDataSources() {
512+
while (tabletIterator == null || !tabletIterator.hasNext()) {
513+
if (!deviceMeasurementsMapIterator.hasNext()) {
514+
return false;
515+
}
516+
517+
final Map.Entry<IDeviceID, List<String>> entry = deviceMeasurementsMapIterator.next();
518+
519+
try {
520+
tabletIterator =
521+
new TsFileInsertionEventQueryParserTabletIterator(
522+
tsFileReader,
523+
measurementDataTypeMap,
524+
entry.getKey(),
525+
entry.getValue(),
526+
timeFilterExpression,
527+
allocatedMemoryBlockForTablet,
528+
currentModifications);
529+
} catch (final Exception e) {
530+
close();
531+
throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e);
532+
}
533+
}
534+
return true;
535+
}
536+
537+
@Override
538+
public Tablet getNextTablet() throws Exception {
539+
return tabletIterator.next();
540+
}
541+
};
542+
}
543+
494544
@Override
495545
public void close() {
496546
try {

0 commit comments

Comments
 (0)