@@ -154,6 +154,9 @@ public static enum JobSummaryLevel {
public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
public static final String ADAPTIVE_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.adaptive.enabled";
public static final String BLOOM_FILTER_CANDIDATES_NUMBER = "parquet.bloom.filter.candidates.number";
/** If true, selected writer configuration values will be stored in the Parquet footer's key-value metadata. */
public static final String PERSIST_WRITER_CONFIG = "parquet.persist.writer.config";

public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
@@ -379,6 +382,18 @@ static int getBlockRowCountLimit(Configuration conf) {
return conf.getInt(BLOCK_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT);
}

public static void setPersistWriterConfig(JobContext jobContext, boolean enabled) {
setPersistWriterConfig(getConfiguration(jobContext), enabled);
}

public static void setPersistWriterConfig(Configuration conf, boolean enabled) {
conf.setBoolean(PERSIST_WRITER_CONFIG, enabled);
}

static boolean getPersistWriterConfig(Configuration conf) {
return conf.getBoolean(PERSIST_WRITER_CONFIG, false);
}

public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
setPageRowCountLimit(getConfiguration(jobContext), rowCount);
}
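
For context, a minimal sketch of how a job could opt in to this behaviour; the job name here is illustrative and not part of this change:

// Sketch: enabling writer-config persistence for a MapReduce job.
// "parquet-config-demo" is a hypothetical job name used only for illustration.
Configuration conf = new Configuration();
ParquetOutputFormat.setPersistWriterConfig(conf, true); // same as conf.setBoolean("parquet.persist.writer.config", true)
Job job = Job.getInstance(conf, "parquet-config-demo");
// ... configure ParquetOutputFormat, schema and write support as usual ...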
@@ -464,6 +464,32 @@ public long getDataSize() {
* @param <SELF> The type of this builder that is returned by builder methods
*/
public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
private static final String CFG_BLOCK_SIZE = "parquet.block.size";
private static final String CFG_BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
private static final String CFG_PAGE_SIZE = "parquet.page.size";
private static final String CFG_PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
private static final String CFG_DICT_PAGE_SIZE = "parquet.dictionary.page.size";
private static final String CFG_COMPRESSION = "parquet.compression.codec";
private static final String CFG_DICT_ENABLED = "parquet.dictionary.enabled";
private static final String CFG_BLOOM_MAX_BYTES = "parquet.bloom.filter.max.bytes";
private static final String CFG_VALIDATION_ENABLED = "parquet.validation.enabled";
private static final String CFG_PAGE_CHECKSUM_ENABLED = "parquet.page.write.checksum.enabled";
private static final String CFG_WRITER_VERSION = "parquet.writer.version";
private static final String CFG_MAX_PADDING_SIZE = "parquet.max.padding.size";

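/** Collects writer configuration entries, in insertion order, for the footer's key-value metadata. */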
private static final class ConfigCollector {
private final java.util.LinkedHashMap<String, String> m = new java.util.LinkedHashMap<>();

ConfigCollector add(String k, Object v) {
m.put(k, String.valueOf(v));
return this;
}

java.util.Map<String, String> done() {
return m;
}
}

private OutputFile file = null;
private Path path = null;
private FileEncryptionProperties encryptionProperties = null;
@@ -963,7 +989,14 @@ public ParquetWriter<T> build() throws IOException {
if (conf == null) {
conf = new HadoopParquetConfiguration();
}

final ParquetProperties propsSnapshot = encodingPropsBuilder.build();

if (conf.getBoolean(org.apache.parquet.hadoop.ParquetOutputFormat.PERSIST_WRITER_CONFIG, false)) {
encodingPropsBuilder.withExtraMetaData(buildWriterConfigMetadata(propsSnapshot));
}

final ParquetProperties encodingProps = encodingPropsBuilder.build();
if (codecFactory == null) {
codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
}
@@ -983,5 +1016,25 @@ public ParquetWriter<T> build() throws IOException {
encodingProps,
encryptionProperties);
}

/**
* Builds a map of writer configuration parameters to be persisted in footer metadata.
*/
private java.util.Map<String, String> buildWriterConfigMetadata(ParquetProperties props) {
return new ConfigCollector()
.add(CFG_BLOCK_SIZE, rowGroupSize)
.add(CFG_BLOCK_ROW_COUNT_LIMIT, props.getRowGroupRowCountLimit())
.add(CFG_PAGE_SIZE, props.getPageSizeThreshold())
.add(CFG_PAGE_ROW_COUNT_LIMIT, props.getPageRowCountLimit())
.add(CFG_DICT_PAGE_SIZE, props.getDictionaryPageSizeThreshold())
.add(CFG_COMPRESSION, codecName.name())
.add(CFG_DICT_ENABLED, props.isEnableDictionary())
.add(CFG_BLOOM_MAX_BYTES, props.getMaxBloomFilterBytes())
.add(CFG_VALIDATION_ENABLED, enableValidation)
.add(CFG_PAGE_CHECKSUM_ENABLED, props.getPageWriteChecksumEnabled())
.add(CFG_WRITER_VERSION, props.getWriterVersion().name())
.add(CFG_MAX_PADDING_SIZE, maxPaddingSize)
.done();
}
}
}
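
Downstream readers can then recover the persisted settings from the footer's key-value metadata. A minimal sketch, mirroring the tests below; the file path is a placeholder:

// Sketch: reading the persisted writer configuration back out of the footer.
// "/tmp/example.parquet" is an illustrative path, not part of this change.
Configuration conf = new Configuration();
try (ParquetFileReader reader =
    ParquetFileReader.open(HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf))) {
  java.util.Map<String, String> kv = reader.getFooter().getFileMetaData().getKeyValueMetaData();
  String blockSize = kv.get("parquet.block.size");       // e.g. "134217728"
  String codec = kv.get("parquet.compression.codec");    // e.g. "SNAPPY"
}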
@@ -79,6 +79,7 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
@@ -456,6 +457,116 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
3);
}

@Test
public void testWriterConfigPersisted() throws Exception {
Configuration conf = new Configuration();
ParquetOutputFormat.setBlockRowCountLimit(conf, 2);
ParquetOutputFormat.setPersistWriterConfig(conf, true);

MessageType schema = Types.buildMessage()
.required(BINARY)
.as(stringType())
.named("str")
.named("msg");
GroupWriteSupport.setSchema(schema, conf);

java.io.File file = new File(temp.getRoot(), "testWriterConfigPersisted.parquet");
Path path = new Path(file.toURI());

try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withConf(conf)
.withRowGroupRowCountLimit(2)
.build()) {
writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "a"));
writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "b"));
}

try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
java.util.Map<String, String> kv = r.getFooter().getFileMetaData().getKeyValueMetaData();
Assert.assertEquals("2", kv.get(ParquetOutputFormat.BLOCK_ROW_COUNT_LIMIT));
}
}

@Test
public void testWriterConfigNotPersistedByDefault() throws Exception {
Configuration conf = new Configuration();
ParquetOutputFormat.setBlockRowCountLimit(conf, 2);

MessageType schema = Types.buildMessage()
.required(BINARY)
.as(stringType())
.named("str")
.named("msg");
GroupWriteSupport.setSchema(schema, conf);

java.io.File file = new File(temp.getRoot(), "testWriterConfigNotPersistedByDefault.parquet");
Path path = new Path(file.toURI());

try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withConf(conf)
.withRowGroupRowCountLimit(2)
.build()) {
writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "a"));
writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "b"));
}

try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
java.util.Map<String, String> kv = r.getFooter().getFileMetaData().getKeyValueMetaData();
Assert.assertNull(
"Writer config should not be persisted by default",
kv.get(ParquetOutputFormat.BLOCK_ROW_COUNT_LIMIT));
}
}

@Test
public void testComprehensiveWriterConfigPersisted() throws Exception {
Configuration conf = new Configuration();
ParquetOutputFormat.setPersistWriterConfig(conf, true);

MessageType schema = Types.buildMessage()
.required(BINARY)
.as(stringType())
.named("str")
.named("msg");
GroupWriteSupport.setSchema(schema, conf);

java.io.File file = new File(temp.getRoot(), "testComprehensiveWriterConfigPersisted.parquet");
Path path = new Path(file.toURI());

try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withConf(conf)
.withRowGroupRowCountLimit(1000)
.withPageSize(1024)
.withPageRowCountLimit(500)
.withDictionaryPageSize(2048)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withDictionaryEncoding(true)
.withMaxBloomFilterBytes(1024 * 1024)
.withValidation(false)
.withPageWriteChecksumEnabled(true)
.withRowGroupSize(64 * 1024 * 1024)
.withMaxPaddingSize(8 * 1024 * 1024)
.build()) {
writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "test"));
}

try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
java.util.Map<String, String> kv = r.getFooter().getFileMetaData().getKeyValueMetaData();

Assert.assertEquals("1000", kv.get("parquet.block.row.count.limit"));
Assert.assertEquals("67108864", kv.get("parquet.block.size"));
Assert.assertEquals("1024", kv.get("parquet.page.size"));
Assert.assertEquals("500", kv.get("parquet.page.row.count.limit"));
Assert.assertEquals("2048", kv.get("parquet.dictionary.page.size"));
Assert.assertEquals("SNAPPY", kv.get("parquet.compression.codec"));
Assert.assertEquals("true", kv.get("parquet.dictionary.enabled"));
Assert.assertEquals("1048576", kv.get("parquet.bloom.filter.max.bytes"));
Assert.assertEquals("false", kv.get("parquet.validation.enabled"));
Assert.assertEquals("true", kv.get("parquet.page.write.checksum.enabled"));
Assert.assertEquals("8388608", kv.get("parquet.max.padding.size"));
}
}

@Test
public void testExtraMetaData() throws Exception {
final Configuration conf = new Configuration();