Commit 9f2738f

Improve performance with bloom_filter_length
1 parent c8487c7 commit 9f2738f

5 files changed: +72 −6 lines

parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java

Lines changed: 7 additions & 0 deletions

@@ -541,6 +541,10 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups,
       if (bloomFilterOffset >= 0) {
         metaData.setBloom_filter_offset(bloomFilterOffset);
       }
+      int bloomFilterLength = columnMetaData.getBloomFilterLength();
+      if (bloomFilterLength >= 0) {
+        metaData.setBloom_filter_length(bloomFilterLength);
+      }
       if (columnMetaData.getStatistics() != null && !columnMetaData.getStatistics().isEmpty()) {
         metaData.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
       }

@@ -1595,6 +1599,9 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
           if (metaData.isSetBloom_filter_offset()) {
             column.setBloomFilterOffset(metaData.getBloom_filter_offset());
           }
+          if (metaData.isSetBloom_filter_length()) {
+            column.setBloomFilterLength(metaData.getBloom_filter_length());
+          }
         } else { // column encrypted with column key
           // Metadata will be decrypted later, if this column is accessed
           EncryptionWithColumnKey columnKeyStruct = cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY();
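
The conversion is symmetric: the footer write path only sets the optional Thrift field when the in-memory value is non-negative, and the read path only copies it back when the field is present. A minimal sketch of the same guard pattern, with a hypothetical stub standing in for the Thrift-generated ColumnMetaData (the stub class and method bodies are illustrative, not the generated code):

// Hypothetical stand-in for the Thrift-generated ColumnMetaData; the real
// isSet/set/get methods come from parquet-format's generated code.
class MetaDataStub {
  private Integer bloomFilterLength; // null means the optional field is unset

  void setBloom_filter_length(int v) {
    bloomFilterLength = v;
  }

  boolean isSetBloom_filter_length() {
    return bloomFilterLength != null;
  }

  int getBloom_filter_length() {
    return bloomFilterLength; // caller must check isSet first, as Thrift does
  }
}

class RoundTripSketch {
  // Footer write path: only set the optional field for a real length.
  static void toFooter(int inMemoryLength, MetaDataStub metaData) {
    if (inMemoryLength >= 0) { // -1 is the in-memory "absent" sentinel
      metaData.setBloom_filter_length(inMemoryLength);
    }
  }

  // Footer read path: restore the sentinel when the field is missing.
  static int fromFooter(MetaDataStub metaData) {
    return metaData.isSetBloom_filter_length() ? metaData.getBloom_filter_length() : -1;
  }
}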

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 22 additions & 4 deletions

@@ -32,6 +32,7 @@
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;

+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;

@@ -1346,11 +1347,24 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
       }
     }

-    // Read Bloom filter data header.
+    // Seek to Bloom filter offset.
     f.seek(bloomFilterOffset);
+
+    // Read Bloom filter length.
+    int bloomFilterLength = meta.getBloomFilterLength();
+
+    // If it is set, read Bloom filter header and bitset together.
+    // Otherwise, read Bloom filter header first and then bitset.
+    InputStream in = f;
+    if (bloomFilterLength > 0) {
+      byte[] headerAndBitSet = new byte[bloomFilterLength];
+      f.readFully(headerAndBitSet);
+      in = new ByteArrayInputStream(headerAndBitSet);
+    }
+
     BloomFilterHeader bloomFilterHeader;
     try {
-      bloomFilterHeader = Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
+      bloomFilterHeader = Util.readBloomFilterHeader(in, bloomFilterDecryptor, bloomFilterHeaderAAD);
     } catch (IOException e) {
       LOG.warn("read no bloom filter");
       return null;

@@ -1372,9 +1386,13 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
     byte[] bitset;
     if (null == bloomFilterDecryptor) {
       bitset = new byte[numBytes];
-      f.readFully(bitset);
+      if (in != null) {
+        in.read(bitset);
+      } else {
+        f.readFully(bitset);
+      }
     } else {
-      bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+      bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
       if (bitset.length != numBytes) {
         throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
       }
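
This is where the footer field pays off: with bloom_filter_length known up front, the reader pulls the Thrift header and the bitset from storage in one readFully call and parses both out of an in-memory buffer, instead of issuing one read for the header and a second for the bitset. A condensed sketch of that pattern, using java.io.RandomAccessFile in place of Parquet's SeekableInputStream (the class and method names in the sketch are illustrative, not the library API):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;

class SingleReadSketch {
  // Returns a stream positioned at the Bloom filter header. With a known
  // length, header and bitset arrive in one positioned read, and all
  // further parsing happens against the in-memory buffer.
  static InputStream openBloomFilter(RandomAccessFile file, long offset, int length)
      throws IOException {
    file.seek(offset);
    if (length > 0) {
      byte[] headerAndBitset = new byte[length];
      file.readFully(headerAndBitset); // single I/O round trip
      return new ByteArrayInputStream(headerAndBitset);
    }
    // Older files carry no length: the header is parsed straight off the
    // file, and the bitset costs a second read later.
    return Channels.newInputStream(file.getChannel());
  }
}

One subtlety in the hunk above: the plain in.read(bitset) only fills the whole array because ByteArrayInputStream returns all of its remaining bytes in a single call; a generic InputStream would need a read loop or a readFully-style helper.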

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java

Lines changed: 3 additions & 0 deletions

@@ -1417,6 +1417,9 @@ private static void serializeBloomFilters(
           serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
         }
         out.write(serializedBitset);
+
+        int length = (int) (out.getPos() - offset);
+        column.setBloomFilterLength(length);
       }
     }
   }
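
The writer derives the length purely from stream positions: offset is captured earlier in serializeBloomFilters (outside the hunk shown) before the header is written, so the delta taken after the bitset covers header plus bitset, including any encryption overhead. The same position-delta technique in a minimal, self-contained form, with ByteArrayOutputStream.size() standing in for PositionOutputStream.getPos() (the class and method names in the sketch are illustrative):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

class LengthBySizeDeltaSketch {
  // Write header + bitset, then report how many bytes they occupied.
  static int writeAndMeasure(ByteArrayOutputStream out, byte[] header, byte[] bitset)
      throws IOException {
    long offset = out.size(); // position before the filter's first byte
    out.write(header);
    out.write(bitset);
    // Every byte between the two positions belongs to this Bloom filter,
    // so the delta is its total serialized length.
    return (int) (out.size() - offset);
  }
}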

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java

Lines changed: 20 additions & 2 deletions

@@ -210,6 +210,7 @@ protected static boolean positiveLongFitsInAnInt(long value) {
   private IndexReference offsetIndexReference;

   private long bloomFilterOffset = -1;
+  private int bloomFilterLength = -1;

   protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
     this(null, columnChunkProperties);

@@ -334,15 +335,29 @@ public void setBloomFilterOffset(long bloomFilterOffset) {
   }

   /**
-   * Method should be considered private
-   *
+   * @param bloomFilterLength
+   *          the length of the Bloom filter in bytes
+   */
+  public void setBloomFilterLength(int bloomFilterLength) {
+    this.bloomFilterLength = bloomFilterLength;
+  }
+
+  /**
    * @return the offset to the Bloom filter or {@code -1} if there is no bloom filter for this column chunk
    */
   public long getBloomFilterOffset() {
     decryptIfNeeded();
     return bloomFilterOffset;
   }

+  /**
+   * @return the length of the Bloom filter or {@code -1} if there is no bloom filter length for this column chunk
+   */
+  public int getBloomFilterLength() {
+    decryptIfNeeded();
+    return bloomFilterLength;
+  }
+
   /**
    * @return all the encodings used in this column
    */

@@ -633,6 +648,9 @@ protected void decryptIfNeeded() {
     if (metaData.isSetBloom_filter_offset()) {
       setBloomFilterOffset(metaData.getBloom_filter_offset());
     }
+    if (metaData.isSetBloom_filter_length()) {
+      setBloomFilterLength(metaData.getBloom_filter_length());
+    }
   }

   @Override
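
Since getBloomFilterLength() mirrors getBloomFilterOffset(), including the lazy decryptIfNeeded() call and the -1 default, a reader can branch on it safely for both new and old files. A small usage sketch (the helper class and method names are hypothetical):

import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

class LengthAwareReaderSketch {
  // Hypothetical helper: the fast single-read path is only valid when the
  // writer recorded a length; negative values mean the file predates it.
  static boolean canReadInOneShot(ColumnChunkMetaData meta) {
    return meta.getBloomFilterLength() > 0;
  }
}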

parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java

Lines changed: 20 additions & 0 deletions

@@ -274,6 +274,26 @@ public void testBloomFilterOffset() throws IOException {
     assertEquals(1234, convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
   }

+  @Test
+  public void testBloomFilterLength() throws IOException {
+    ParquetMetadata origMetaData = createParquetMetaData(null, Encoding.PLAIN);
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+
+    // Without bloom filter length
+    FileMetaData footer = converter.toParquetMetadata(1, origMetaData);
+    assertFalse(footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+    ParquetMetadata convertedMetaData = converter.fromParquetMetadata(footer);
+    assertTrue(convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength() < 0);
+
+    // With bloom filter length
+    origMetaData.getBlocks().get(0).getColumns().get(0).setBloomFilterLength(1024);
+    footer = converter.toParquetMetadata(1, origMetaData);
+    assertTrue(footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+    assertEquals(1024, footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().getBloom_filter_length());
+    convertedMetaData = converter.fromParquetMetadata(footer);
+    assertEquals(1024, convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength());
+  }
+
   @Test
   public void testLogicalTypesBackwardCompatibleWithConvertedTypes() {
     ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
