Commit cc75225 (parent: 1cda24f)
GH-3235: Row count limit for each row group (#3236)

File tree: 6 files changed, +72 / -10 lines changed

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class ParquetProperties {
5757
public static final int DEFAULT_PAGE_VALUE_COUNT_THRESHOLD = Integer.MAX_VALUE / 2;
5858
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
5959
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
60+
public static final int DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT = Integer.MAX_VALUE;
6061
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
6162
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
6263
public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
@@ -122,6 +123,7 @@ public static WriterVersion fromString(String name) {
122123
private final ColumnProperty<Boolean> bloomFilterEnabled;
123124
private final ColumnProperty<Boolean> adaptiveBloomFilterEnabled;
124125
private final ColumnProperty<Integer> numBloomFilterCandidates;
126+
private final int rowGroupRowCountLimit;
125127
private final int pageRowCountLimit;
126128
private final boolean pageWriteChecksumEnabled;
127129
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -153,6 +155,7 @@ private ParquetProperties(Builder builder) {
153155
this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
154156
this.adaptiveBloomFilterEnabled = builder.adaptiveBloomFilterEnabled.build();
155157
this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
158+
this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
156159
this.pageRowCountLimit = builder.pageRowCountLimit;
157160
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
158161
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
@@ -302,6 +305,10 @@ public boolean estimateNextSizeCheck() {
302305
return estimateNextSizeCheck;
303306
}
304307

308+
public int getRowGroupRowCountLimit() {
309+
return rowGroupRowCountLimit;
310+
}
311+
305312
public int getPageRowCountLimit() {
306313
return pageRowCountLimit;
307314
}
@@ -400,6 +407,7 @@ public static class Builder {
400407
private final ColumnProperty.Builder<Boolean> adaptiveBloomFilterEnabled;
401408
private final ColumnProperty.Builder<Integer> numBloomFilterCandidates;
402409
private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
410+
private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
403411
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
404412
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
405413
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -679,6 +687,12 @@ public Builder withBloomFilterEnabled(String columnPath, boolean enabled) {
679687
return this;
680688
}
681689

690+
public Builder withRowGroupRowCountLimit(int rowCount) {
691+
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for row groups: %s", rowCount);
692+
rowGroupRowCountLimit = rowCount;
693+
return this;
694+
}
695+
682696
public Builder withPageRowCountLimit(int rowCount) {
683697
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: %s", rowCount);
684698
pageRowCountLimit = rowCount;

parquet-hadoop/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,12 +266,16 @@ conf.set("parquet.bloom.filter.fpp#column.path", 0.02)
266266

267267
---
268268

269-
270269
**Property:** `parquet.decrypt.off-heap.buffer.enabled`
271270
**Description:** Whether to use direct buffers to decrypt encrypted files. This should be set to
272271
true if the reader is using a `DirectByteBufferAllocator`
273272
**Default value:** `false`
274273

274+
---
275+
276+
**Property:** `parquet.block.row.count.limit`
277+
**Description:** The maximum number of rows per row group.
278+
**Default value:** `2147483647` (Integer.MAX_VALUE)
275279

276280
---
277281
**Property:** `parquet.page.row.count.limit`

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ class InternalParquetRecordWriter<T> {
4848
private final WriteSupport<T> writeSupport;
4949
private final MessageType schema;
5050
private final Map<String, String> extraMetaData;
51-
private final long rowGroupSize;
5251
private long rowGroupSizeThreshold;
52+
private final int rowGroupRecordCountThreshold;
5353
private long nextRowGroupSize;
5454
private final BytesInputCompressor compressor;
5555
private final boolean validating;
@@ -91,8 +91,8 @@ public InternalParquetRecordWriter(
9191
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
9292
this.schema = schema;
9393
this.extraMetaData = extraMetaData;
94-
this.rowGroupSize = rowGroupSize;
9594
this.rowGroupSizeThreshold = rowGroupSize;
95+
this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
9696
this.nextRowGroupSize = rowGroupSizeThreshold;
9797
this.compressor = compressor;
9898
this.validating = validating;
@@ -166,9 +166,16 @@ public long getDataSize() {
166166
}
167167

168168
private void checkBlockSizeReached() throws IOException {
169-
if (recordCount
170-
>= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it
171-
// for every record.
169+
if (recordCount >= rowGroupRecordCountThreshold) {
170+
LOG.debug("record count reaches threshold: flushing {} records to disk.", recordCount);
171+
flushRowGroupToStore();
172+
initStore();
173+
recordCountForNextMemCheck = min(
174+
max(props.getMinRowCountForPageSizeCheck(), recordCount / 2),
175+
props.getMaxRowCountForPageSizeCheck());
176+
this.lastRowGroupEndPos = parquetFileWriter.getPos();
177+
} else if (recordCount >= recordCountForNextMemCheck) {
178+
// checking the memory size is relatively expensive, so let's not do it for every record.
172179
long memSize = columnStore.getBufferedSize();
173180
long recordSize = memSize / recordCount;
174181
// flush the row group if it is within ~2 records of the limit

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public static enum JobSummaryLevel {
154154
public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
155155
public static final String ADAPTIVE_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.adaptive.enabled";
156156
public static final String BLOOM_FILTER_CANDIDATES_NUMBER = "parquet.bloom.filter.candidates.number";
157+
public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
157158
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
158159
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
159160
public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled";
@@ -366,6 +367,18 @@ private static int getStatisticsTruncateLength(Configuration conf) {
366367
return conf.getInt(STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
367368
}
368369

370+
public static void setBlockRowCountLimit(JobContext jobContext, int rowCount) {
371+
setBlockRowCountLimit(getConfiguration(jobContext), rowCount);
372+
}
373+
374+
public static void setBlockRowCountLimit(Configuration conf, int rowCount) {
375+
conf.setInt(BLOCK_ROW_COUNT_LIMIT, rowCount);
376+
}
377+
378+
static int getBlockRowCountLimit(Configuration conf) {
379+
return conf.getInt(BLOCK_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT);
380+
}
381+
369382
public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
370383
setPageRowCountLimit(getConfiguration(jobContext), rowCount);
371384
}
@@ -500,6 +513,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
500513
.withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
501514
.withBloomFilterEnabled(getBloomFilterEnabled(conf))
502515
.withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
516+
.withRowGroupRowCountLimit(getBlockRowCountLimit(conf))
503517
.withPageRowCountLimit(getPageRowCountLimit(conf))
504518
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
505519
.withStatisticsEnabled(getStatisticsEnabled(conf));

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,17 @@ public SELF withPageSize(int pageSize) {
609609
return self();
610610
}
611611

612+
/**
613+
* Sets the Parquet format row group row count limit used by the constructed writer.
614+
*
615+
* @param rowCount limit for the number of rows stored in a row group
616+
* @return this builder for method chaining
617+
*/
618+
public SELF withRowGroupRowCountLimit(int rowCount) {
619+
encodingPropsBuilder.withRowGroupRowCountLimit(rowCount);
620+
return self();
621+
}
622+
612623
/**
613624
* Sets the Parquet format page row count limit used by the constructed writer.
614625
*

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,8 +443,17 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
443443
testParquetFileNumberOfBlocks(
444444
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
445445
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
446+
new Configuration(),
446447
1);
447-
testParquetFileNumberOfBlocks(1, 1, 3);
448+
testParquetFileNumberOfBlocks(1, 1, new Configuration(), 3);
449+
450+
Configuration conf = new Configuration();
451+
ParquetOutputFormat.setBlockRowCountLimit(conf, 1);
452+
testParquetFileNumberOfBlocks(
453+
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
454+
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
455+
conf,
456+
3);
448457
}
449458

450459
@Test
@@ -506,15 +515,17 @@ public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
506515
}
507516

508517
private void testParquetFileNumberOfBlocks(
509-
int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int expectedNumberOfBlocks)
518+
int minRowCountForPageSizeCheck,
519+
int maxRowCountForPageSizeCheck,
520+
Configuration conf,
521+
int expectedNumberOfBlocks)
510522
throws IOException {
511523
MessageType schema = Types.buildMessage()
512524
.required(BINARY)
513525
.as(stringType())
514526
.named("str")
515527
.named("msg");
516528

517-
Configuration conf = new Configuration();
518529
GroupWriteSupport.setSchema(schema, conf);
519530

520531
File file = temp.newFile();
@@ -523,7 +534,8 @@ private void testParquetFileNumberOfBlocks(
523534
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
524535
.withAllocator(allocator)
525536
.withConf(conf)
526-
// Set row group size to 1, to make sure we flush every time
537+
.withRowGroupRowCountLimit(ParquetOutputFormat.getBlockRowCountLimit(conf))
538+
// Set row group size to 1, to make sure we flush every time when
527539
// minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
528540
.withRowGroupSize(1)
529541
.withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)

Comments (0)