diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 4036668683..496a509a0c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -154,6 +154,9 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
   public static final String ADAPTIVE_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.adaptive.enabled";
   public static final String BLOOM_FILTER_CANDIDATES_NUMBER = "parquet.bloom.filter.candidates.number";
+  /** If true, selected writer configuration values will be stored in the Parquet footer's key-value metadata. */
+  public static final String PERSIST_WRITER_CONFIG = "parquet.persist.writer.config";
+
   public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
   public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
   public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
@@ -379,6 +382,18 @@ static int getBlockRowCountLimit(Configuration conf) {
     return conf.getInt(BLOCK_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT);
   }
 
+  public static void setPersistWriterConfig(JobContext jobContext, boolean enabled) {
+    setPersistWriterConfig(getConfiguration(jobContext), enabled);
+  }
+
+  public static void setPersistWriterConfig(Configuration conf, boolean enabled) {
+    conf.setBoolean(PERSIST_WRITER_CONFIG, enabled);
+  }
+
+  static boolean getPersistWriterConfig(Configuration conf) {
+    return conf.getBoolean(PERSIST_WRITER_CONFIG, false);
+  }
+
   public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
     setPageRowCountLimit(getConfiguration(jobContext), rowCount);
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 7789cad5c0..a1f0e0db24 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -464,6 +464,32 @@ public long getDataSize() {
    * @param <SELF> The type of this builder that is returned by builder methods
    */
   public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
+    private static final String CFG_BLOCK_SIZE = "parquet.block.size";
+    private static final String CFG_BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
+    private static final String CFG_PAGE_SIZE = "parquet.page.size";
+    private static final String CFG_PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
+    private static final String CFG_DICT_PAGE_SIZE = "parquet.dictionary.page.size";
+    private static final String CFG_COMPRESSION = "parquet.compression.codec";
+    private static final String CFG_DICT_ENABLED = "parquet.dictionary.enabled";
+    private static final String CFG_BLOOM_MAX_BYTES = "parquet.bloom.filter.max.bytes";
+    private static final String CFG_VALIDATION_ENABLED = "parquet.validation.enabled";
+    private static final String CFG_PAGE_CHECKSUM_ENABLED = "parquet.page.write.checksum.enabled";
+    private static final String CFG_WRITER_VERSION = "parquet.writer.version";
+    private static final String CFG_MAX_PADDING_SIZE = "parquet.max.padding.size";
+
+    private static final class ConfigCollector {
+      private final java.util.LinkedHashMap<String, String> m = new java.util.LinkedHashMap<>();
+
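+      // Collects entries in insertion order; values are stringified for the footer key-value metadata.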
+      ConfigCollector add(String k, Object v) {
+        m.put(k, String.valueOf(v));
+        return this;
+      }
+
+      java.util.Map<String, String> done() {
+        return m;
+      }
+    }
+
     private OutputFile file = null;
     private Path path = null;
     private FileEncryptionProperties encryptionProperties = null;
@@ -963,7 +989,14 @@ public ParquetWriter<T> build() throws IOException {
       if (conf == null) {
         conf = new HadoopParquetConfiguration();
       }
-      ParquetProperties encodingProps = encodingPropsBuilder.build();
+
+      final ParquetProperties propsSnapshot = encodingPropsBuilder.build();
+
+      if (conf.getBoolean(org.apache.parquet.hadoop.ParquetOutputFormat.PERSIST_WRITER_CONFIG, false)) {
+        encodingPropsBuilder.withExtraMetaData(buildWriterConfigMetadata(propsSnapshot));
+      }
+
+      final ParquetProperties encodingProps = encodingPropsBuilder.build();
       if (codecFactory == null) {
         codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
       }
@@ -983,5 +1016,25 @@ public ParquetWriter<T> build() throws IOException {
           encodingProps,
           encryptionProperties);
     }
+
+    /**
+     * Builds a map of writer configuration parameters to be persisted in footer metadata.
+     */
+    private java.util.Map<String, String> buildWriterConfigMetadata(ParquetProperties props) {
+      return new ConfigCollector()
+          .add(CFG_BLOCK_SIZE, rowGroupSize)
+          .add(CFG_BLOCK_ROW_COUNT_LIMIT, props.getRowGroupRowCountLimit())
+          .add(CFG_PAGE_SIZE, props.getPageSizeThreshold())
+          .add(CFG_PAGE_ROW_COUNT_LIMIT, props.getPageRowCountLimit())
+          .add(CFG_DICT_PAGE_SIZE, props.getDictionaryPageSizeThreshold())
+          .add(CFG_COMPRESSION, codecName.name())
+          .add(CFG_DICT_ENABLED, props.isEnableDictionary())
+          .add(CFG_BLOOM_MAX_BYTES, props.getMaxBloomFilterBytes())
+          .add(CFG_VALIDATION_ENABLED, enableValidation)
+          .add(CFG_PAGE_CHECKSUM_ENABLED, props.getPageWriteChecksumEnabled())
+          .add(CFG_WRITER_VERSION, props.getWriterVersion().name())
+          .add(CFG_MAX_PADDING_SIZE, maxPaddingSize)
+          .done();
+    }
   }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 2cd83624f6..328da3cf8d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -79,6 +79,7 @@
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
@@ -456,6 +457,116 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
         3);
   }
 
+  @Test
+  public void testWriterConfigPersisted() throws Exception {
+    Configuration conf = new Configuration();
+    ParquetOutputFormat.setBlockRowCountLimit(conf, 2);
+    ParquetOutputFormat.setPersistWriterConfig(conf, true);
+
+    MessageType schema = Types.buildMessage()
+        .required(BINARY)
+        .as(stringType())
+        .named("str")
+        .named("msg");
+    GroupWriteSupport.setSchema(schema, conf);
+
+    java.io.File file = new File(temp.getRoot(), "testWriterConfigPersisted.parquet");
+    Path path = new Path(file.toURI());
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withConf(conf)
+        .withRowGroupRowCountLimit(2)
+        .build()) {
+      writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "a"));
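+      // second row reaches the row-group row-count limit of 2 configured above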
+      writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "b"));
+    }
+
+    try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+      java.util.Map<String, String> kv = r.getFooter().getFileMetaData().getKeyValueMetaData();
+      Assert.assertEquals("2", kv.get(ParquetOutputFormat.BLOCK_ROW_COUNT_LIMIT));
+    }
+  }
+
+  @Test
+  public void testWriterConfigNotPersistedByDefault() throws Exception {
+    Configuration conf = new Configuration();
+    ParquetOutputFormat.setBlockRowCountLimit(conf, 2);
+
+    MessageType schema = Types.buildMessage()
+        .required(BINARY)
+        .as(stringType())
+        .named("str")
+        .named("msg");
+    GroupWriteSupport.setSchema(schema, conf);
+
+    java.io.File file = new File(temp.getRoot(), "testWriterConfigNotPersistedByDefault.parquet");
+    Path path = new Path(file.toURI());
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withConf(conf)
+        .withRowGroupRowCountLimit(2)
+        .build()) {
+      writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "a"));
+      writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "b"));
+    }
+
+    try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+      java.util.Map<String, String> kv = r.getFooter().getFileMetaData().getKeyValueMetaData();
+      Assert.assertNull(
+          "Writer config should not be persisted by default",
+          kv.get(ParquetOutputFormat.BLOCK_ROW_COUNT_LIMIT));
+    }
+  }
+
+  @Test
+  public void testComprehensiveWriterConfigPersisted() throws Exception {
+    Configuration conf = new Configuration();
+    ParquetOutputFormat.setPersistWriterConfig(conf, true);
+
+    MessageType schema = Types.buildMessage()
+        .required(BINARY)
+        .as(stringType())
+        .named("str")
+        .named("msg");
+    GroupWriteSupport.setSchema(schema, conf);
+
+    java.io.File file = new File(temp.getRoot(), "testComprehensiveWriterConfigPersisted.parquet");
+    Path path = new Path(file.toURI());
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withConf(conf)
+        .withRowGroupRowCountLimit(1000)
+        .withPageSize(1024)
+        .withPageRowCountLimit(500)
+        .withDictionaryPageSize(2048)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .withDictionaryEncoding(true)
+        .withMaxBloomFilterBytes(1024 * 1024)
+        .withValidation(false)
+        .withPageWriteChecksumEnabled(true)
+        .withRowGroupSize(64 * 1024 * 1024)
+        .withMaxPaddingSize(8 * 1024 * 1024)
+        .build()) {
+      writer.write(new SimpleGroupFactory(schema).newGroup().append("str", "test"));
+    }
+
+    try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+      java.util.Map<String, String> kv = r.getFooter().getFileMetaData().getKeyValueMetaData();
+
+      Assert.assertEquals("1000", kv.get("parquet.block.row.count.limit"));
+      Assert.assertEquals("67108864", kv.get("parquet.block.size"));
+      Assert.assertEquals("1024", kv.get("parquet.page.size"));
+      Assert.assertEquals("500", kv.get("parquet.page.row.count.limit"));
+      Assert.assertEquals("2048", kv.get("parquet.dictionary.page.size"));
+      Assert.assertEquals("SNAPPY", kv.get("parquet.compression.codec"));
+      Assert.assertEquals("true", kv.get("parquet.dictionary.enabled"));
+      Assert.assertEquals("1048576", kv.get("parquet.bloom.filter.max.bytes"));
+      Assert.assertEquals("false", kv.get("parquet.validation.enabled"));
+      Assert.assertEquals("true", kv.get("parquet.page.write.checksum.enabled"));
+      Assert.assertEquals("8388608", kv.get("parquet.max.padding.size"));
+    }
+  }
+
   @Test
   public void testExtraMetaData() throws Exception {
     final Configuration conf = new Configuration();