Skip to content

Commit 7466e2c

Browse files
authored
Concurrently querying and writing to the memtable may cause the query results out of order (apache#16328)
1 parent 164b581 commit 7466e2c

File tree

13 files changed

+134
-75
lines changed

13 files changed

+134
-75
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.ArrayList;
5252
import java.util.List;
5353
import java.util.Map;
54-
import java.util.stream.Collectors;
5554

5655
public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk {
5756
private final String timeChunkName;
@@ -377,23 +376,7 @@ private TsBlock buildTsBlock() {
377376
}
378377

379378
private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws IOException {
380-
List<AlignedTVList> alignedTvLists =
381-
alignedTvListQueryMap.keySet().stream()
382-
.map(x -> (AlignedTVList) x)
383-
.collect(Collectors.toList());
384-
MemPointIterator timeValuePairIterator =
385-
MemPointIteratorFactory.create(
386-
dataTypes,
387-
columnIndexList,
388-
alignedTvLists,
389-
Ordering.ASC,
390-
null,
391-
timeColumnDeletion,
392-
valueColumnsDeletionList,
393-
floatPrecision,
394-
encodingList,
395-
context.isIgnoreAllNullRows(),
396-
MAX_NUMBER_OF_POINTS_IN_PAGE);
379+
MemPointIterator timeValuePairIterator = createMemPointIterator(Ordering.ASC, null);
397380

398381
while (timeValuePairIterator.hasNextTimeValuePair()) {
399382
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -475,14 +458,17 @@ public MemPointIterator getMemPointIterator() {
475458

476459
@Override
477460
public MemPointIterator createMemPointIterator(Ordering scanOrder, Filter globalTimeFilter) {
478-
List<AlignedTVList> alignedTvLists =
479-
alignedTvListQueryMap.keySet().stream()
480-
.map(x -> (AlignedTVList) x)
481-
.collect(Collectors.toList());
461+
List<AlignedTVList> tvLists = new ArrayList<>(alignedTvListQueryMap.size());
462+
List<Integer> tvListRowCounts = new ArrayList<>(alignedTvListQueryMap.size());
463+
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
464+
tvLists.add((AlignedTVList) entry.getKey());
465+
tvListRowCounts.add(entry.getValue());
466+
}
482467
return MemPointIteratorFactory.create(
483468
dataTypes,
484469
columnIndexList,
485-
alignedTvLists,
470+
tvLists,
471+
tvListRowCounts,
486472
scanOrder,
487473
globalTimeFilter,
488474
timeColumnDeletion,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -292,17 +292,7 @@ private TsBlock buildTsBlock() {
292292

293293
// read all data in memory chunk and write to tsblock
294294
private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws IOException {
295-
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
296-
MemPointIterator timeValuePairIterator =
297-
MemPointIteratorFactory.create(
298-
getDataType(),
299-
tvLists,
300-
Ordering.ASC,
301-
null,
302-
deletionList,
303-
floatPrecision,
304-
encoding,
305-
MAX_NUMBER_OF_POINTS_IN_PAGE);
295+
MemPointIterator timeValuePairIterator = createMemPointIterator(Ordering.ASC, null);
306296

307297
while (timeValuePairIterator.hasNextTimeValuePair()) {
308298
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -379,10 +369,16 @@ public MemPointIterator getMemPointIterator() {
379369
}
380370

381371
public MemPointIterator createMemPointIterator(Ordering scanOrder, Filter globalTimeFilter) {
382-
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
372+
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.size());
373+
List<Integer> tvListRowCounts = new ArrayList<>(tvListQueryMap.size());
374+
for (Map.Entry<TVList, Integer> entry : tvListQueryMap.entrySet()) {
375+
tvLists.add(entry.getKey());
376+
tvListRowCounts.add(entry.getValue());
377+
}
383378
return MemPointIteratorFactory.create(
384379
dataType,
385380
tvLists,
381+
tvListRowCounts,
386382
scanOrder,
387383
globalTimeFilter,
388384
deletionList,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1601,6 +1601,7 @@ public boolean isAllDeleted() {
16011601

16021602
public AlignedTVListIterator iterator(
16031603
Ordering scanOrder,
1604+
int rowCount,
16041605
Filter globalTimeFilter,
16051606
List<TSDataType> dataTypeList,
16061607
List<Integer> columnIndexList,
@@ -1612,6 +1613,7 @@ public AlignedTVListIterator iterator(
16121613
int maxNumberOfPointsInPage) {
16131614
return new AlignedTVListIterator(
16141615
scanOrder,
1616+
rowCount,
16151617
globalTimeFilter,
16161618
dataTypeList,
16171619
columnIndexList,
@@ -1643,6 +1645,7 @@ public class AlignedTVListIterator extends TVListIterator {
16431645

16441646
public AlignedTVListIterator(
16451647
Ordering scanOrder,
1648+
int rowCount,
16461649
Filter globalTimeFilter,
16471650
List<TSDataType> dataTypeList,
16481651
List<Integer> columnIndexList,
@@ -1652,7 +1655,7 @@ public AlignedTVListIterator(
16521655
List<TSEncoding> encodingList,
16531656
boolean ignoreAllNullRows,
16541657
int maxNumberOfPointsInPage) {
1655-
super(scanOrder, globalTimeFilter, null, null, null, maxNumberOfPointsInPage);
1658+
super(scanOrder, rowCount, globalTimeFilter, null, null, null, maxNumberOfPointsInPage);
16561659
this.dataTypeList = dataTypeList;
16571660
this.columnIndexList =
16581661
(columnIndexList == null)

0 commit comments

Comments
 (0)