Skip to content

Commit 9eb0272

Browse files
authored
[to dev/1.3] fix mem table query bug (#16970)
1 parent 2fe40b5 commit 9eb0272

File tree

2 files changed

+173
-31
lines changed

2 files changed

+173
-31
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,20 @@
1919

2020
package org.apache.iotdb.db.it;
2121

22+
import org.apache.iotdb.isession.ISession;
23+
import org.apache.iotdb.isession.SessionDataSet;
2224
import org.apache.iotdb.it.env.EnvFactory;
2325
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2426
import org.apache.iotdb.itbase.category.ClusterIT;
2527
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
28+
import org.apache.iotdb.rpc.IoTDBConnectionException;
29+
import org.apache.iotdb.rpc.StatementExecutionException;
2630

31+
import org.apache.tsfile.enums.TSDataType;
32+
import org.apache.tsfile.write.record.Tablet;
33+
import org.apache.tsfile.write.schema.MeasurementSchema;
2734
import org.junit.AfterClass;
35+
import org.junit.Assert;
2836
import org.junit.BeforeClass;
2937
import org.junit.Ignore;
3038
import org.junit.Test;
@@ -35,6 +43,8 @@
3543
import java.sql.ResultSet;
3644
import java.sql.SQLException;
3745
import java.sql.Statement;
46+
import java.util.Collections;
47+
import java.util.List;
3848
import java.util.Locale;
3949

4050
import static org.junit.Assert.assertEquals;
@@ -184,4 +194,76 @@ public void testFlushNotExistGroupNoData() {
184194
fail(e.getMessage());
185195
}
186196
}
197+
198+
@Test
199+
public void testStreamingQueryMemTableWithOverlappedData()
200+
throws IoTDBConnectionException, StatementExecutionException {
201+
String device = "root.stream1.d1";
202+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
203+
session.open();
204+
generateTimeRangeWithTimestamp(session, device, 1, 10);
205+
206+
generateTimeRangeWithTimestamp(session, device, 500000, 510000);
207+
session.executeNonQueryStatement("flush");
208+
generateTimeRangeWithTimestamp(session, device, 100000, 350000);
209+
210+
SessionDataSet sessionDataSet =
211+
session.executeQueryStatement("select count(*) from root.stream1.d1");
212+
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
213+
long count = 0;
214+
while (iterator.next()) {
215+
count = iterator.getLong(1);
216+
}
217+
Assert.assertEquals(10 + 10001 + 250001, count);
218+
}
219+
}
220+
221+
@Test
222+
public void testStreamingQueryMemTableWithOverlappedData2()
223+
throws IoTDBConnectionException, StatementExecutionException {
224+
String device = "root.stream2.d1";
225+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
226+
session.open();
227+
generateTimeRangeWithTimestamp(session, device, 1, 10);
228+
229+
generateTimeRangeWithTimestamp(session, device, 500000, 510000);
230+
session.executeNonQueryStatement("flush");
231+
generateTimeRangeWithTimestamp(session, device, 1, 20);
232+
generateTimeRangeWithTimestamp(session, device, 100000, 210000);
233+
session.executeNonQueryStatement("flush");
234+
235+
generateTimeRangeWithTimestamp(session, device, 150000, 450000);
236+
237+
SessionDataSet sessionDataSet =
238+
session.executeQueryStatement("select count(*) from root.stream2.d1");
239+
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
240+
long count = 0;
241+
while (iterator.next()) {
242+
count = iterator.getLong(1);
243+
}
244+
Assert.assertEquals(20 + 10001 + 350001, count);
245+
}
246+
}
247+
248+
private static void generateTimeRangeWithTimestamp(
249+
ISession session, String device, long start, long end)
250+
throws IoTDBConnectionException, StatementExecutionException {
251+
List<MeasurementSchema> measurementSchemas =
252+
Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64));
253+
Tablet tablet = new Tablet(device, measurementSchemas);
254+
for (long currentTime = start; currentTime <= end; currentTime++) {
255+
int rowIndex = tablet.rowSize;
256+
if (rowIndex == tablet.getMaxRowNumber()) {
257+
session.insertTablet(tablet);
258+
tablet.reset();
259+
rowIndex = 0;
260+
}
261+
tablet.addTimestamp(rowIndex, currentTime);
262+
tablet.addValue("s1", 0, currentTime);
263+
tablet.rowSize++;
264+
}
265+
if (tablet.rowSize > 0) {
266+
session.insertTablet(tablet);
267+
}
268+
}
187269
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -869,15 +869,28 @@ private boolean hasNextOverlappedPage() throws IOException {
869869
return true;
870870
}
871871

872-
tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
872+
// init the merge reader for current call
873+
// The original process is changed to lazy loading because different mem page readers
874+
// belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is
875+
// necessary to ensure that these mem page readers cannot coexist in the mergeReader at the
876+
// same time.
877+
// The initial endPointTime is calculated as follows:
878+
// 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped
879+
// unseq pages and take the end point.
880+
// 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping
881+
// unseq pages and take the end point.
882+
long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
873883

