@@ -57,6 +57,7 @@ public class ParquetProperties {
public static final int DEFAULT_PAGE_VALUE_COUNT_THRESHOLD = Integer.MAX_VALUE / 2;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
+ public static final int DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
@@ -122,6 +123,7 @@ public static WriterVersion fromString(String name) {
private final ColumnProperty<Boolean> bloomFilterEnabled;
private final ColumnProperty<Boolean> adaptiveBloomFilterEnabled;
private final ColumnProperty<Integer> numBloomFilterCandidates;
+ private final int rowGroupRowCountLimit;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -153,6 +155,7 @@ private ParquetProperties(Builder builder) {
this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
this.adaptiveBloomFilterEnabled = builder.adaptiveBloomFilterEnabled.build();
this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
+ this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
@@ -302,6 +305,10 @@ public boolean estimateNextSizeCheck() {
return estimateNextSizeCheck;
}

+ public int getRowGroupRowCountLimit() {
+ return rowGroupRowCountLimit;
+ }

public int getPageRowCountLimit() {
return pageRowCountLimit;
}
@@ -400,6 +407,7 @@ public static class Builder {
private final ColumnProperty.Builder<Boolean> adaptiveBloomFilterEnabled;
private final ColumnProperty.Builder<Integer> numBloomFilterCandidates;
private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
+ private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -679,6 +687,12 @@ public Builder withBloomFilterEnabled(String columnPath, boolean enabled) {
return this;
}

+ public Builder withRowGroupRowCountLimit(int rowCount) {
+ Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for row groups: %s", rowCount);
+ rowGroupRowCountLimit = rowCount;
+ return this;
+ }

public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: %s", rowCount);
pageRowCountLimit = rowCount;
parquet-hadoop/README.md (5 additions, 1 deletion)
@@ -266,12 +266,16 @@ conf.set("parquet.bloom.filter.fpp#column.path", 0.02)

---


**Property:** `parquet.decrypt.off-heap.buffer.enabled`
**Description:** Whether to use direct buffers to decrypt encrypted files. This should be set to
true if the reader is using a `DirectByteBufferAllocator`.
**Default value:** `false`
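
For example (a minimal sketch; `conf` here is assumed to be the job's Hadoop `Configuration`, as in the `conf.set` examples earlier in this README):

    conf.setBoolean("parquet.decrypt.off-heap.buffer.enabled", true)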

---

+ **Property:** `parquet.block.row.count.limit`

Member Author: since there is an existing `parquet.block.size`

+ **Description:** The maximum number of rows per row group.
+ **Default value:** `2147483647` (`Integer.MAX_VALUE`)
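
For illustration, a minimal sketch of applying the new limit from both the configuration and the writer side. The one-million-row value, the output path, and the schema handling are assumptions made for this example; the writer classes mirror those used in the test changes below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetOutputFormat;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;

    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);                  // schema definition omitted for brevity
    ParquetOutputFormat.setBlockRowCountLimit(conf, 1_000_000); // same as conf.setInt("parquet.block.row.count.limit", 1_000_000)

    // The limit can also be set directly on the writer builder:
    ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/example.parquet"))
        .withConf(conf)
        .withRowGroupRowCountLimit(1_000_000)
        .build();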

---
**Property:** `parquet.page.row.count.limit`
@@ -48,8 +48,8 @@ class InternalParquetRecordWriter<T> {
private final WriteSupport<T> writeSupport;
private final MessageType schema;
private final Map<String, String> extraMetaData;
- private final long rowGroupSize;

Member Author: unused field

private long rowGroupSizeThreshold;
+ private final int rowGroupRecordCountThreshold;
private long nextRowGroupSize;
private final BytesInputCompressor compressor;
private final boolean validating;
@@ -91,8 +91,8 @@ public InternalParquetRecordWriter(
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
this.schema = schema;
this.extraMetaData = extraMetaData;
- this.rowGroupSize = rowGroupSize;
this.rowGroupSizeThreshold = rowGroupSize;
+ this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
this.nextRowGroupSize = rowGroupSizeThreshold;
this.compressor = compressor;
this.validating = validating;
@@ -166,9 +166,16 @@ public long getDataSize() {
}

private void checkBlockSizeReached() throws IOException {
- if (recordCount
- >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it
- // for every record.
+ if (recordCount >= rowGroupRecordCountThreshold) {
+ LOG.debug("record count reaches threshold: flushing {} records to disk.", recordCount);

Member Author: Maybe better to escalate the log level to INFO? It should not be noisy.

Contributor: Maybe it would not be noisy to raise the level to INFO, but what purpose does it serve? If we want to answer why a Parquet file looks the way it does (the number/size of row groups etc.), the logs of the file creation are probably long gone.

Member Author: For a typical Hadoop YARN cluster, which serves Spark workloads, application logs are preserved for a few days by collecting and aggregating them to HDFS. Anyway, it's not a big deal; we can always use parquet-cli to analyze the suspicious Parquet files :)

Contributor: Yeah, that's what I was thinking of: you write a Parquet file and then read it somewhere else months or years later; you won't have the related logs for sure. It is a very separate topic, but it would be a good idea to write these properties directly into the footer, so that if we have any issues with a file we would at least know how it was created.

+ flushRowGroupToStore();
+ initStore();
+ recordCountForNextMemCheck = min(
+ max(props.getMinRowCountForPageSizeCheck(), recordCount / 2),
+ props.getMaxRowCountForPageSizeCheck());
+ this.lastRowGroupEndPos = parquetFileWriter.getPos();
+ } else if (recordCount >= recordCountForNextMemCheck) {
+ // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = columnStore.getBufferedSize();
long recordSize = memSize / recordCount;
// flush the row group if it is within ~2 records of the limit
@@ -154,6 +154,7 @@ public static enum JobSummaryLevel {
public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
public static final String ADAPTIVE_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.adaptive.enabled";
public static final String BLOOM_FILTER_CANDIDATES_NUMBER = "parquet.bloom.filter.candidates.number";
+ public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled";
@@ -366,6 +367,18 @@ private static int getStatisticsTruncateLength(Configuration conf) {
return conf.getInt(STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
}

+ public static void setBlockRowCountLimit(JobContext jobContext, int rowCount) {
+ setBlockRowCountLimit(getConfiguration(jobContext), rowCount);
+ }

+ public static void setBlockRowCountLimit(Configuration conf, int rowCount) {
+ conf.setInt(BLOCK_ROW_COUNT_LIMIT, rowCount);
+ }

+ static int getBlockRowCountLimit(Configuration conf) {
+ return conf.getInt(BLOCK_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT);
+ }

public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
setPageRowCountLimit(getConfiguration(jobContext), rowCount);
}
@@ -500,6 +513,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
.withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
.withBloomFilterEnabled(getBloomFilterEnabled(conf))
.withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
+ .withRowGroupRowCountLimit(getBlockRowCountLimit(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
.withStatisticsEnabled(getStatisticsEnabled(conf));
@@ -609,6 +609,17 @@ public SELF withPageSize(int pageSize) {
return self();
}

+ /**
+ * Sets the Parquet format row group row count limit used by the constructed writer.
+ *
+ * @param rowCount limit for the number of rows stored in a row group
+ * @return this builder for method chaining
+ */
+ public SELF withRowGroupRowCountLimit(int rowCount) {
+ encodingPropsBuilder.withRowGroupRowCountLimit(rowCount);
+ return self();
+ }

/**
* Sets the Parquet format page row count limit used by the constructed writer.
*
@@ -443,8 +443,17 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
testParquetFileNumberOfBlocks(
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+ new Configuration(),
1);
- testParquetFileNumberOfBlocks(1, 1, 3);
+ testParquetFileNumberOfBlocks(1, 1, new Configuration(), 3);

+ Configuration conf = new Configuration();
+ ParquetOutputFormat.setBlockRowCountLimit(conf, 1);
+ testParquetFileNumberOfBlocks(
+ ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+ ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+ conf,
+ 3);
}

@Test
@@ -506,15 +515,17 @@ public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
}

private void testParquetFileNumberOfBlocks(
- int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int expectedNumberOfBlocks)
+ int minRowCountForPageSizeCheck,
+ int maxRowCountForPageSizeCheck,
+ Configuration conf,
+ int expectedNumberOfBlocks)
throws IOException {
MessageType schema = Types.buildMessage()
.required(BINARY)
.as(stringType())
.named("str")
.named("msg");

- Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);

File file = temp.newFile();
@@ -523,7 +534,8 @@ private void testParquetFileNumberOfBlocks(
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withAllocator(allocator)
.withConf(conf)
- // Set row group size to 1, to make sure we flush every time
+ .withRowGroupRowCountLimit(ParquetOutputFormat.getBlockRowCountLimit(conf))
+ // Set row group size to 1, to make sure we flush every time when
+ // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
.withRowGroupSize(1)
.withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)