Skip to content

Commit 26cb7cd

Browse files
Flink: Backport RewriteDataFiles add max file group count (apache#14861)
Backports apache#14837
1 parent 60b42ec commit 26cb7cd

File tree

6 files changed

+80
-2
lines changed

6 files changed

+80
-2
lines changed

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,19 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
181181
return this;
182182
}
183183

184+
/**
185+
* Configures the max file count for rewriting. See {@link
186+
* SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more details.
187+
*
188+
* @param maxFileGroupInputFiles file count for rewrite
189+
*/
190+
public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
191+
this.rewriteOptions.put(
192+
SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
193+
String.valueOf(maxFileGroupInputFiles));
194+
return this;
195+
}
196+
184197
/**
185198
* Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
186199
* for more details.

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.assertj.core.api.Assertions.assertThat;
2323

2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.Set;
2627
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
2728
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ private RewriteUtil() {}
3839

3940
static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(TableLoader tableLoader)
4041
throws Exception {
42+
return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES, "2"));
43+
}
44+
45+
static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
46+
TableLoader tableLoader, Map<String, String> rewriterOptions) throws Exception {
4147
try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup>
4248
testHarness =
4349
ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(TableLoader
4854
tableLoader,
4955
11,
5056
10_000_000L,
51-
ImmutableMap.of(MIN_INPUT_FILES, "2"),
57+
rewriterOptions,
5258
Expressions.alwaysTrue()))) {
5359
testHarness.open();
5460

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg.flink.maintenance.operator;
2020

21+
import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
2122
import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
2223
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
2324
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ void testMaxRewriteBytes() throws Exception {
182183
}
183184
}
184185

186+
@Test
187+
void testMaxFileGroupCount() throws Exception {
188+
Table table = createPartitionedTable();
189+
insertPartitioned(table, 1, "p1");
190+
insertPartitioned(table, 2, "p1");
191+
insertPartitioned(table, 3, "p2");
192+
insertPartitioned(table, 4, "p2");
193+
insertPartitioned(table, 5, "p2");
194+
insertPartitioned(table, 6, "p2");
195+
196+
List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = planDataFileRewrite(tableLoader());
197+
assertThat(planWithNoLimit).hasSize(2);
198+
199+
List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
200+
planDataFileRewrite(
201+
tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2", MAX_FILE_GROUP_INPUT_FILES, "2"));
202+
assertThat(planWithMaxFileGroupCount).hasSize(3);
203+
}
204+
185205
void assertRewriteFileGroup(
186206
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) {
187207
assertThat(plannedGroup.table().currentSnapshot().snapshotId())

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,19 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
181181
return this;
182182
}
183183

184+
/**
185+
* Configures the max file count for rewriting. See {@link
186+
* SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more details.
187+
*
188+
* @param maxFileGroupInputFiles file count for rewrite
189+
*/
190+
public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
191+
this.rewriteOptions.put(
192+
SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
193+
String.valueOf(maxFileGroupInputFiles));
194+
return this;
195+
}
196+
184197
/**
185198
* Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
186199
* for more details.

flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.assertj.core.api.Assertions.assertThat;
2323

2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.Set;
2627
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
2728
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ private RewriteUtil() {}
3839

3940
static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(TableLoader tableLoader)
4041
throws Exception {
42+
return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES, "2"));
43+
}
44+
45+
static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
46+
TableLoader tableLoader, Map<String, String> rewriterOptions) throws Exception {
4147
try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup>
4248
testHarness =
4349
ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(TableLoader
4854
tableLoader,
4955
11,
5056
10_000_000L,
51-
ImmutableMap.of(MIN_INPUT_FILES, "2"),
57+
rewriterOptions,
5258
Expressions.alwaysTrue()))) {
5359
testHarness.open();
5460

flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg.flink.maintenance.operator;
2020

21+
import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
2122
import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
2223
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
2324
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ void testMaxRewriteBytes() throws Exception {
182183
}
183184
}
184185

186+
@Test
187+
void testMaxFileGroupCount() throws Exception {
188+
Table table = createPartitionedTable();
189+
insertPartitioned(table, 1, "p1");
190+
insertPartitioned(table, 2, "p1");
191+
insertPartitioned(table, 3, "p2");
192+
insertPartitioned(table, 4, "p2");
193+
insertPartitioned(table, 5, "p2");
194+
insertPartitioned(table, 6, "p2");
195+
196+
List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = planDataFileRewrite(tableLoader());
197+
assertThat(planWithNoLimit).hasSize(2);
198+
199+
List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
200+
planDataFileRewrite(
201+
tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2", MAX_FILE_GROUP_INPUT_FILES, "2"));
202+
assertThat(planWithMaxFileGroupCount).hasSize(3);
203+
}
204+
185205
void assertRewriteFileGroup(
186206
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) {
187207
assertThat(plannedGroup.table().currentSnapshot().snapshotId())

0 commit comments

Comments
 (0)