@@ -563,6 +563,10 @@ private void addRowGroup(
if (bloomFilterOffset >= 0) {
metaData.setBloom_filter_offset(bloomFilterOffset);
}
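// A negative length means no Bloom filter length was recorded for this column chunk.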
int bloomFilterLength = columnMetaData.getBloomFilterLength();
if (bloomFilterLength >= 0) {
metaData.setBloom_filter_length(bloomFilterLength);
}
if (columnMetaData.getStatistics() != null
&& !columnMetaData.getStatistics().isEmpty()) {
metaData.setStatistics(
@@ -1689,6 +1693,9 @@ public ParquetMetadata fromParquetMetadata(
if (metaData.isSetBloom_filter_offset()) {
column.setBloomFilterOffset(metaData.getBloom_filter_offset());
}
if (metaData.isSetBloom_filter_length()) {
column.setBloomFilterLength(metaData.getBloom_filter_length());
}
} else { // column encrypted with column key
// Metadata will be decrypted later, if this column is accessed
EncryptionWithColumnKey columnKeyStruct = cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY();
@@ -32,6 +32,7 @@
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -1442,11 +1443,24 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
}
}

// Seek to Bloom filter offset.
f.seek(bloomFilterOffset);

// Read Bloom filter length.
int bloomFilterLength = meta.getBloomFilterLength();

// If it is set, read Bloom filter header and bitset together.
// Otherwise, read Bloom filter header first and then bitset.
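// Reading them together avoids a second I/O call when the total serialized size is known.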
InputStream in = f;
if (bloomFilterLength > 0) {
byte[] headerAndBitSet = new byte[bloomFilterLength];
f.readFully(headerAndBitSet);
in = new ByteArrayInputStream(headerAndBitSet);
}

BloomFilterHeader bloomFilterHeader;
try {
bloomFilterHeader = Util.readBloomFilterHeader(in, bloomFilterDecryptor, bloomFilterHeaderAAD);
} catch (IOException e) {
LOG.warn("read no bloom filter");
return null;
@@ -1472,9 +1486,9 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
byte[] bitset;
if (null == bloomFilterDecryptor) {
bitset = new byte[numBytes];
// Wrap in DataInputStream so the bitset is read fully even if the underlying stream returns a partial read.
new DataInputStream(in).readFully(bitset);
} else {
bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
if (bitset.length != numBytes) {
throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
}
@@ -1620,6 +1620,9 @@ private static void serializeBloomFilters(
serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
}
out.write(serializedBitset);

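// Record the total serialized size (header plus bitset, possibly encrypted) so readers can fetch the whole Bloom filter in one read.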
int length = (int) (out.getPos() - offset);
column.setBloomFilterLength(length);
}
}
}
@@ -260,6 +260,7 @@ protected static boolean positiveLongFitsInAnInt(long value) {
private IndexReference offsetIndexReference;

private long bloomFilterOffset = -1;
private int bloomFilterLength = -1;

protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
this(null, columnChunkProperties);
@@ -383,15 +384,29 @@ public void setBloomFilterOffset(long bloomFilterOffset) {
}

/**
* Method should be considered private
*
* @param bloomFilterLength
the length of the serialized Bloom filter in bytes
*/
public void setBloomFilterLength(int bloomFilterLength) {
this.bloomFilterLength = bloomFilterLength;
}

/**
* @return the offset to the Bloom filter or {@code -1} if there is no bloom filter for this column chunk
*/
public long getBloomFilterOffset() {
decryptIfNeeded();
return bloomFilterOffset;
}

/**
* @return the length of the serialized Bloom filter in bytes or {@code -1} if no Bloom filter length is recorded for this column chunk
*/
public int getBloomFilterLength() {
decryptIfNeeded();
return bloomFilterLength;
}

/**
* @return all the encodings used in this column
*/
@@ -693,6 +708,9 @@ protected void decryptIfNeeded() {
if (metaData.isSetBloom_filter_offset()) {
setBloomFilterOffset(metaData.getBloom_filter_offset());
}
if (metaData.isSetBloom_filter_length()) {
setBloomFilterLength(metaData.getBloom_filter_length());
}
}

@Override
@@ -270,6 +270,31 @@ public void testBloomFilterOffset() throws IOException {
1234, convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
}

@Test
public void testBloomFilterLength() throws IOException {
ParquetMetadata origMetaData = createParquetMetaData(null, Encoding.PLAIN);
ParquetMetadataConverter converter = new ParquetMetadataConverter();

// Without bloom filter length
FileMetaData footer = converter.toParquetMetadata(1, origMetaData);
assertFalse(
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
ParquetMetadata convertedMetaData = converter.fromParquetMetadata(footer);
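// The length defaults to -1 when the Thrift field is not set.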
assertTrue(convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength() < 0);

// With bloom filter length
origMetaData.getBlocks().get(0).getColumns().get(0).setBloomFilterLength(1024);
footer = converter.toParquetMetadata(1, origMetaData);
assertTrue(
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
assertEquals(
1024,
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().getBloom_filter_length());
convertedMetaData = converter.fromParquetMetadata(footer);
assertEquals(
1024, convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength());
}

@Test
public void testLogicalTypesBackwardCompatibleWithConvertedTypes() {
ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.hadoop;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestInteropBloomFilter {

// The link includes a reference to a specific commit. To use a newer version, update this link.
private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/d69d979/data/";
private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
// parquet-testing: https://github.com/apache/parquet-testing/pull/22
private static String DATA_INDEX_BLOOM_FILE = "data_index_bloom_encoding_stats.parquet";
// parquet-testing: https://github.com/apache/parquet-testing/pull/43
private static String DATA_INDEX_BLOOM_WITH_LENGTH_FILE = "data_index_bloom_encoding_with_length.parquet";

private static final Logger LOG = LoggerFactory.getLogger(TestInteropBloomFilter.class);
private OkHttpClient httpClient = new OkHttpClient();

@Test
public void testReadDataIndexBloomParquetFiles() throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testReadDataIndexBloomParquetFiles {} ========", rootPath);

Path filePath = downloadInterOpFiles(rootPath, DATA_INDEX_BLOOM_FILE, httpClient);

int expectedRowCount = 14;
String[] expectedValues = new String[] {
"Hello",
"This is",
"a",
"test",
"How",
"are you",
"doing ",
"today",
"the quick",
"brown fox",
"jumps",
"over",
"the lazy",
"dog"
};

String[] unexpectedValues = new String[] {"b", "c", "d"};

try (ParquetReader<Group> reader =
ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
for (int i = 0; i < expectedRowCount; ++i) {
Group group = reader.read();
if (group == null) {
fail("Should not reach end of file");
}
assertEquals(expectedValues[i], group.getString(0, 0));
}
}

ParquetFileReader reader = new ParquetFileReader(
HadoopInputFile.fromPath(filePath, new Configuration()),
ParquetReadOptions.builder().build());
List<BlockMetaData> blocks = reader.getRowGroups();
blocks.forEach(block -> {
try {
assertEquals(14, block.getRowCount());
ColumnChunkMetaData idMeta = block.getColumns().get(0);
BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
Assert.assertNotNull(bloomFilter);
assertEquals(192, idMeta.getBloomFilterOffset());
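// This file predates the bloom_filter_length field, so the length is expected to be unset (-1).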
assertEquals(-1, idMeta.getBloomFilterLength());
for (int i = 0; i < expectedRowCount; ++i) {
assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
}
for (int i = 0; i < unexpectedValues.length; ++i) {
assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
}
assertEquals(152, idMeta.getTotalSize());
assertEquals(163, idMeta.getTotalUncompressedSize());
assertEquals(181, idMeta.getOffsetIndexReference().getOffset());
assertEquals(11, idMeta.getOffsetIndexReference().getLength());
assertEquals(156, idMeta.getColumnIndexReference().getOffset());
assertEquals(25, idMeta.getColumnIndexReference().getLength());
} catch (IOException e) {
fail("Should not throw exception: " + e.getMessage());
}
});
}

@Test
public void testReadDataIndexBloomWithLengthParquetFiles() throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testReadDataIndexBloomWithLengthParquetFiles {} ========", rootPath);

Path filePath = downloadInterOpFiles(rootPath, DATA_INDEX_BLOOM_WITH_LENGTH_FILE, httpClient);

int expectedRowCount = 14;
String[] expectedValues = new String[] {
"Hello",
"This is",
"a",
"test",
"How",
"are you",
"doing ",
"today",
"the quick",
"brown fox",
"jumps",
"over",
"the lazy",
"dog"
};

String[] unexpectedValues = new String[] {"b", "c", "d"};

try (ParquetReader<Group> reader =
ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
for (int i = 0; i < expectedRowCount; ++i) {
Group group = reader.read();
if (group == null) {
fail("Should not reach end of file");
}
assertEquals(expectedValues[i], group.getString(0, 0));
}
}

ParquetFileReader reader = new ParquetFileReader(
HadoopInputFile.fromPath(filePath, new Configuration()),
ParquetReadOptions.builder().build());
List<BlockMetaData> blocks = reader.getRowGroups();
blocks.forEach(block -> {
try {
assertEquals(14, block.getRowCount());
ColumnChunkMetaData idMeta = block.getColumns().get(0);
BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
Assert.assertNotNull(bloomFilter);
assertEquals(253, idMeta.getBloomFilterOffset());
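// This file was written with bloom_filter_length set, so the full serialized size is available to the reader.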
assertEquals(2064, idMeta.getBloomFilterLength());
for (int i = 0; i < expectedRowCount; ++i) {
assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
}
for (int i = 0; i < unexpectedValues.length; ++i) {
assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
}
assertEquals(199, idMeta.getTotalSize());
assertEquals(199, idMeta.getTotalUncompressedSize());
assertEquals(2342, idMeta.getOffsetIndexReference().getOffset());
assertEquals(11, idMeta.getOffsetIndexReference().getLength());
assertEquals(2317, idMeta.getColumnIndexReference().getOffset());
assertEquals(25, idMeta.getColumnIndexReference().getLength());
} catch (Exception e) {
fail("Should not throw exception: " + e.getMessage());
}
});
}

private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
LOG.info("Download interOp files if needed");
Configuration conf = new Configuration();
FileSystem fs = rootPath.getFileSystem(conf);
LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
if (!fs.exists(rootPath)) {
LOG.info("Create folder for interOp files: " + rootPath);
if (!fs.mkdirs(rootPath)) {
throw new IOException("Cannot create path " + rootPath);
}
}

Path file = new Path(rootPath, fileName);
if (!fs.exists(file)) {
String downloadUrl = PARQUET_TESTING_REPO + fileName;
LOG.info("Download interOp file: " + downloadUrl);
Request request = new Request.Builder().url(downloadUrl).build();
// Close the HTTP response to release the connection when done.
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
try (FSDataOutputStream fdos = fs.create(file)) {
fdos.write(response.body().bytes());
}
}
}
return file;
}
}