Skip to content

Commit 643d80e

Browse files
committed
[flink] introduce 'sink.writer-refresh-detect-options' to allow more configs to be refreshed
1 parent 63b4b7b commit 643d80e

File tree

9 files changed

+59
-34
lines changed

9 files changed

+59
-34
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -434,12 +434,6 @@
434434
<td>Boolean</td>
435435
<td>Optional endInput check partition expire used in case of batch mode or bounded stream.</td>
436436
</tr>
437-
<tr>
438-
<td><h5>external-paths.detect-config.enabled</h5></td>
439-
<td style="word-wrap: break-word;">false</td>
440-
<td>Boolean</td>
441-
<td>Whether to auto detected the change of data-file.external-paths when streaming writing.</td>
442-
</tr>
443437
<tr>
444438
<td><h5>fields.default-aggregate-function</h5></td>
445439
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,12 @@
326326
<td>MemorySize</td>
327327
<td>Sink writer memory to control heap memory of writer.</td>
328328
</tr>
329+
<tr>
330+
<td><h5>sink.writer-refresh-detect-options</h5></td>
331+
<td style="word-wrap: break-word;">(none)</td>
332+
<td>String</td>
333+
<td>the options which are expected to be refreshed when streaming writing, multiple options separated by commas.</td>
334+
</tr>
329335
<tr>
330336
<td><h5>source.checkpoint-align.enabled</h5></td>
331337
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,6 @@ public InlineElement getDescription() {
206206
+ ExternalPathStrategy.SPECIFIC_FS
207207
+ ", should be the prefix scheme of the external path, now supported are s3 and oss.");
208208

209-
public static final ConfigOption<Boolean> AUTO_DETECT_DATA_FILE_EXTERNAL_PATHS =
210-
key("external-paths.detect-config.enabled")
211-
.booleanType()
212-
.defaultValue(false)
213-
.withDescription(
214-
"Whether to auto detected the change of data-file.external-paths when streaming writing.");
215-
216209
public static final ConfigOption<Boolean> COMPACTION_FORCE_REWRITE_ALL_FILES =
217210
key("compaction.force-rewrite-all-files")
218211
.booleanType()
@@ -1973,6 +1966,18 @@ public Map<String, String> toMap() {
19731966
return options.toMap();
19741967
}
19751968

1969+
public Map<String, String> getSpecificOptions(String... keys) {
1970+
Map<String, String> result = new HashMap<>();
1971+
for (String key : keys) {
1972+
String value = options.get(key);
1973+
// if value is null, it means the key is not set explicitly
1974+
if (value != null) {
1975+
result.put(key, value);
1976+
}
1977+
}
1978+
return result;
1979+
}
1980+
19761981
public int bucket() {
19771982
return options.get(BUCKET);
19781983
}
@@ -2682,19 +2687,6 @@ public String dataFileExternalPaths() {
26822687
return options.get(DATA_FILE_EXTERNAL_PATHS);
26832688
}
26842689

