Skip to content

Commit 9478b77

Browse files
authored
fix mem table query bug apache#16964
1 parent a6d65cb commit 9478b77

File tree

2 files changed

+173
-31
lines changed

2 files changed

+173
-31
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,21 @@
1919

2020
package org.apache.iotdb.db.it;
2121

22+
import org.apache.iotdb.isession.ISession;
23+
import org.apache.iotdb.isession.SessionDataSet;
2224
import org.apache.iotdb.it.env.EnvFactory;
2325
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2426
import org.apache.iotdb.itbase.category.ClusterIT;
2527
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
28+
import org.apache.iotdb.rpc.IoTDBConnectionException;
29+
import org.apache.iotdb.rpc.StatementExecutionException;
2630

31+
import org.apache.tsfile.enums.TSDataType;
32+
import org.apache.tsfile.write.record.Tablet;
33+
import org.apache.tsfile.write.schema.IMeasurementSchema;
34+
import org.apache.tsfile.write.schema.MeasurementSchema;
2735
import org.junit.AfterClass;
36+
import org.junit.Assert;
2837
import org.junit.BeforeClass;
2938
import org.junit.Test;
3039
import org.junit.experimental.categories.Category;
@@ -34,6 +43,8 @@
3443
import java.sql.ResultSet;
3544
import java.sql.SQLException;
3645
import java.sql.Statement;
46+
import java.util.Collections;
47+
import java.util.List;
3748
import java.util.Locale;
3849

3950
import static org.junit.Assert.assertEquals;
@@ -181,4 +192,75 @@ public void testFlushNotExistGroupNoData() {
181192
fail(e.getMessage());
182193
}
183194
}
195+
196+
@Test
197+
public void testStreamingQueryMemTableWithOverlappedData()
198+
throws IoTDBConnectionException, StatementExecutionException {
199+
String device = "root.stream1.d1";
200+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
201+
session.open();
202+
generateTimeRangeWithTimestamp(session, device, 1, 10);
203+
204+
generateTimeRangeWithTimestamp(session, device, 500000, 510000);
205+
session.executeNonQueryStatement("flush");
206+
generateTimeRangeWithTimestamp(session, device, 100000, 350000);
207+
208+
SessionDataSet sessionDataSet =
209+
session.executeQueryStatement("select count(*) from root.stream1.d1");
210+
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
211+
long count = 0;
212+
while (iterator.next()) {
213+
count = iterator.getLong(1);
214+
}
215+
Assert.assertEquals(10 + 10001 + 250001, count);
216+
}
217+
}
218+
219+
@Test
220+
public void testStreamingQueryMemTableWithOverlappedData2()
221+
throws IoTDBConnectionException, StatementExecutionException {
222+
String device = "root.stream2.d1";
223+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
224+
session.open();
225+
generateTimeRangeWithTimestamp(session, device, 1, 10);
226+
227+
generateTimeRangeWithTimestamp(session, device, 500000, 510000);
228+
session.executeNonQueryStatement("flush");
229+
generateTimeRangeWithTimestamp(session, device, 1, 20);
230+
generateTimeRangeWithTimestamp(session, device, 100000, 210000);
231+
session.executeNonQueryStatement("flush");
232+
233+
generateTimeRangeWithTimestamp(session, device, 150000, 450000);
234+
235+
SessionDataSet sessionDataSet =
236+
session.executeQueryStatement("select count(*) from root.stream2.d1");
237+
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
238+
long count = 0;
239+
while (iterator.next()) {
240+
count = iterator.getLong(1);
241+
}
242+
Assert.assertEquals(20 + 10001 + 350001, count);
243+
}
244+
}
245+
246+
private static void generateTimeRangeWithTimestamp(
247+
ISession session, String device, long start, long end)
248+
throws IoTDBConnectionException, StatementExecutionException {
249+
List<IMeasurementSchema> measurementSchemas =
250+
Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64));
251+
Tablet tablet = new Tablet(device, measurementSchemas);
252+
for (long currentTime = start; currentTime <= end; currentTime++) {
253+
int rowIndex = tablet.getRowSize();
254+
if (rowIndex == tablet.getMaxRowNumber()) {
255+
session.insertTablet(tablet);
256+
tablet.reset();
257+
rowIndex = 0;
258+
}
259+
tablet.addTimestamp(rowIndex, currentTime);
260+
tablet.addValue(rowIndex, 0, currentTime);
261+
}
262+
if (tablet.getRowSize() > 0) {
263+
session.insertTablet(tablet);
264+
}
265+
}
184266
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -890,15 +890,28 @@ private boolean hasNextOverlappedPage() throws IOException {
890890
return true;
891891
}
892892

893-
tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
893+
// init the merge reader for current call
894+
// The original process is changed to lazy loading because different mem page readers
895+
// belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is
896+
// necessary to ensure that these mem page readers cannot coexist in the mergeReader at the
897+
// same time.
898+
// The initial endPointTime is calculated as follows:
899+
// 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped
900+
// unseq pages and take the end point.
901+
// 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping
902+
// unseq pages and take the end point.
903+
long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
894904