874884
while (true) {
875885

876886
// may has overlapped data
877887
if (mergeReader.hasNextTimeValuePair()) {
878888

879889
TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
880-
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
890+
long currentPageEndPointTime =
891+
orderUtils.getAscending()
892+
? Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime)
893+
: Math.min(mergeReader.getCurrentReadStopTime(), initialEndPointTime);
881894
while (mergeReader.hasNextTimeValuePair()) {
882895

883896
/*
@@ -907,7 +920,7 @@ private boolean hasNextOverlappedPage() throws IOException {
907920
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
908921
timeValuePair.getTimestamp(), false);
909922
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
910-
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
923+
unpackAllOverlappedUnseqPageReadersToMergeReader();
911924

912925
// update if there are unpacked unSeqPageReaders
913926
timeValuePair = mergeReader.currentTimeValuePair();
@@ -996,33 +1009,71 @@ private long updateEndPointTime(long currentPageEndPointTime, IVersionPageReader
9961009
}
9971010
}
9981011

999-
private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
1012+
private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
1013+
do {
1014+
/*
1015+
* no cached page readers
1016+
*/
1017+
if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
1018+
return mergeReader.getCurrentReadStopTime();
1019+
}
10001020

1001-
/*
1002-
* no cached page readers
1003-
*/
1004-
if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
1005-
return;
1006-
}
1021+
/*
1022+
* init firstPageReader
1023+
*/
1024+
if (firstPageReader == null) {
1025+
initFirstPageReader();
1026+
}
1027+
if (!mergeReader.hasNextTimeValuePair()) {
1028+
putPageReaderToMergeReader(firstPageReader);
1029+
firstPageReader = null;
1030+
}
1031+
} while (!mergeReader.hasNextTimeValuePair());
10071032

10081033
/*
1009-
* init firstPageReader
1034+
* put all currently directly overlapped unseq page reader to merge reader
10101035
*/
1011-
if (firstPageReader == null) {
1012-
initFirstPageReader();
1013-
}
1036+
long mergeReaderStopTime = mergeReader.getCurrentReadStopTime();
1037+
unpackAllOverlappedUnseqPageReadersToMergeReader();
10141038

1015-
long currentPageEndpointTime;
1016-
if (mergeReader.hasNextTimeValuePair()) {
1017-
currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
1018-
} else {
1019-
currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
1020-
}
1039+
return calculateInitialEndPointTime(mergeReaderStopTime);
1040+
}
10211041

1022-
/*
1023-
* put all currently directly overlapped unseq page reader to merge reader
1024-
*/
1025-
unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
1042+
private long calculateInitialEndPointTime(final long currentReadStopTime) {
1043+
long initialReadStopTime = currentReadStopTime;
1044+
if (firstPageReader != null
1045+
&& !firstPageReader.isSeq()
1046+
&& orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) {
1047+
if (orderUtils.getAscending()) {
1048+
initialReadStopTime =
1049+
Math.max(
1050+
initialReadStopTime,
1051+
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
1052+
} else {
1053+
initialReadStopTime =
1054+
Math.min(
1055+
initialReadStopTime,
1056+
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
1057+
}
1058+
}
1059+
for (IVersionPageReader unSeqPageReader : unSeqPageReaders) {
1060+
if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) {
1061+
if (orderUtils.getAscending()) {
1062+
initialReadStopTime =
1063+
Math.max(
1064+
initialReadStopTime,
1065+
orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
1066+
} else {
1067+
initialReadStopTime =
1068+
Math.min(
1069+
initialReadStopTime,
1070+
orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
1071+
}
1072+
} else {
1073+
break;
1074+
}
1075+
}
1076+
return initialReadStopTime;
10261077
}
10271078

10281079
private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) {
@@ -1113,17 +1164,26 @@ private IVersionPageReader getFirstPageReaderFromCachedReaders() {
11131164
return firstPageReader;
11141165
}
11151166

1116-
private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
1117-
throws IOException {
1118-
while (!unSeqPageReaders.isEmpty()
1119-
&& orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) {
1120-
putPageReaderToMergeReader(unSeqPageReaders.poll());
1121-
}
1167+
// This process loads overlapped unseq pages based on the current time value pair of the
1168+
// mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq
1169+
// page is added.
1170+
// The current time obtained from mergeReader each time is not necessarily the minimum among all
1171+
// the actual unseq data, so it is necessary to repeatedly calculate and include potentially
1172+
// overlapping unseq pages.
1173+
private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException {
1174+
long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
11221175
if (firstPageReader != null
11231176
&& !firstPageReader.isSeq()
1124-
&& orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
1177+
&& orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) {
11251178
putPageReaderToMergeReader(firstPageReader);
11261179
firstPageReader = null;
1180+
actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
1181+
}
1182+
while (!unSeqPageReaders.isEmpty()
1183+
&& orderUtils.isOverlapped(
1184+
actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) {
1185+
putPageReaderToMergeReader(unSeqPageReaders.poll());
1186+
actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
11271187
}
11281188
}
11291189

0 commit comments

Comments
 (0)