2685-
public boolean autoDetectDataFileExternalPaths() {
2686-
return options.get(AUTO_DETECT_DATA_FILE_EXTERNAL_PATHS);
2687-
}
2688-
2689-
public Map<String, String> dataFileExternalPathConfig() {
2690-
Map<String, String> externalPathsConfig = new HashMap<>();
2691-
externalPathsConfig.put(DATA_FILE_EXTERNAL_PATHS.key(), dataFileExternalPaths());
2692-
externalPathsConfig.put(
2693-
DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), externalPathStrategy().toString());
2694-
externalPathsConfig.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), externalSpecificFS());
2695-
return externalPathsConfig;
2696-
}
2697-
26982690
public ExternalPathStrategy externalPathStrategy() {
26992691
return options.get(DATA_FILE_EXTERNAL_PATHS_STRATEGY);
27002692
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,14 @@ public class FlinkConnectorOptions {
530530
.defaultValue(true)
531531
.withDescription("Enable pass job level filesystem settings to table file IO.");
532532

533+
/**
 * Comma-separated list of option keys that are expected to be refreshed while streaming
 * writing. Declared as a string option without a default value.
 */
public static final ConfigOption<String> SINK_WRITER_REFRESH_DETECT_OPTIONS =
534+
key("sink.writer-refresh-detect-options")
535+
.stringType()
536+
.noDefaultValue()
537+
.withDescription(
538+
"the options which are expected to be refreshed when streaming writing, "
539+
+ "multiple options separated by commas.");
540+
533541
public static List<ConfigOption<?>> getOptions() {
534542
final Field[] fields = FlinkConnectorOptions.class.getFields();
535543
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,19 @@
2222
import org.apache.paimon.annotation.VisibleForTesting;
2323
import org.apache.paimon.append.AppendCompactTask;
2424
import org.apache.paimon.data.BinaryRow;
25+
import org.apache.paimon.flink.FlinkConnectorOptions;
2526
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
2627
import org.apache.paimon.flink.sink.Committable;
2728
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
2829
import org.apache.paimon.operation.FileStoreWrite;
2930
import org.apache.paimon.operation.metrics.CompactionMetrics;
3031
import org.apache.paimon.operation.metrics.MetricUtils;
32+
import org.apache.paimon.options.Options;
3133
import org.apache.paimon.schema.TableSchema;
3234
import org.apache.paimon.table.FileStoreTable;
3335
import org.apache.paimon.table.sink.CommitMessage;
3436
import org.apache.paimon.table.sink.TableCommitImpl;
37+
import org.apache.paimon.utils.StringUtils;
3538

3639
import org.apache.flink.metrics.MetricGroup;
3740
import org.slf4j.Logger;
@@ -220,7 +223,12 @@ private void replace(FileStoreTable newTable) throws Exception {
220223
}
221224

222225
protected void updateWriteWithNewSchema() {
223-
if (table.coreOptions().autoDetectDataFileExternalPaths()) {
226+
// the configs to be refreshed
227+
String refreshedConfigs =
228+
Options.fromMap(table.options())
229+
.get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS);
230+
231+
if (!StringUtils.isNullOrWhitespaceOnly(refreshedConfigs)) {
224232
Optional<TableSchema> lastestSchema = table.schemaManager().latest();
225233
if (lastestSchema.isPresent() && lastestSchema.get().id() > table.schema().id()) {
226234
LOG.info(
@@ -229,7 +237,9 @@ protected void updateWriteWithNewSchema() {
229237
lastestSchema.get().id());
230238
try {
231239
CoreOptions newCoreOptions = new CoreOptions(lastestSchema.get().options());
232-
table = table.copy(newCoreOptions.dataFileExternalPathConfig());
240+
table =
241+
table.copy(
242+
newCoreOptions.getSpecificOptions(refreshedConfigs.split(",")));
233243
replace(table);
234244
} catch (Exception e) {
235245
throw new RuntimeException("update write failed.", e);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
package org.apache.paimon.flink.sink;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.flink.FlinkConnectorOptions;
2223
import org.apache.paimon.flink.memory.FlinkMemorySegmentPool;
2324
import org.apache.paimon.flink.memory.MemorySegmentAllocator;
2425
import org.apache.paimon.memory.MemorySegmentPool;
2526
import org.apache.paimon.options.Options;
2627
import org.apache.paimon.schema.TableSchema;
2728
import org.apache.paimon.table.FileStoreTable;
29+
import org.apache.paimon.utils.StringUtils;
2830

2931
import org.apache.flink.runtime.memory.MemoryManager;
3032
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -111,7 +113,12 @@ private void emitCommittables(boolean waitCompaction, long checkpointId) throws
111113

112114
protected void updateWriteWithNewSchema(
113115
FileStoreTable table, StoreSinkWrite write, int taskId) {
114-
if (table.coreOptions().autoDetectDataFileExternalPaths()) {
116+
// the configs to be refreshed
117+
String refreshedConfigs =
118+
Options.fromMap(table.options())
119+
.get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS);
120+
121+
if (!StringUtils.isNullOrWhitespaceOnly(refreshedConfigs)) {
115122
Optional<TableSchema> lastestSchema = table.schemaManager().latest();
116123
if (lastestSchema.isPresent() && lastestSchema.get().id() > table.schema().id()) {
117124
LOG.info(
@@ -121,7 +128,9 @@ protected void updateWriteWithNewSchema(
121128
lastestSchema.get().id());
122129
try {
123130
CoreOptions newCoreOptions = new CoreOptions(lastestSchema.get().options());
124-
table = table.copy(newCoreOptions.dataFileExternalPathConfig());
131+
table =
132+
table.copy(
133+
newCoreOptions.getSpecificOptions(refreshedConfigs.split(",")));
125134
write.replace(table);
126135
} catch (Exception e) {
127136
throw new RuntimeException("update write failed.", e);

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ public void testTableReadWriteWithChangedExternalPath() throws Exception {
275275
+ bucket
276276
+ "',"
277277
+ bucketKey
278-
+ "'external-paths.detect-config.enabled' = 'true',"
278+
+ "'sink.writer-refresh-detect-options' = 'data-file.external-paths,"
279+
+ "data-file.external-paths.strategy,data-file.external-paths.specific-fs',"
279280
+ "'data-file.external-paths' = '"
280281
+ externalPaths1
281282
+ "',"

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,8 @@ public void testTableReadWriteWithChangedExternalPath() throws Exception {
499499
+ "'bucket' = '"
500500
+ bucket
501501
+ "',"
502-
+ "'external-paths.detect-config.enabled' = 'true',"
502+
+ "'sink.writer-refresh-detect-options' = 'data-file.external-paths,"
503+
+ "data-file.external-paths.strategy,data-file.external-paths.specific-fs',"
503504
+ "'data-file.external-paths' = '"
504505
+ externalPaths1
505506
+ "',"

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,9 @@ public void testStreamingCompactWithChangedExternalPath() throws Exception {
300300
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(),
301301
TraceableFileIO.SCHEME + "://" + externalPath1);
302302
tableOptions.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "round-robin");
303-
tableOptions.put(CoreOptions.AUTO_DETECT_DATA_FILE_EXTERNAL_PATHS.key(), "true");
303+
tableOptions.put(
304+
FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS.key(),
305+
"data-file.external-paths,data-file.external-paths.strategy,data-file.external-paths.specific-fs");
304306
tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
305307

306308
FileStoreTable table =
@@ -420,7 +422,9 @@ public void testUnawareBucketStreamingCompactWithChangedExternalPath() throws Ex
420422
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(),
421423
TraceableFileIO.SCHEME + "://" + externalPath1);
422424
tableOptions.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "round-robin");
423-
tableOptions.put(CoreOptions.AUTO_DETECT_DATA_FILE_EXTERNAL_PATHS.key(), "true");
425+
tableOptions.put(
426+
FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS.key(),
427+
"data-file.external-paths,data-file.external-paths.strategy,data-file.external-paths.specific-fs");
424428

425429
FileStoreTable table =
426430
prepareTable(

0 commit comments

Comments
 (0)