895905
while (true) {
896906

897907
// may has overlapped data
898908
if (mergeReader.hasNextTimeValuePair()) {
899909

900910
TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
901-
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
911+
long currentPageEndPointTime =
912+
orderUtils.getAscending()
913+
? Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime)
914+
: Math.min(mergeReader.getCurrentReadStopTime(), initialEndPointTime);
902915
while (mergeReader.hasNextTimeValuePair()) {
903916

904917
/*
@@ -928,7 +941,7 @@ private boolean hasNextOverlappedPage() throws IOException {
928941
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
929942
timeValuePair.getTimestamp(), false);
930943
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
931-
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
944+
unpackAllOverlappedUnseqPageReadersToMergeReader();
932945

933946
// update if there are unpacked unSeqPageReaders
934947
timeValuePair = mergeReader.currentTimeValuePair();
@@ -1017,33 +1030,71 @@ private long updateEndPointTime(long currentPageEndPointTime, IVersionPageReader
10171030
}
10181031
}
10191032

1020-
private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
1033+
private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
1034+
do {
1035+
/*
1036+
* no cached page readers
1037+
*/
1038+
if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
1039+
return mergeReader.getCurrentReadStopTime();
1040+
}
10211041

1022-
/*
1023-
* no cached page readers
1024-
*/
1025-
if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
1026-
return;
1027-
}
1042+
/*
1043+
* init firstPageReader
1044+
*/
1045+
if (firstPageReader == null) {
1046+
initFirstPageReader();
1047+
}
1048+
if (!mergeReader.hasNextTimeValuePair()) {
1049+
putPageReaderToMergeReader(firstPageReader);
1050+
firstPageReader = null;
1051+
}
1052+
} while (!mergeReader.hasNextTimeValuePair());
10281053

10291054
/*
1030-
* init firstPageReader
1055+
* put all currently directly overlapped unseq page reader to merge reader
10311056
*/
1032-
if (firstPageReader == null) {
1033-
initFirstPageReader();
1034-
}
1057+
long mergeReaderStopTime = mergeReader.getCurrentReadStopTime();
1058+
unpackAllOverlappedUnseqPageReadersToMergeReader();
10351059

1036-
long currentPageEndpointTime;
1037-
if (mergeReader.hasNextTimeValuePair()) {
1038-
currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
1039-
} else {
1040-
currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
1041-
}
1060+
return calculateInitialEndPointTime(mergeReaderStopTime);
1061+
}
10421062

1043-
/*
1044-
* put all currently directly overlapped unseq page reader to merge reader
1045-
*/
1046-
unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
1063+
private long calculateInitialEndPointTime(final long currentReadStopTime) {
1064+
long initialReadStopTime = currentReadStopTime;
1065+
if (firstPageReader != null
1066+
&& !firstPageReader.isSeq()
1067+
&& orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) {
1068+
if (orderUtils.getAscending()) {
1069+
initialReadStopTime =
1070+
Math.max(
1071+
initialReadStopTime,
1072+
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
1073+
} else {
1074+
initialReadStopTime =
1075+
Math.min(
1076+
initialReadStopTime,
1077+
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
1078+
}
1079+
}
1080+
for (IVersionPageReader unSeqPageReader : unSeqPageReaders) {
1081+
if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) {
1082+
if (orderUtils.getAscending()) {
1083+
initialReadStopTime =
1084+
Math.max(
1085+
initialReadStopTime,
1086+
orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
1087+
} else {
1088+
initialReadStopTime =
1089+
Math.min(
1090+
initialReadStopTime,
1091+
orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
1092+
}
1093+
} else {
1094+
break;
1095+
}
1096+
}
1097+
return initialReadStopTime;
10471098
}
10481099

10491100
private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) {
@@ -1135,17 +1186,26 @@ private IVersionPageReader getFirstPageReaderFromCachedReaders() {
11351186
return firstPageReader;
11361187
}
11371188

1138-
private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
1139-
throws IOException {
1140-
while (!unSeqPageReaders.isEmpty()
1141-
&& orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) {
1142-
putPageReaderToMergeReader(unSeqPageReaders.poll());
1143-
}
1189+
// This process loads overlapped unseq pages based on the current time value pair of the
1190+
// mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq
1191+
// page is added.
1192+
// The current time obtained from mergeReader each time is not necessarily the minimum among all
1193+
// the actual unseq data, so it is necessary to repeatedly calculate and include potentially
1194+
// overlapping unseq pages.
1195+
private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException {
1196+
long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
11441197
if (firstPageReader != null
11451198
&& !firstPageReader.isSeq()
1146-
&& orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
1199+
&& orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) {
11471200
putPageReaderToMergeReader(firstPageReader);
11481201
firstPageReader = null;
1202+
actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
1203+
}
1204+
while (!unSeqPageReaders.isEmpty()
1205+
&& orderUtils.isOverlapped(
1206+
actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) {
1207+
putPageReaderToMergeReader(unSeqPageReaders.poll());
1208+
actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
11491209
}
11501210
}
11511211

0 commit comments

Comments
 (0)