Commit 64aea6f

refactor WriteRefresher
1 parent 6ec8fd9 commit 64aea6f

File tree

13 files changed (+133 −68 lines)

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 12 additions & 0 deletions
@@ -2694,6 +2694,18 @@ public String externalSpecificFS() {
         return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
     }
 
+    public Map<String, String> configGroups(Set<String> groups) {
+        Map<String, String> configs = new HashMap<>();
+        // external-paths config group
+        String externalPaths = "external-paths";
+        if (groups.contains(externalPaths)) {
+            configs.put(DATA_FILE_EXTERNAL_PATHS.key(), dataFileExternalPaths());
+            configs.put(DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), externalPathStrategy().toString());
+            configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), externalSpecificFS());
+        }
+        return configs;
+    }
+
     public Boolean forceRewriteAllFiles() {
         return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
     }
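
The new configGroups helper returns only the keys that belong to a requested group, so callers can compare a whole group of related options at once instead of listing individual keys. Below is a minimal usage sketch (the helper method and its name are illustrative, not part of the commit); it mirrors the comparison that WriterRefresher#tryRefresh performs later in this diff.

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.TableSchema;

// Sketch only: returns true when the "external-paths" option group differs
// between the writer's current schema and the latest schema.
static boolean externalPathsChanged(TableSchema current, TableSchema latest) {
    Set<String> groups = Collections.singleton("external-paths");
    Map<String, String> currentGroup = CoreOptions.fromMap(current.options()).configGroups(groups);
    Map<String, String> latestGroup = CoreOptions.fromMap(latest.options()).configGroups(groups);
    return !Objects.equals(currentGroup, latestGroup);
}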

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 5 additions & 4 deletions
@@ -530,13 +530,14 @@ public class FlinkConnectorOptions {
                     .defaultValue(true)
                     .withDescription("Enable pass job level filesystem settings to table file IO.");
 
-    public static final ConfigOption<String> SINK_WRITER_REFRESH_DETECT_OPTIONS =
-            key("sink.writer-refresh-detect-options")
+    public static final ConfigOption<String> SINK_WRITER_REFRESH_DETECTORS =
+            key("sink.writer-refresh-detectors")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "the options which are expected to be refreshed when streaming writing, "
-                                    + "multiple options separated by commas.");
+                            "The option groups which are expected to be refreshed when streaming writing, "
+                                    + "multiple option group separated by commas. "
+                                    + "Now only 'external-paths' is supported.");
 
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
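
The renamed option takes detector group names rather than individual option keys, so a table that previously listed all three external-path keys now only needs the group name. A hedged sketch of the relevant table options (it mirrors the ITCase change at the bottom of this commit; the external path value is a made-up example, and only 'external-paths' is recognized as a group in this commit):

// Sketch: table options that enable writer refresh for the external-paths group.
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("sink.writer-refresh-detectors", "external-paths");
tableOptions.put("data-file.external-paths", "oss://my-bucket/warehouse/ext"); // assumed value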

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 5 additions & 5 deletions
@@ -64,6 +64,8 @@ public class AppendTableCompactor {
     @Nullable private final transient CompactionMetrics compactionMetrics;
     @Nullable private final transient CompactionMetrics.Reporter metricsReporter;
 
+    protected final transient WriterRefresher<BaseAppendFileStoreWrite> writeRefresher;
+
     public AppendTableCompactor(
             FileStoreTable table,
             String commitUser,
@@ -83,6 +85,8 @@ public AppendTableCompactor(
                         ? null
                         // partition and bucket fields are no use.
                         : this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
+        this.writeRefresher =
+                new WriterRefresher<>(table, write, (newTable, writer) -> replace(newTable));
     }
 
     public void processElement(AppendCompactTask task) throws Exception {
@@ -193,7 +197,7 @@ public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
                             .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s))
                             .collect(Collectors.toList());
 
-            refreshWrite();
+            writeRefresher.tryRefresh();
             return committables;
         } catch (InterruptedException e) {
             throw new RuntimeException("Interrupted while waiting tasks done.", e);
@@ -216,8 +220,4 @@ private void replace(FileStoreTable newTable) throws Exception {
         write = (BaseAppendFileStoreWrite) newTable.store().newWrite(commitUser);
         write.restore((List) states);
     }
-
-    protected void refreshWrite() {
-        WriterRefresher.doRefresh(table, write, (newTable, writer) -> replace(newTable));
-    }
 }

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java

Lines changed: 1 addition & 4 deletions
@@ -60,10 +60,7 @@ public void processElement(StreamRecord<Tuple2<InternalRow, Integer>> element)
     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
-        List<Committable> committables = super.prepareCommit(waitCompaction, checkpointId);
-
-        refreshWrite(table, write);
-        return committables;
+        return super.prepareCommit(waitCompaction, checkpointId);
     }
 
     /** {@link StreamOperatorFactory} of {@link DynamicBucketRowWriteOperator}. */

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java

Lines changed: 0 additions & 10 deletions
@@ -22,7 +22,6 @@
 import org.apache.paimon.flink.memory.MemorySegmentAllocator;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -106,15 +105,6 @@ private void emitCommittables(boolean waitCompaction, long checkpointId) throws
                 .forEach(committable -> output.collect(new StreamRecord<>(committable)));
     }
 
-    protected void refreshWrite(FileStoreTable table, StoreSinkWrite write) {
-        WriterRefresher.doRefresh(
-                table,
-                write,
-                (newTable, writer) -> {
-                    writer.replace(newTable);
-                });
-    }
-
     protected abstract List<OUT> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException;
 

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java

Lines changed: 0 additions & 2 deletions
@@ -221,8 +221,6 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpoin
                                                     new LogOffsetCommittable(k, v))));
         }
 
-        refreshWrite(table, write);
-
         return committables;
     }
 

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java

Lines changed: 24 additions & 1 deletion
@@ -40,6 +40,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -68,6 +70,8 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
     private transient DataFileMetaSerializer dataFileMetaSerializer;
     private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
 
+    protected transient @Nullable WriterRefresher<StoreSinkWrite> writeRefresher;
+
     public StoreCompactOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -115,6 +119,18 @@ public void initializeState(StateInitializationContext context) throws Exception
                         getContainingTask().getEnvironment().getIOManager(),
                         memoryPool,
                         getMetricGroup());
+
+        if (write.streamingMode()) {
+            writeRefresher =
+                    new WriterRefresher<>(
+                            table,
+                            write,
+                            (newTable, writer) -> {
+                                writer.replace(newTable);
+                            });
+        } else {
+            writeRefresher = null;
+        }
     }
 
     @Override
@@ -170,7 +186,7 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpoin
 
         List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
 
-        refreshWrite(table, write);
+        tryRefreshWrite();
         return committables;
     }
 
@@ -192,6 +208,13 @@ public Set<Pair<BinaryRow, Integer>> compactionWaitingSet() {
         return waitToCompact;
     }
 
+    private void tryRefreshWrite() {
+        if (writeRefresher != null) {
+            writeRefresher.tryRefresh();
+            table = writeRefresher.updatedTable();
+        }
+    }
+
     /** {@link StreamOperatorFactory} of {@link StoreCompactOperator}. */
     public static class Factory extends PrepareCommitOperator.Factory<RowData, Committable> {
         private final FileStoreTable table;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java

Lines changed: 24 additions & 1 deletion
@@ -58,6 +58,8 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
     protected transient StoreSinkWriteState state;
     protected transient StoreSinkWrite write;
 
+    protected transient @Nullable WriterRefresher<StoreSinkWrite> writeRefresher;
+
     public TableWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -97,6 +99,18 @@ public void initializeState(StateInitializationContext context) throws Exception
         if (writeRestore != null) {
             write.setWriteRestore(writeRestore);
         }
+
+        if (write.streamingMode()) {
+            writeRefresher =
+                    new WriterRefresher<>(
+                            table,
+                            write,
+                            (newTable, writer) -> {
+                                writer.replace(newTable);
+                            });
+        } else {
+            writeRefresher = null;
+        }
     }
 
     public void setWriteRestore(@Nullable WriteRestore writeRestore) {
@@ -145,14 +159,23 @@ public void close() throws Exception {
     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
-        return write.prepareCommit(waitCompaction, checkpointId);
+        List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
+        tryRefreshWrite();
+        return committables;
     }
 
     @VisibleForTesting
     public StoreSinkWrite getWrite() {
         return write;
     }
 
+    private void tryRefreshWrite() {
+        if (writeRefresher != null) {
+            writeRefresher.tryRefresh();
+            table = writeRefresher.updatedTable();
+        }
+    }
+
     /** {@link StreamOperatorFactory} of {@link TableWriteOperator}. */
     protected abstract static class Factory<IN>
             extends PrepareCommitOperator.Factory<IN, Committable> {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java

Lines changed: 52 additions & 29 deletions
@@ -23,57 +23,80 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Writer refresher for refresh write when configs changed. */
-public class WriterRefresher {
+public class WriterRefresher<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(WriterRefresher.class);
 
-    public static <T> void doRefresh(FileStoreTable table, T write, Refresher<T> refresher) {
+    @Nullable private final Set<String> configGroups;
+    private final Refresher<T> refresher;
+    private FileStoreTable table;
+    private T write;
 
-        String refreshedConfigs =
+    public WriterRefresher(FileStoreTable table, T write, Refresher<T> refresher) {
+        this.table = table;
+        this.write = write;
+        this.refresher = refresher;
+        String refreshDetectors =
                 Options.fromMap(table.options())
-                        .get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS);
+                        .get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS);
+        if (refreshDetectors == null) {
+            configGroups = null;
+        } else {
+            configGroups = Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
+        }
+    }
 
-        if (!StringUtils.isNullOrWhitespaceOnly(refreshedConfigs)) {
-            Optional<TableSchema> latestSchema = table.schemaManager().latest();
-            if (latestSchema.isPresent() && latestSchema.get().id() > table.schema().id()) {
-                try {
-                    Map<String, String> currentOptions =
-                            CoreOptions.fromMap(table.schema().options())
-                                    .getSpecificOptions(refreshedConfigs.split(","));
-                    Map<String, String> newOptions =
-                            CoreOptions.fromMap(latestSchema.get().options())
-                                    .getSpecificOptions(refreshedConfigs.split(","));
+    public void tryRefresh() {
+        if (configGroups == null) {
+            return;
+        }
 
-                    if (!Objects.equals(newOptions, currentOptions)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(
-                                    "table schema has changed, current schema-id:{}, try to update write with new schema-id:{}. "
-                                            + "current options:{}, new options:{}.",
-                                    table.schema().id(),
-                                    latestSchema.get().id(),
-                                    currentOptions,
-                                    newOptions);
-                        }
-                        FileStoreTable newTable = table.copy(newOptions);
-                        refresher.refresh(newTable, write);
+        Optional<TableSchema> latestSchema = table.schemaManager().latest();
+        if (latestSchema.isPresent() && latestSchema.get().id() > table.schema().id()) {
+            try {
+                Map<String, String> currentOptions =
+                        CoreOptions.fromMap(table.schema().options()).configGroups(configGroups);
+                Map<String, String> newOptions =
+                        CoreOptions.fromMap(latestSchema.get().options())
+                                .configGroups(configGroups);
+
+                if (!Objects.equals(newOptions, currentOptions)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "table schema has changed, current schema-id:{}, try to update write with new schema-id:{}. "
+                                        + "current options:{}, new options:{}.",
+                                table.schema().id(),
+                                latestSchema.get().id(),
+                                currentOptions,
+                                newOptions);
                     }
-                } catch (Exception e) {
-                    throw new RuntimeException("update write failed.", e);
+                    table = table.copy(newOptions);
+                    refresher.refresh(table, write);
                 }
+            } catch (Exception e) {
+                throw new RuntimeException("update write failed.", e);
             }
         }
     }
 
+    public FileStoreTable updatedTable() {
+        return table;
+    }
+
     /**
      * Refresher for refresh write when configs changed.
      *
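
After this refactor WriterRefresher is a stateful instance rather than a static doRefresh utility: the constructor parses 'sink.writer-refresh-detectors' once into a set of option groups, tryRefresh() is a cheap no-op when that option is unset, and callers pick up the refreshed table through updatedTable(). A condensed sketch of the call pattern used by the operators above (variable names are illustrative):

// Created in initializeState, and only for streaming writes (see TableWriteOperator above).
WriterRefresher<StoreSinkWrite> refresher =
        new WriterRefresher<>(table, write, (newTable, writer) -> writer.replace(newTable));

// Called once per checkpoint, after prepareCommit has produced its committables.
refresher.tryRefresh();            // compares the configured option groups across schema versions
table = refresher.updatedTable();  // adopt the table copy created with the new option values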

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java

Lines changed: 1 addition & 2 deletions
@@ -275,8 +275,7 @@ public void testTableReadWriteWithChangedExternalPath() throws Exception {
                         + bucket
                         + "',"
                         + bucketKey
-                        + "'sink.writer-refresh-detect-options' = 'data-file.external-paths,"
-                        + "data-file.external-paths.strategy,data-file.external-paths.specific-fs',"
+                        + "'sink.writer-refresh-detectors' = 'external-paths',"
                         + "'data-file.external-paths' = '"
                         + externalPaths1
                         + "',"

0 commit comments
