Skip to content

Commit caa2730

Browse files
authored
[flink] make tableConf valid for database compaction job (#5287)
1 parent fefbaf6 commit caa2730

13 files changed

+108
-25
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ private void buildForCombinedMode() {
215215
.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)
216216
.toMillis())
217217
.withPartitionIdleTime(partitionIdleTime);
218+
sourceBuilder.withTableOptions(tableOptions.toMap());
218219

219220
Integer parallelism =
220221
tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM) == null

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,15 @@ public MultiAwareBucketTableScan(
5151
Pattern includingPattern,
5252
Pattern excludingPattern,
5353
Pattern databasePattern,
54-
boolean isStreaming) {
55-
super(catalogLoader, includingPattern, excludingPattern, databasePattern, isStreaming);
54+
boolean isStreaming,
55+
Map<String, String> tableOptions) {
56+
super(
57+
catalogLoader,
58+
includingPattern,
59+
excludingPattern,
60+
databasePattern,
61+
isStreaming,
62+
tableOptions);
5663
tablesMap = new HashMap<>();
5764
scansMap = new HashMap<>();
5865
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.slf4j.LoggerFactory;
3333

3434
import java.util.List;
35+
import java.util.Map;
3536
import java.util.regex.Pattern;
3637

3738
import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable;
@@ -59,18 +60,22 @@ public abstract class MultiTableScanBase<T> implements AutoCloseable {
5960

6061
protected boolean isStreaming;
6162

63+
protected final Map<String, String> tableOptions;
64+
6265
public MultiTableScanBase(
6366
CatalogLoader catalogLoader,
6467
Pattern includingPattern,
6568
Pattern excludingPattern,
6669
Pattern databasePattern,
67-
boolean isStreaming) {
70+
boolean isStreaming,
71+
Map<String, String> tableOptions) {
6872
catalog = catalogLoader.load();
6973

7074
this.includingPattern = includingPattern;
7175
this.excludingPattern = excludingPattern;
7276
this.databasePattern = databasePattern;
7377
this.isStreaming = isStreaming;
78+
this.tableOptions = tableOptions;
7479
}
7580

7681
protected void updateTableMap()
@@ -93,7 +98,7 @@ protected void updateTableMap()
9398
continue;
9499
}
95100

96-
FileStoreTable fileStoreTable = (FileStoreTable) table;
101+
FileStoreTable fileStoreTable = ((FileStoreTable) table).copy(tableOptions);
97102
addScanTable(fileStoreTable, identifier);
98103
}
99104
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,15 @@ public MultiUnawareBucketTableScan(
4545
Pattern includingPattern,
4646
Pattern excludingPattern,
4747
Pattern databasePattern,
48-
boolean isStreaming) {
49-
super(catalogLoader, includingPattern, excludingPattern, databasePattern, isStreaming);
48+
boolean isStreaming,
49+
Map<String, String> tableOptions) {
50+
super(
51+
catalogLoader,
52+
includingPattern,
53+
excludingPattern,
54+
databasePattern,
55+
isStreaming,
56+
tableOptions);
5057
tablesMap = new HashMap<>();
5158
}
5259

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void processElement(StreamRecord<MultiTableUnawareAppendCompactionTask> e
118118
private UnawareBucketCompactor compactor(Identifier tableId) {
119119
try {
120120
return new UnawareBucketCompactor(
121-
(FileStoreTable) catalog.getTable(tableId),
121+
(FileStoreTable) catalog.getTable(tableId).copy(options.toMap()),
122122
commitUser,
123123
this::workerExecutor,
124124
getMetricGroup());

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException
226226
while (true) {
227227
try {
228228
table = (FileStoreTable) catalog.getTable(tableId);
229+
table = table.copy(options.toMap());
229230
HashMap<String, String> dynamicOptions =
230231
new HashMap<String, String>() {
231232
{

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class PrepareCommitOperator<IN, OUT> extends AbstractStreamOpera
5353

5454
@Nullable protected transient MemorySegmentPool memoryPool;
5555
@Nullable private transient MemorySegmentAllocator memoryAllocator;
56-
private final Options options;
56+
protected final Options options;
5757
private boolean endOfInput = false;
5858

5959
public PrepareCommitOperator(StreamOperatorParameters<OUT> parameters, Options options) {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import javax.annotation.Nullable;
3838

3939
import java.time.Duration;
40+
import java.util.HashMap;
41+
import java.util.Map;
4042
import java.util.regex.Pattern;
4143

4244
/**
@@ -54,6 +56,7 @@ public class CombinedTableCompactorSourceBuilder {
5456
private boolean isContinuous = false;
5557
private StreamExecutionEnvironment env;
5658
@Nullable private Duration partitionIdleTime = null;
59+
private Map<String, String> tableOptions = new HashMap<>();
5760

5861
public CombinedTableCompactorSourceBuilder(
5962
CatalogLoader catalogLoader,
@@ -84,6 +87,11 @@ public CombinedTableCompactorSourceBuilder withPartitionIdleTime(
8487
return this;
8588
}
8689

90+
public CombinedTableCompactorSourceBuilder withTableOptions(Map<String, String> tableOptions) {
91+
this.tableOptions = tableOptions;
92+
return this;
93+
}
94+
8795
public DataStream<RowData> buildAwareBucketTableSource() {
8896
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
8997
RowType produceType = CompactBucketsTable.getRowType();
@@ -96,6 +104,7 @@ public DataStream<RowData> buildAwareBucketTableSource() {
96104
includingPattern,
97105
excludingPattern,
98106
databasePattern,
107+
tableOptions,
99108
monitorInterval);
100109
} else {
101110
return CombinedAwareBatchSource.buildSource(
@@ -106,6 +115,7 @@ public DataStream<RowData> buildAwareBucketTableSource() {
106115
includingPattern,
107116
excludingPattern,
108117
databasePattern,
118+
tableOptions,
109119
partitionIdleTime);
110120
}
111121
}
@@ -120,6 +130,7 @@ public DataStream<MultiTableUnawareAppendCompactionTask> buildForUnawareBucketsT
120130
includingPattern,
121131
excludingPattern,
122132
databasePattern,
133+
tableOptions,
123134
monitorInterval);
124135
} else {
125136
return CombinedUnawareBatchSource.buildSource(
@@ -129,6 +140,7 @@ public DataStream<MultiTableUnawareAppendCompactionTask> buildForUnawareBucketsT
129140
includingPattern,
130141
excludingPattern,
131142
databasePattern,
143+
tableOptions,
132144
partitionIdleTime);
133145
}
134146
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.slf4j.LoggerFactory;
4444

4545
import java.time.Duration;
46+
import java.util.Map;
4647
import java.util.regex.Pattern;
4748

4849
import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
@@ -52,13 +53,16 @@
5253
public class CombinedAwareBatchSource extends CombinedCompactorSource<Tuple2<Split, String>> {
5354

5455
private static final Logger LOGGER = LoggerFactory.getLogger(CombinedAwareBatchSource.class);
56+
private final Map<String, String> tableOptions;
5557

5658
public CombinedAwareBatchSource(
5759
CatalogLoader catalogLoader,
5860
Pattern includingPattern,
5961
Pattern excludingPattern,
60-
Pattern databasePattern) {
62+
Pattern databasePattern,
63+
Map<String, String> tableOptions) {
6164
super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
65+
this.tableOptions = tableOptions;
6266
}
6367

6468
@Override
@@ -79,7 +83,8 @@ public void start() {
7983
includingPattern,
8084
excludingPattern,
8185
databasePattern,
82-
isStreaming);
86+
isStreaming,
87+
tableOptions);
8388
}
8489

8590
@Override
@@ -116,10 +121,15 @@ public static DataStream<RowData> buildSource(
116121
Pattern includingPattern,
117122
Pattern excludingPattern,
118123
Pattern databasePattern,
124+
Map<String, String> tableOptions,
119125
Duration partitionIdleTime) {
120126
CombinedAwareBatchSource source =
121127
new CombinedAwareBatchSource(
122-
catalogLoader, includingPattern, excludingPattern, databasePattern);
128+
catalogLoader,
129+
includingPattern,
130+
excludingPattern,
131+
databasePattern,
132+
tableOptions);
123133
TupleTypeInfo<Tuple2<Split, String>> tupleTypeInfo =
124134
new TupleTypeInfo<>(
125135
new JavaTypeInfo<>(Split.class), BasicTypeInfo.STRING_TYPE_INFO);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
4141
import org.apache.flink.table.data.RowData;
4242

43+
import java.util.Map;
4344
import java.util.regex.Pattern;
4445

4546
import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
@@ -49,14 +50,17 @@
4950
public class CombinedAwareStreamingSource extends CombinedCompactorSource<Tuple2<Split, String>> {
5051

5152
private final long monitorInterval;
53+
private final Map<String, String> tableOptions;
5254

5355
public CombinedAwareStreamingSource(
5456
CatalogLoader catalogLoader,
5557
Pattern includingPattern,
5658
Pattern excludingPattern,
5759
Pattern databasePattern,
60+
Map<String, String> tableOptions,
5861
long monitorInterval) {
5962
super(catalogLoader, includingPattern, excludingPattern, databasePattern, true);
63+
this.tableOptions = tableOptions;
6064
this.monitorInterval = monitorInterval;
6165
}
6266

@@ -78,7 +82,8 @@ public void start() {
7882
includingPattern,
7983
excludingPattern,
8084
databasePattern,
81-
isStreaming);
85+
isStreaming,
86+
tableOptions);
8287
}
8388

8489
@Override
@@ -111,6 +116,7 @@ public static DataStream<RowData> buildSource(
111116
Pattern includingPattern,
112117
Pattern excludingPattern,
113118
Pattern databasePattern,
119+
Map<String, String> tableOptions,
114120
long monitorInterval) {
115121

116122
CombinedAwareStreamingSource source =
@@ -119,6 +125,7 @@ public static DataStream<RowData> buildSource(
119125
includingPattern,
120126
excludingPattern,
121127
databasePattern,
128+
tableOptions,
122129
monitorInterval);
123130
TupleTypeInfo<Tuple2<Split, String>> tupleTypeInfo =
124131
new TupleTypeInfo<>(

0 commit comments

Comments
 (0)