Skip to content

Commit e34762d

Browse files
committed
[core] Refactor footer of lookup sst file to SortLookupStoreFooter
1 parent adcdb7b commit e34762d

File tree

11 files changed

+173
-118
lines changed

11 files changed

+173
-118
lines changed

paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020

2121
import org.apache.paimon.compression.BlockCompressionFactory;
2222
import org.apache.paimon.compression.CompressOptions;
23+
import org.apache.paimon.fs.Path;
24+
import org.apache.paimon.fs.PositionOutputStream;
25+
import org.apache.paimon.fs.SeekableInputStream;
26+
import org.apache.paimon.fs.local.LocalFileIO;
2327
import org.apache.paimon.io.cache.CacheManager;
2428
import org.apache.paimon.lookup.LookupStoreFactory;
2529
import org.apache.paimon.memory.MemorySlice;
@@ -52,12 +56,16 @@ public SortLookupStoreFactory(
5256

5357
@Override
5458
public SortLookupStoreReader createReader(File file) throws IOException {
55-
return new SortLookupStoreReader(comparator, file, cacheManager);
59+
Path filePath = new Path(file.getAbsolutePath());
60+
SeekableInputStream input = LocalFileIO.INSTANCE.newInputStream(filePath);
61+
return new SortLookupStoreReader(comparator, filePath, file.length(), input, cacheManager);
5662
}
5763

5864
@Override
5965
public SortLookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter)
6066
throws IOException {
61-
return new SortLookupStoreWriter(file, blockSize, bloomFilter, compressionFactory);
67+
Path filePath = new Path(file.getAbsolutePath());
68+
PositionOutputStream out = LocalFileIO.INSTANCE.newOutputStream(filePath, true);
69+
return new SortLookupStoreWriter(out, blockSize, bloomFilter, compressionFactory);
6270
}
6371
}

paimon-common/src/main/java/org/apache/paimon/sst/Footer.java renamed to paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,29 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.sst;
19+
package org.apache.paimon.lookup.sort;
2020

2121
import org.apache.paimon.memory.MemorySlice;
2222
import org.apache.paimon.memory.MemorySliceInput;
2323
import org.apache.paimon.memory.MemorySliceOutput;
24+
import org.apache.paimon.sst.BlockHandle;
25+
import org.apache.paimon.sst.BloomFilterHandle;
2426

2527
import javax.annotation.Nullable;
2628

27-
import java.io.IOException;
28-
2929
import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
3030
import static org.apache.paimon.utils.Preconditions.checkArgument;
3131

