Skip to content

Commit dccbb57

Browse files
authored
[core] SstFileReader supports range query (#6842)
1 parent 708dec8 commit dccbb57

File tree

2 files changed

+168
-11
lines changed

2 files changed

+168
-11
lines changed

paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.memory.MemorySliceInput;
2929
import org.apache.paimon.utils.FileBasedBloomFilter;
3030
import org.apache.paimon.utils.MurmurHashUtils;
31+
import org.apache.paimon.utils.Preconditions;
3132

3233
import javax.annotation.Nullable;
3334

@@ -39,7 +40,8 @@
3940
import static org.apache.paimon.utils.Preconditions.checkArgument;
4041

4142
/**
42-
* An SST File Reader which only serves point queries now.
43+
* An SST File Reader which serves point queries and range queries. Users can call {@code
44+
* createIterator} to create a file iterator and then use seek and read methods to do range queries.
4345
*
4446
* <p>Note that this class is NOT thread-safe.
4547
*/
@@ -109,6 +111,10 @@ public byte[] lookup(byte[] key) throws IOException {
109111
return null;
110112
}
111113

114+
public SstFileIterator createIterator() {
115+
return new SstFileIterator(indexBlock.iterator());
116+
}
117+
112118
private BlockIterator getNextBlock(BlockIterator indexBlockIterator) {
113119
// index block handle, point to the key, value position.
114120
MemorySlice blockHandle = indexBlockIterator.next().getValue();
@@ -180,4 +186,55 @@ public void close() throws IOException {
180186
blockCache.close();
181187
// do not need to close input, since it will be closed by outer classes
182188
}
189+
190+
/** An Iterator for range queries. */
191+
public class SstFileIterator {
192+
private final BlockIterator indexIterator;
193+
private @Nullable BlockIterator seekedDataBlock = null;
194+
195+
SstFileIterator(BlockIterator indexBlockIterator) {
196+
this.indexIterator = indexBlockIterator;
197+
}
198+
199+
/**
200+
* Seek to the position of the record whose key is exactly equal to or greater than the
201+
* specified key.
202+
*/
203+
public void seekTo(byte[] key) {
204+
MemorySlice keySlice = MemorySlice.wrap(key);
205+
206+
indexIterator.seekTo(keySlice);
207+
if (indexIterator.hasNext()) {
208+
seekedDataBlock = getNextBlock(indexIterator);
209+
// The index block entry key is the last key of the corresponding data block.
210+
// If there is some index entry key >= targetKey, the related data block must
211+
// also contain some key >= target key, which means seekedDataBlock.hasNext()
212+
// must be true
213+
seekedDataBlock.seekTo(keySlice);
214+
Preconditions.checkState(seekedDataBlock.hasNext());
215+
} else {
216+
seekedDataBlock = null;
217+
}
218+
}
219+
220+
/**
221+
* Read a batch of records from this SST File and move current record position to the next
222+
* batch.
223+
*
224+
* @return current batch of records, null if reaching file end.
225+
*/
226+
public BlockIterator readBatch() throws IOException {
227+
if (seekedDataBlock != null) {
228+
BlockIterator result = seekedDataBlock;
229+
seekedDataBlock = null;
230+
return result;
231+
}
232+
233+
if (!indexIterator.hasNext()) {
234+
return null;
235+
}
236+
237+
return getNextBlock(indexIterator);
238+
}
239+
}
183240
}

paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java

Lines changed: 110 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44+
import java.io.IOException;
4445
import java.util.Arrays;
4546
import java.util.Comparator;
4647
import java.util.List;
@@ -109,12 +110,16 @@ private void innerTestLookup() throws Exception {
109110

110111
// 1. lookup random existing keys
111112
for (int i = 0; i < 100; i++) {
112-
int key = random.nextInt(5000);
113+
int key = random.nextInt(5000 * 2 - 1);
113114
keyOut.reset();
114115
keyOut.writeInt(key);
115116
byte[] queried = reader.lookup(keyOut.toSlice().getHeapMemory());
116-
Assertions.assertNotNull(queried);
117-
Assertions.assertEquals(key, MemorySlice.wrap(queried).readInt(0));
117+
if (key % 2 == 0) {
118+
Assertions.assertNotNull(queried);
119+
Assertions.assertEquals(key, MemorySlice.wrap(queried).readInt(0));
120+
} else {
121+
Assertions.assertNull(queried);
122+
}
118123
}
119124

120125
// 2. lookup boundaries
@@ -125,16 +130,16 @@ private void innerTestLookup() throws Exception {
125130
Assertions.assertEquals(0, MemorySlice.wrap(queried).readInt(0));
126131

127132
keyOut.reset();
128-
keyOut.writeInt(511);
133+
keyOut.writeInt(511 * 2);
129134
byte[] queried1 = reader.lookup(keyOut.toSlice().getHeapMemory());
130135
Assertions.assertNotNull(queried1);
131-
Assertions.assertEquals(511, MemorySlice.wrap(queried1).readInt(0));
136+
Assertions.assertEquals(511 * 2, MemorySlice.wrap(queried1).readInt(0));
132137

133138
keyOut.reset();
134-
keyOut.writeInt(4999);
139+
keyOut.writeInt(4999 * 2);
135140
byte[] queried2 = reader.lookup(keyOut.toSlice().getHeapMemory());
136141
Assertions.assertNotNull(queried2);
137-
Assertions.assertEquals(4999, MemorySlice.wrap(queried2).readInt(0));
142+
Assertions.assertEquals(4999 * 2, MemorySlice.wrap(queried2).readInt(0));
138143

139144
// 2. lookup key smaller than first key
140145
for (int i = 0; i < 100; i++) {
@@ -146,12 +151,91 @@ private void innerTestLookup() throws Exception {
146151
// 3. lookup key greater than last key
147152
for (int i = 0; i < 100; i++) {
148153
keyOut.reset();
149-
keyOut.writeInt(10000 + i);
154+
keyOut.writeInt(10001 + i);
150155
Assertions.assertNull(reader.lookup(keyOut.toSlice().getHeapMemory()));
151156
}
152157
}
153158
}
154159

160+
@TestTemplate
161+
public void testScan() throws Exception {
162+
int recordNum = 20000;
163+
writeData(recordNum, bloomFilterEnabled);
164+
165+
long fileSize = fileIO.getFileSize(file);
166+
try (SeekableInputStream inputStream = fileIO.newInputStream(file);
167+
SstFileReader reader =
168+
new SstFileReader(
169+
Comparator.comparingInt(slice -> slice.readInt(0)),
170+
fileSize,
171+
file,
172+
inputStream,
173+
CACHE_MANAGER); ) {
174+
SstFileReader.SstFileIterator fileIterator = reader.createIterator();
175+
176+
MemorySliceOutput keyOut = new MemorySliceOutput(4);
177+
178+
// 1. test full scan
179+
assertScan(0, recordNum - 1, fileIterator);
180+
181+
// 2. test random seek and scan
182+
Random random = new Random();
183+
for (int i = 0; i < 1000; i++) {
184+
resetIterator(fileIterator, keyOut);
185+
int key = random.nextInt(recordNum * 2 - 1);
186+
int targetPosition = (key + 1) / 2;
187+
188+
keyOut.reset();
189+
keyOut.writeInt(key);
190+
fileIterator.seekTo(keyOut.toSlice().getHeapMemory());
191+
192+
// lookup should not affect reader position
193+
interleaveLookup(reader, keyOut);
194+
195+
assertScan(targetPosition, recordNum - 1, fileIterator);
196+
}
197+
198+
// 3. test boundaries
199+
resetIterator(fileIterator, keyOut);
200+
keyOut.reset();
201+
keyOut.writeInt(0);
202+
fileIterator.seekTo(keyOut.toSlice().getHeapMemory());
203+
assertScan(0, recordNum - 1, fileIterator);
204+
205+
resetIterator(fileIterator, keyOut);
206+
keyOut.reset();
207+
keyOut.writeInt(recordNum * 2 - 2);
208+
fileIterator.seekTo(keyOut.toSlice().getHeapMemory());
209+
assertScan(recordNum - 1, recordNum - 1, fileIterator);
210+
211+
// 4. test out of boundaries
212+
resetIterator(fileIterator, keyOut);
213+
keyOut.reset();
214+
keyOut.writeInt(-10);
215+
fileIterator.seekTo(keyOut.toSlice().getHeapMemory());
216+
assertScan(0, recordNum - 1, fileIterator);
217+
218+
resetIterator(fileIterator, keyOut);
219+
keyOut.reset();
220+
keyOut.writeInt(recordNum * 2 + 10);
221+
fileIterator.seekTo(keyOut.toSlice().getHeapMemory());
222+
Assertions.assertNull(fileIterator.readBatch());
223+
}
224+
}
225+
226+
private void resetIterator(SstFileReader.SstFileIterator iterator, MemorySliceOutput keyOut)
227+
throws IOException {
228+
keyOut.reset();
229+
keyOut.writeInt(-1);
230+
iterator.seekTo(keyOut.toSlice().getHeapMemory());
231+
}
232+
233+
private void interleaveLookup(SstFileReader reader, MemorySliceOutput keyOut) throws Exception {
234+
keyOut.reset();
235+
keyOut.writeInt(0);
236+
reader.lookup(keyOut.toSlice().getHeapMemory());
237+
}
238+
155239
private void writeData(int recordCount, boolean withBloomFilter) throws Exception {
156240
BloomFilter.Builder bloomFilter = null;
157241
if (withBloomFilter) {
@@ -168,11 +252,27 @@ private void writeData(int recordCount, boolean withBloomFilter) throws Exceptio
168252
for (int i = 0; i < recordCount; i++) {
169253
keyOut.reset();
170254
valueOut.reset();
171-
keyOut.writeInt(i);
172-
valueOut.writeInt(i);
255+
keyOut.writeInt(i * 2);
256+
valueOut.writeInt(i * 2);
173257
writer.put(keyOut.toSlice().getHeapMemory(), valueOut.toSlice().getHeapMemory());
174258
}
175259
LOG.info("Write {} data cost {} ms", recordCount, System.currentTimeMillis() - start);
176260
}
177261
}
262+
263+
private static void assertScan(
264+
int startPosition, int lastPosition, SstFileReader.SstFileIterator iterator)
265+
throws Exception {
266+
int count = startPosition;
267+
BlockIterator iter;
268+
while ((iter = iterator.readBatch()) != null) {
269+
while (iter.hasNext()) {
270+
BlockEntry entry = iter.next();
271+
Assertions.assertEquals(count * 2, entry.getKey().readInt(0));
272+
Assertions.assertEquals(count * 2, entry.getValue().readInt(0));
273+
count++;
274+
}
275+
}
276+
Assertions.assertEquals(lastPosition, count - 1);
277+
}
178278
}

0 commit comments

Comments
 (0)