Skip to content

Commit c869d90

Browse files
authored
[core] Refactor Index Reader in BlockReader (#6865)
1 parent b9ccee6 commit c869d90

File tree

4 files changed

+56
-49
lines changed

4 files changed

+56
-49
lines changed

paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,33 +21,25 @@
2121
import org.apache.paimon.memory.MemorySlice;
2222
import org.apache.paimon.memory.MemorySliceInput;
2323

24-
import java.util.Comparator;
2524
import java.util.Iterator;
2625
import java.util.Map;
2726
import java.util.NoSuchElementException;
2827

29-
import static java.util.Objects.requireNonNull;
30-
3128
/** An {@link Iterator} for a block. */
32-
public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice, MemorySlice>> {
33-
34-
protected final MemorySliceInput data;
35-
36-
private final int recordCount;
37-
private final Comparator<MemorySlice> comparator;
29+
public class BlockIterator implements Iterator<Map.Entry<MemorySlice, MemorySlice>> {
3830

31+
private final BlockReader reader;
32+
private final MemorySliceInput input;
3933
private BlockEntry polled;
4034

41-
public BlockIterator(
42-
MemorySliceInput data, int recordCount, Comparator<MemorySlice> comparator) {
43-
this.data = data;
44-
this.recordCount = recordCount;
45-
this.comparator = comparator;
35+
public BlockIterator(BlockReader reader) {
36+
this.reader = reader;
37+
this.input = reader.blockInput();
4638
}
4739

4840
@Override
4941
public boolean hasNext() {
50-
return polled != null || data.isReadable();
42+
return polled != null || input.isReadable();
5143
}
5244

5345
@Override
@@ -72,14 +64,14 @@ public void remove() {
7264

7365
public boolean seekTo(MemorySlice targetKey) {
7466
int left = 0;
75-
int right = recordCount - 1;
67+
int right = reader.recordCount() - 1;
7668

7769
while (left <= right) {
7870
int mid = left + (right - left) / 2;
7971

80-
seekTo(mid);
72+
input.setPosition(reader.seekTo(mid));
8173
BlockEntry midEntry = readEntry();
82-
int compare = comparator.compare(midEntry.getKey(), targetKey);
74+
int compare = reader.comparator().compare(midEntry.getKey(), targetKey);
8375

8476
if (compare == 0) {
8577
polled = midEntry;
@@ -96,18 +88,13 @@ public boolean seekTo(MemorySlice targetKey) {
9688
return false;
9789
}
9890

99-
/** Seek to the specified record position of current block. */
100-
public abstract void seekTo(int recordPosition);
101-
10291
private BlockEntry readEntry() {
103-
requireNonNull(data, "data is null");
104-
10592
int keyLength;
106-
keyLength = data.readVarLenInt();
107-
MemorySlice key = data.readSlice(keyLength);
93+
keyLength = input.readVarLenInt();
94+
MemorySlice key = input.readSlice(keyLength);
10895

109-
int valueLength = data.readVarLenInt();
110-
MemorySlice value = data.readSlice(valueLength);
96+
int valueLength = input.readVarLenInt();
97+
MemorySlice value = input.readSlice(valueLength);
11198

11299
return new BlockEntry(key, value);
113100
}

paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,69 +19,88 @@
1919
package org.apache.paimon.sst;
2020

2121
import org.apache.paimon.memory.MemorySlice;
22+
import org.apache.paimon.memory.MemorySliceInput;
2223

2324
import java.util.Comparator;
2425

2526
import static org.apache.paimon.sst.BlockAlignedType.ALIGNED;
2627

2728
/** Reader for a block. */
28-
public class BlockReader {
29+
public abstract class BlockReader {
30+
2931
private final MemorySlice block;
32+
private final int recordCount;
3033
private final Comparator<MemorySlice> comparator;
3134

32-
public BlockReader(MemorySlice block, Comparator<MemorySlice> comparator) {
35+
private BlockReader(MemorySlice block, int recordCount, Comparator<MemorySlice> comparator) {
3336
this.block = block;
37+
this.recordCount = recordCount;
3438
this.comparator = comparator;
3539
}
3640

37-
public long size() {
38-
return block.length();
41+
public MemorySliceInput blockInput() {
42+
return block.toInput();
43+
}
44+
45+
public int recordCount() {
46+
return recordCount;
47+
}
48+
49+
public Comparator<MemorySlice> comparator() {
50+
return comparator;
3951
}
4052

4153
public BlockIterator iterator() {
54+
return new BlockIterator(this);
55+
}
56+
57+
/** Seek to slice position from record position. */
58+
public abstract int seekTo(int recordPosition);
59+
60+
public static BlockReader create(MemorySlice block, Comparator<MemorySlice> comparator) {
4261
BlockAlignedType alignedType =
4362
BlockAlignedType.fromByte(block.readByte(block.length() - 1));
4463
int intValue = block.readInt(block.length() - 5);
4564
if (alignedType == ALIGNED) {
46-
return new AlignedIterator(block.slice(0, block.length() - 5), intValue, comparator);
65+
return new AlignedBlockReader(block.slice(0, block.length() - 5), intValue, comparator);
4766
} else {
4867
int indexLength = intValue * 4;
4968
int indexOffset = block.length() - 5 - indexLength;
5069
MemorySlice data = block.slice(0, indexOffset);
5170
MemorySlice index = block.slice(indexOffset, indexLength);
52-
return new UnalignedIterator(data, index, comparator);
71+
return new UnalignedBlockReader(data, index, comparator);
5372
}
5473
}
5574

56-
private static class AlignedIterator extends BlockIterator {
75+
private static class AlignedBlockReader extends BlockReader {
5776

5877
private final int recordSize;
5978

60-
public AlignedIterator(
79+
public AlignedBlockReader(
6180
MemorySlice data, int recordSize, Comparator<MemorySlice> comparator) {
62-
super(data.toInput(), data.length() / recordSize, comparator);
81+
super(data, data.length() / recordSize, comparator);
6382
this.recordSize = recordSize;
6483
}
6584

6685
@Override
67-
public void seekTo(int recordPosition) {
68-
data.setPosition(recordPosition * recordSize);
86+
public int seekTo(int recordPosition) {
87+
return recordPosition * recordSize;
6988
}
7089
}
7190

72-
private static class UnalignedIterator extends BlockIterator {
91+
private static class UnalignedBlockReader extends BlockReader {
7392

7493
private final MemorySlice index;
7594

76-
public UnalignedIterator(
95+
public UnalignedBlockReader(
7796
MemorySlice data, MemorySlice index, Comparator<MemorySlice> comparator) {
78-
super(data.toInput(), index.length() / 4, comparator);
97+
super(data, index.length() / 4, comparator);
7998
this.index = index;
8099
}
81100

82101
@Override
83-
public void seekTo(int recordPosition) {
84-
data.setPosition(index.readInt(recordPosition * 4));
102+
public int seekTo(int recordPosition) {
103+
return index.readInt(recordPosition * 4);
85104
}
86105
}
87106
}

paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class SstFileReader implements Closeable {
4848
private final Comparator<MemorySlice> comparator;
4949
private final Path filePath;
5050
private final BlockCache blockCache;
51-
private final BlockIterator indexBlockIterator;
51+
private final BlockReader indexBlock;
5252
@Nullable private final FileBasedBloomFilter bloomFilter;
5353

5454
public SstFileReader(
@@ -65,7 +65,7 @@ public SstFileReader(
6565
blockCache.getBlock(
6666
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true);
6767
Footer footer = Footer.readFooter(MemorySlice.wrap(footerData).toInput());
68-
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), true).iterator();
68+
this.indexBlock = readBlock(footer.getIndexBlockHandle(), true);
6969
BloomFilterHandle handle = footer.getBloomFilterHandle();
7070
if (handle == null) {
7171
this.bloomFilter = null;
@@ -95,20 +95,21 @@ public byte[] lookup(byte[] key) throws IOException {
9595

9696
MemorySlice keySlice = MemorySlice.wrap(key);
9797
// seek the index to the block containing the key
98+
BlockIterator indexBlockIterator = indexBlock.iterator();
9899
indexBlockIterator.seekTo(keySlice);
99100

100101
// if indexBlockIterator does not have a next entry, the key is not covered by the index
101102
if (indexBlockIterator.hasNext()) {
102103
// seek the current iterator to the key
103-
BlockIterator current = getNextBlock();
104+
BlockIterator current = getNextBlock(indexBlockIterator);
104105
if (current.seekTo(keySlice)) {
105106
return current.next().getValue().copyBytes();
106107
}
107108
}
108109
return null;
109110
}
110111

111-
private BlockIterator getNextBlock() {
112+
private BlockIterator getNextBlock(BlockIterator indexBlockIterator) {
112113
// the value of the index entry is a block handle, which points to the key/value data position.
113114
MemorySlice blockHandle = indexBlockIterator.next().getValue();
114115
BlockReader dataBlock =
@@ -138,7 +139,7 @@ private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
138139
blockHandle.size(),
139140
bytes -> decompressBlock(bytes, blockTrailer),
140141
index);
141-
return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator);
142+
return BlockReader.create(MemorySlice.wrap(unCompressedBlock), comparator);
142143
}
143144

144145
private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer) {

paimon-common/src/test/java/org/apache/paimon/sst/BlockIteratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void testUnalignedIterator() throws IOException {
4646

4747
public void innerTest(boolean aligned) throws IOException {
4848
MemorySlice data = writeBlock(aligned);
49-
BlockIterator iterator = new BlockReader(data, COMPARATOR).iterator();
49+
BlockIterator iterator = BlockReader.create(data, COMPARATOR).iterator();
5050

5151
// 1. test for normal cases:
5252
final int step = 3;

0 commit comments

Comments
 (0)