3232
/** Footer for a sorted file. */
33-
public class Footer {
33+
public class SortLookupStoreFooter {
3434

3535
public static final int ENCODED_LENGTH = 36;
3636

3737
@Nullable private final BloomFilterHandle bloomFilterHandle;
3838
private final BlockHandle indexBlockHandle;
3939

40-
Footer(@Nullable BloomFilterHandle bloomFilterHandle, BlockHandle indexBlockHandle) {
40+
public SortLookupStoreFooter(
41+
@Nullable BloomFilterHandle bloomFilterHandle, BlockHandle indexBlockHandle) {
4142
this.bloomFilterHandle = bloomFilterHandle;
4243
this.indexBlockHandle = indexBlockHandle;
4344
}
@@ -51,7 +52,7 @@ public BlockHandle getIndexBlockHandle() {
5152
return indexBlockHandle;
5253
}
5354

54-
public static Footer readFooter(MemorySliceInput sliceInput) throws IOException {
55+
public static SortLookupStoreFooter readFooter(MemorySliceInput sliceInput) {
5556
// read bloom filter and index handles
5657
@Nullable
5758
BloomFilterHandle bloomFilterHandle =
@@ -71,16 +72,16 @@ public static Footer readFooter(MemorySliceInput sliceInput) throws IOException
7172
int magicNumber = sliceInput.readInt();
7273
checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad magic number)");
7374

74-
return new Footer(bloomFilterHandle, indexBlockHandle);
75+
return new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
7576
}
7677

77-
public static MemorySlice writeFooter(Footer footer) {
78+
public static MemorySlice writeFooter(SortLookupStoreFooter footer) {
7879
MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH);
7980
writeFooter(footer, output);
8081
return output.toSlice();
8182
}
8283

83-
public static void writeFooter(Footer footer, MemorySliceOutput sliceOutput) {
84+
public static void writeFooter(SortLookupStoreFooter footer, MemorySliceOutput sliceOutput) {
8485
// write bloom filter and index handles
8586
if (footer.bloomFilterHandle == null) {
8687
sliceOutput.writeLong(0);

paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@
2020

2121
import org.apache.paimon.fs.Path;
2222
import org.apache.paimon.fs.SeekableInputStream;
23-
import org.apache.paimon.fs.local.LocalFileIO;
2423
import org.apache.paimon.io.cache.CacheManager;
2524
import org.apache.paimon.lookup.LookupStoreReader;
25+
import org.apache.paimon.memory.MemorySegment;
2626
import org.apache.paimon.memory.MemorySlice;
27+
import org.apache.paimon.sst.BlockCache;
2728
import org.apache.paimon.sst.SstFileReader;
29+
import org.apache.paimon.utils.FileBasedBloomFilter;
2830

2931
import javax.annotation.Nullable;
3032

31-
import java.io.File;
3233
import java.io.IOException;
3334
import java.util.Comparator;
3435

@@ -39,11 +40,24 @@ public class SortLookupStoreReader implements LookupStoreReader {
3940
private final SstFileReader reader;
4041

4142
public SortLookupStoreReader(
42-
Comparator<MemorySlice> comparator, File file, CacheManager cacheManager)
43-
throws IOException {
44-
Path filePath = new Path(file.getAbsolutePath());
45-
this.input = LocalFileIO.INSTANCE.newInputStream(filePath);
46-
this.reader = new SstFileReader(comparator, file.length(), filePath, input, cacheManager);
43+
Comparator<MemorySlice> comparator,
44+
Path filePath,
45+
long fileLen,
46+
SeekableInputStream input,
47+
CacheManager cacheManager) {
48+
this.input = input;
49+
BlockCache blockCache = new BlockCache(filePath, input, cacheManager);
50+
int footerLen = SortLookupStoreFooter.ENCODED_LENGTH;
51+
MemorySegment footerData =
52+
blockCache.getBlock(fileLen - footerLen, footerLen, b -> b, true);
53+
SortLookupStoreFooter footer =
54+
SortLookupStoreFooter.readFooter(MemorySlice.wrap(footerData).toInput());
55+
FileBasedBloomFilter bloomFilter =
56+
FileBasedBloomFilter.create(
57+
input, filePath, cacheManager, footer.getBloomFilterHandle());
58+
this.reader =
59+
new SstFileReader(
60+
comparator, blockCache, footer.getIndexBlockHandle(), bloomFilter);
4761
}
4862

4963
@Nullable
@@ -52,6 +66,10 @@ public byte[] lookup(byte[] key) throws IOException {
5266
return reader.lookup(key);
5367
}
5468

69+
public SstFileReader.SstFileIterator createIterator() {
70+
return reader.createIterator();
71+
}
72+
5573
@Override
5674
public void close() throws IOException {
5775
reader.close();

paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,49 @@
1919
package org.apache.paimon.lookup.sort;
2020

2121
import org.apache.paimon.compression.BlockCompressionFactory;
22-
import org.apache.paimon.fs.Path;
2322
import org.apache.paimon.fs.PositionOutputStream;
24-
import org.apache.paimon.fs.local.LocalFileIO;
2523
import org.apache.paimon.lookup.LookupStoreWriter;
24+
import org.apache.paimon.memory.MemorySlice;
25+
import org.apache.paimon.sst.BlockHandle;
26+
import org.apache.paimon.sst.BloomFilterHandle;
2627
import org.apache.paimon.sst.SstFileWriter;
2728
import org.apache.paimon.utils.BloomFilter;
2829

2930
import javax.annotation.Nullable;
3031

31-
import java.io.File;
3232
import java.io.IOException;
3333

34-
/** A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. */
34+
/**
35+
* A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. The SST file layout is as follows.
36+
* (For the layout of each block type, please refer to the corresponding class.)
37+
*
38+
* <pre>
39+
* +-----------------------------------+------+
40+
* | Footer | |
41+
* +-----------------------------------+ |
42+
* | Index Block | +--> Loaded on open
43+
* +-----------------------------------+ |
44+
* | Bloom Filter Block | |
45+
* +-----------------------------------+------+
46+
* | Data Block | |
47+
* +-----------------------------------+ |
48+
* | ...... | +--> Loaded on request
49+
* +-----------------------------------+ |
50+
* | Data Block | |
51+
* +-----------------------------------+------+
52+
* </pre>
53+
*/
3554
public class SortLookupStoreWriter implements LookupStoreWriter {
3655

3756
private final SstFileWriter writer;
3857
private final PositionOutputStream out;
3958

4059
public SortLookupStoreWriter(
41-
File file,
60+
PositionOutputStream out,
4261
int blockSize,
4362
@Nullable BloomFilter.Builder bloomFilter,
44-
BlockCompressionFactory compressionFactory)
45-
throws IOException {
46-
Path filePath = new Path(file.getAbsolutePath());
47-
this.out = LocalFileIO.INSTANCE.newOutputStream(filePath, true);
63+
BlockCompressionFactory compressionFactory) {
64+
this.out = out;
4865
this.writer = new SstFileWriter(out, blockSize, bloomFilter, compressionFactory);
4966
}
5067

@@ -55,7 +72,13 @@ public void put(byte[] key, byte[] value) throws IOException {
5572

5673
@Override
5774
public void close() throws IOException {
58-
writer.close();
75+
writer.flush();
76+
BloomFilterHandle bloomFilterHandle = writer.writeBloomFilter();
77+
BlockHandle indexBlockHandle = writer.writeIndexBlock();
78+
SortLookupStoreFooter footer =
79+
new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
80+
MemorySlice footerEncoding = SortLookupStoreFooter.writeFooter(footer);
81+
writer.writeSlice(footerEncoding);
5982
out.close();
6083
}
6184
}

paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,13 @@
2424

2525
/** Handle for a block. */
2626
public class BlockHandle {
27+
2728
public static final int MAX_ENCODED_LENGTH = 9 + 5;
2829

2930
private final long offset;
3031
private final int size;
3132

32-
BlockHandle(long offset, int size) {
33+
public BlockHandle(long offset, int size) {
3334
this.offset = offset;
3435
this.size = size;
3536
}

paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class BloomFilterHandle {
2727
private final int size;
2828
private final long expectedEntries;
2929

30-
BloomFilterHandle(long offset, int size, long expectedEntries) {
30+
public BloomFilterHandle(long offset, int size, long expectedEntries) {
3131
this.offset = offset;
3232
this.size = size;
3333
this.expectedEntries = expectedEntries;

paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020

2121
import org.apache.paimon.compression.BlockCompressionFactory;
2222
import org.apache.paimon.compression.BlockDecompressor;
23-
import org.apache.paimon.fs.Path;
24-
import org.apache.paimon.fs.SeekableInputStream;
25-
import org.apache.paimon.io.cache.CacheManager;
2623
import org.apache.paimon.memory.MemorySegment;
2724
import org.apache.paimon.memory.MemorySlice;
2825
import org.apache.paimon.memory.MemorySliceInput;
@@ -48,39 +45,19 @@
4845
public class SstFileReader implements Closeable {
4946

5047
private final Comparator<MemorySlice> comparator;
51-
private final Path filePath;
5248
private final BlockCache blockCache;
5349
private final BlockReader indexBlock;
5450
@Nullable private final FileBasedBloomFilter bloomFilter;
5551

5652
public SstFileReader(
5753
Comparator<MemorySlice> comparator,
58-
long fileSize,
59-
Path filePath,
60-
SeekableInputStream input,
61-
CacheManager cacheManager)
62-
throws IOException {
54+
BlockCache blockCache,
55+
BlockHandle indexBlockHandle,
56+
@Nullable FileBasedBloomFilter bloomFilter) {
6357
this.comparator = comparator;
64-
this.filePath = filePath;
65-
this.blockCache = new BlockCache(filePath, input, cacheManager);
66-
MemorySegment footerData =
67-
blockCache.getBlock(
68-
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true);
69-
Footer footer = Footer.readFooter(MemorySlice.wrap(footerData).toInput());
70-
this.indexBlock = readBlock(footer.getIndexBlockHandle(), true);
71-
BloomFilterHandle handle = footer.getBloomFilterHandle();
72-
if (handle == null) {
73-
this.bloomFilter = null;
74-
} else {
75-
this.bloomFilter =
76-
new FileBasedBloomFilter(
77-
input,
78-
filePath,
79-
cacheManager,
80-
handle.expectedEntries(),
81-
handle.offset(),
82-
handle.size());
83-
}
58+
this.blockCache = blockCache;
59+
this.indexBlock = readBlock(indexBlockHandle, true);
60+
this.bloomFilter = bloomFilter;
8461
}
8562

8663
/**
@@ -154,8 +131,8 @@ private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer
154131
checkArgument(
155132
blockTrailer.getCrc32c() == crc32cCode,
156133
String.format(
157-
"Expected CRC32C(%d) but found CRC32C(%d) for file(%s)",
158-
blockTrailer.getCrc32c(), crc32cCode, filePath));
134+
"Expected CRC32C(%d) but found CRC32C(%d)",
135+
blockTrailer.getCrc32c(), crc32cCode));
159136

160137
// decompress data
161138
BlockCompressionFactory compressionFactory =
@@ -184,11 +161,11 @@ public void close() throws IOException {
184161
bloomFilter.close();
185162
}
186163
blockCache.close();
187-
// do not need to close input, since it will be closed by outer classes
188164
}
189165

190166
/** An Iterator for range queries. */
191167
public class SstFileIterator {
168+
192169
private final BlockIterator indexIterator;
193170
private @Nullable BlockIterator seekedDataBlock = null;
194171

0 commit comments

Comments
 (0)