
Commit eface4d

PARQUET-2373: Improve performance with bloom_filter_length (#1184)
1 parent ed308ff commit eface4d

6 files changed: +298 −6 lines changed

parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java

Lines changed: 7 additions & 0 deletions
@@ -563,6 +563,10 @@ private void addRowGroup(
       if (bloomFilterOffset >= 0) {
         metaData.setBloom_filter_offset(bloomFilterOffset);
       }
+      int bloomFilterLength = columnMetaData.getBloomFilterLength();
+      if (bloomFilterLength >= 0) {
+        metaData.setBloom_filter_length(bloomFilterLength);
+      }
       if (columnMetaData.getStatistics() != null
           && !columnMetaData.getStatistics().isEmpty()) {
         metaData.setStatistics(
@@ -1689,6 +1693,9 @@ public ParquetMetadata fromParquetMetadata(
           if (metaData.isSetBloom_filter_offset()) {
             column.setBloomFilterOffset(metaData.getBloom_filter_offset());
           }
+          if (metaData.isSetBloom_filter_length()) {
+            column.setBloomFilterLength(metaData.getBloom_filter_length());
+          }
         } else { // column encrypted with column key
           // Metadata will be decrypted later, if this column is accessed
           EncryptionWithColumnKey columnKeyStruct = cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY();

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 18 additions & 4 deletions
@@ -32,6 +32,7 @@
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;

+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -1442,11 +1443,24 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
       }
     }

-    // Read Bloom filter data header.
+    // Seek to Bloom filter offset.
     f.seek(bloomFilterOffset);
+
+    // Read Bloom filter length.
+    int bloomFilterLength = meta.getBloomFilterLength();
+
+    // If it is set, read Bloom filter header and bitset together.
+    // Otherwise, read Bloom filter header first and then bitset.
+    InputStream in = f;
+    if (bloomFilterLength > 0) {
+      byte[] headerAndBitSet = new byte[bloomFilterLength];
+      f.readFully(headerAndBitSet);
+      in = new ByteArrayInputStream(headerAndBitSet);
+    }
+
     BloomFilterHeader bloomFilterHeader;
     try {
-      bloomFilterHeader = Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
+      bloomFilterHeader = Util.readBloomFilterHeader(in, bloomFilterDecryptor, bloomFilterHeaderAAD);
     } catch (IOException e) {
       LOG.warn("read no bloom filter");
       return null;
@@ -1472,9 +1486,9 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
     byte[] bitset;
     if (null == bloomFilterDecryptor) {
       bitset = new byte[numBytes];
-      f.readFully(bitset);
+      in.read(bitset);
     } else {
-      bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+      bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
       if (bitset.length != numBytes) {
         throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
       }
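
This hunk is the core of the change: when the footer carries bloom_filter_length, the reader fetches the header and bitset with one contiguous read and parses both from memory, instead of issuing separate reads (and, against remote storage, separate range requests) for each part. Purely as an illustration of that pattern, and not the Parquet implementation itself, here is a minimal sketch in which RandomAccessFile stands in for Parquet's SeekableInputStream and the class name is made up:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;

class BloomFilterRegionSketch {

  // Returns a stream positioned at the bloom filter header for one column chunk.
  static InputStream openBloomFilterRegion(RandomAccessFile file, long offset, int length)
      throws IOException {
    file.seek(offset);
    if (length > 0) {
      // bloom_filter_length is known: one read covers header + bitset,
      // and everything after this point is parsed from the in-memory buffer.
      byte[] headerAndBitset = new byte[length];
      file.readFully(headerAndBitset);
      return new ByteArrayInputStream(headerAndBitset);
    }
    // Length unknown (file written before this field existed): keep reading
    // from the file itself, which costs a second read once the header is parsed.
    return Channels.newInputStream(file.getChannel());
  }
}
```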

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java

Lines changed: 3 additions & 0 deletions
@@ -1620,6 +1620,9 @@ private static void serializeBloomFilters(
           serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
         }
         out.write(serializedBitset);
+
+        int length = (int) (out.getPos() - offset);
+        column.setBloomFilterLength(length);
       }
     }
   }
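
On the write side, the recorded length is simply the distance between the stream position captured before the bloom filter header is written and the position after the (possibly encrypted) bitset has been written, so it covers the whole region a reader needs. A self-contained toy illustration of that position differencing, with ByteArrayOutputStream.size() standing in for the writer's getPos() and made-up byte counts:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

class BloomFilterLengthSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream(); // stands in for the file output stream
    out.write(new byte[17]);                   // whatever was already written before the bloom filter

    long offset = out.size();                  // analogous to out.getPos() before the header
    out.write(new byte[15]);                   // placeholder for the serialized bloom filter header
    out.write(new byte[2048]);                 // placeholder for the (possibly encrypted) bitset
    int length = (int) (out.size() - offset);  // everything between offset and the current position

    // prints: bloom_filter_offset=17, bloom_filter_length=2063
    System.out.println("bloom_filter_offset=" + offset + ", bloom_filter_length=" + length);
  }
}
```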

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java

Lines changed: 20 additions & 2 deletions
@@ -260,6 +260,7 @@ protected static boolean positiveLongFitsInAnInt(long value) {
   private IndexReference offsetIndexReference;

   private long bloomFilterOffset = -1;
+  private int bloomFilterLength = -1;

   protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
     this(null, columnChunkProperties);
@@ -383,15 +384,29 @@ public void setBloomFilterOffset(long bloomFilterOffset) {
   }

   /**
-   * Method should be considered private
-   *
+   * @param bloomFilterLength
+   *          the length of the Bloom filter
+   */
+  public void setBloomFilterLength(int bloomFilterLength) {
+    this.bloomFilterLength = bloomFilterLength;
+  }
+
+  /**
    * @return the offset to the Bloom filter or {@code -1} if there is no bloom filter for this column chunk
    */
   public long getBloomFilterOffset() {
     decryptIfNeeded();
     return bloomFilterOffset;
   }

+  /**
+   * @return the length of the Bloom filter or {@code -1} if there is no bloom filter length for this column chunk
+   */
+  public int getBloomFilterLength() {
+    decryptIfNeeded();
+    return bloomFilterLength;
+  }
+
   /**
    * @return all the encodings used in this column
    */
@@ -693,6 +708,9 @@ protected void decryptIfNeeded() {
       if (metaData.isSetBloom_filter_offset()) {
         setBloomFilterOffset(metaData.getBloom_filter_offset());
       }
+      if (metaData.isSetBloom_filter_length()) {
+        setBloomFilterLength(metaData.getBloom_filter_length());
+      }
     }

   @Override
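
For callers, the new accessor mirrors the existing bloom filter offset API: both return -1 when the footer does not carry the corresponding field. A hedged usage sketch follows; the loop and printing are illustrative only, while getBloomFilterOffset/getBloomFilterLength and the metadata classes are the real APIs shown in this diff:

```java
import java.util.List;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

class BloomFilterMetadataInspector {

  // Reports, per column chunk, whether a bloom filter exists and whether its length is recorded.
  static void describe(List<BlockMetaData> rowGroups) {
    for (BlockMetaData block : rowGroups) {
      for (ColumnChunkMetaData column : block.getColumns()) {
        long offset = column.getBloomFilterOffset(); // -1 if there is no bloom filter
        int length = column.getBloomFilterLength();  // -1 if the writer did not record it
        if (offset < 0) {
          System.out.println(column.getPath() + ": no bloom filter");
        } else if (length < 0) {
          System.out.println(column.getPath() + ": bloom filter at " + offset + ", length unknown");
        } else {
          System.out.println(column.getPath() + ": bloom filter at " + offset + ", " + length + " bytes");
        }
      }
    }
  }
}
```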

parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java

Lines changed: 25 additions & 0 deletions
@@ -270,6 +270,31 @@ public void testBloomFilterOffset() throws IOException {
         1234, convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
   }

+  @Test
+  public void testBloomFilterLength() throws IOException {
+    ParquetMetadata origMetaData = createParquetMetaData(null, Encoding.PLAIN);
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+
+    // Without bloom filter length
+    FileMetaData footer = converter.toParquetMetadata(1, origMetaData);
+    assertFalse(
+        footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+    ParquetMetadata convertedMetaData = converter.fromParquetMetadata(footer);
+    assertTrue(convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength() < 0);
+
+    // With bloom filter length
+    origMetaData.getBlocks().get(0).getColumns().get(0).setBloomFilterLength(1024);
+    footer = converter.toParquetMetadata(1, origMetaData);
+    assertTrue(
+        footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+    assertEquals(
+        1024,
+        footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().getBloom_filter_length());
+    convertedMetaData = converter.fromParquetMetadata(footer);
+    assertEquals(
+        1024, convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength());
+  }
+
   @Test
   public void testLogicalTypesBackwardCompatibleWithConvertedTypes() {
     ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java

Lines changed: 225 additions & 0 deletions (new file)

@@ -0,0 +1,225 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.parquet.hadoop;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestInteropBloomFilter {

  // The link includes a reference to a specific commit. To use a newer version, update this link.
  private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/d69d979/data/";
  private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
  // parquet-testing: https://github.com/apache/parquet-testing/pull/22
  private static String DATA_INDEX_BLOOM_FILE = "data_index_bloom_encoding_stats.parquet";
  // parquet-testing: https://github.com/apache/parquet-testing/pull/43
  private static String DATA_INDEX_BLOOM_WITH_LENGTH_FILE = "data_index_bloom_encoding_with_length.parquet";

  private static final Logger LOG = LoggerFactory.getLogger(TestInteropBloomFilter.class);
  private OkHttpClient httpClient = new OkHttpClient();

  @Test
  public void testReadDataIndexBloomParquetFiles() throws IOException {
    Path rootPath = new Path(PARQUET_TESTING_PATH);
    LOG.info("======== testReadDataIndexBloomParquetFiles {} ========", rootPath);

    Path filePath = downloadInterOpFiles(rootPath, DATA_INDEX_BLOOM_FILE, httpClient);

    int expectedRowCount = 14;
    String[] expectedValues = new String[] {
      "Hello", "This is", "a", "test", "How", "are you", "doing ", "today",
      "the quick", "brown fox", "jumps", "over", "the lazy", "dog"
    };

    String[] unexpectedValues = new String[] {"b", "c", "d"};

    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
      for (int i = 0; i < expectedRowCount; ++i) {
        Group group = reader.read();
        if (group == null) {
          fail("Should not reach end of file");
        }
        assertEquals(expectedValues[i], group.getString(0, 0));
      }
    }

    ParquetFileReader reader = new ParquetFileReader(
        HadoopInputFile.fromPath(filePath, new Configuration()),
        ParquetReadOptions.builder().build());
    List<BlockMetaData> blocks = reader.getRowGroups();
    blocks.forEach(block -> {
      try {
        assertEquals(14, block.getRowCount());
        ColumnChunkMetaData idMeta = block.getColumns().get(0);
        BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
        Assert.assertNotNull(bloomFilter);
        assertEquals(192, idMeta.getBloomFilterOffset());
        assertEquals(-1, idMeta.getBloomFilterLength());
        for (int i = 0; i < expectedRowCount; ++i) {
          assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
        }
        for (int i = 0; i < unexpectedValues.length; ++i) {
          assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
        }
        assertEquals(152, idMeta.getTotalSize());
        assertEquals(163, idMeta.getTotalUncompressedSize());
        assertEquals(181, idMeta.getOffsetIndexReference().getOffset());
        assertEquals(11, idMeta.getOffsetIndexReference().getLength());
        assertEquals(156, idMeta.getColumnIndexReference().getOffset());
        assertEquals(25, idMeta.getColumnIndexReference().getLength());
      } catch (IOException e) {
        fail("Should not throw exception: " + e.getMessage());
      }
    });
  }

  @Test
  public void testReadDataIndexBloomWithLengthParquetFiles() throws IOException {
    Path rootPath = new Path(PARQUET_TESTING_PATH);
    LOG.info("======== testReadDataIndexBloomWithLengthParquetFiles {} ========", rootPath);

    Path filePath = downloadInterOpFiles(rootPath, DATA_INDEX_BLOOM_WITH_LENGTH_FILE, httpClient);

    int expectedRowCount = 14;
    String[] expectedValues = new String[] {
      "Hello", "This is", "a", "test", "How", "are you", "doing ", "today",
      "the quick", "brown fox", "jumps", "over", "the lazy", "dog"
    };

    String[] unexpectedValues = new String[] {"b", "c", "d"};

    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
      for (int i = 0; i < expectedRowCount; ++i) {
        Group group = reader.read();
        if (group == null) {
          fail("Should not reach end of file");
        }
        assertEquals(expectedValues[i], group.getString(0, 0));
      }
    }

    ParquetFileReader reader = new ParquetFileReader(
        HadoopInputFile.fromPath(filePath, new Configuration()),
        ParquetReadOptions.builder().build());
    List<BlockMetaData> blocks = reader.getRowGroups();
    blocks.forEach(block -> {
      try {
        assertEquals(14, block.getRowCount());
        ColumnChunkMetaData idMeta = block.getColumns().get(0);
        BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
        Assert.assertNotNull(bloomFilter);
        assertEquals(253, idMeta.getBloomFilterOffset());
        assertEquals(2064, idMeta.getBloomFilterLength());
        for (int i = 0; i < expectedRowCount; ++i) {
          assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
        }
        for (int i = 0; i < unexpectedValues.length; ++i) {
          assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
        }
        assertEquals(199, idMeta.getTotalSize());
        assertEquals(199, idMeta.getTotalUncompressedSize());
        assertEquals(2342, idMeta.getOffsetIndexReference().getOffset());
        assertEquals(11, idMeta.getOffsetIndexReference().getLength());
        assertEquals(2317, idMeta.getColumnIndexReference().getOffset());
        assertEquals(25, idMeta.getColumnIndexReference().getLength());
      } catch (Exception e) {
        fail("Should not throw exception: " + e.getMessage());
      }
    });
  }

  private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
    LOG.info("Download interOp files if needed");
    Configuration conf = new Configuration();
    FileSystem fs = rootPath.getFileSystem(conf);
    LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
    if (!fs.exists(rootPath)) {
      LOG.info("Create folder for interOp files: " + rootPath);
      if (!fs.mkdirs(rootPath)) {
        throw new IOException("Cannot create path " + rootPath);
      }
    }

    Path file = new Path(rootPath, fileName);
    if (!fs.exists(file)) {
      String downloadUrl = PARQUET_TESTING_REPO + fileName;
      LOG.info("Download interOp file: " + downloadUrl);
      Request request = new Request.Builder().url(downloadUrl).build();
      Response response = httpClient.newCall(request).execute();
      if (!response.isSuccessful()) {
        throw new IOException("Failed to download file: " + response);
      }
      try (FSDataOutputStream fdos = fs.create(file)) {
        fdos.write(response.body().bytes());
      }
    }
    return file;
  }
}
