
Commit 9c52f46

[flink] introduce WriterRefresher to refresh write
1 parent 643d80e commit 9c52f46

File tree

8 files changed: +214 -69 lines

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 2 additions & 4 deletions
@@ -1969,10 +1969,8 @@ public Map<String, String> toMap() {
     public Map<String, String> getSpecificOptions(String... keys) {
         Map<String, String> result = new HashMap<>();
         for (String key : keys) {
-            String value = options.get(key);
-            // if value is null, it means the key is not set explicitly
-            if (value != null) {
-                result.put(key, value);
+            if (options.containsKey(key)) {
+                result.put(key, options.get(key));
             }
         }
         return result;
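The behavioral change is subtle but real for maps that can hold explicit null values: the old `get(key) != null` check treats a key mapped to null as absent, while `containsKey` preserves it in the result. A minimal standalone sketch of the distinction (hypothetical demo class, not Paimon code):

import java.util.HashMap;
import java.util.Map;

public class ContainsKeyDemo {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("a", "1");  // set to a real value
        options.put("b", null); // explicitly set, but to null

        // Old check: "b" looks unset because its value is null.
        System.out.println(options.get("b") != null); // false

        // New check: "b" is reported as present and would be copied over.
        System.out.println(options.containsKey("b")); // true
        System.out.println(options.containsKey("c")); // false, never set
    }
}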

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 4 additions & 31 deletions
@@ -18,23 +18,19 @@
 
 package org.apache.paimon.flink.compact;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.WriterRefresher;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.operation.metrics.MetricUtils;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.slf4j.Logger;

@@ -46,7 +42,6 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;

@@ -198,7 +193,7 @@ public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
                             .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s))
                             .collect(Collectors.toList());
 
-            updateWriteWithNewSchema();
+            refreshWrite();
             return committables;
         } catch (InterruptedException e) {
             throw new RuntimeException("Interrupted while waiting tasks done.", e);

@@ -222,29 +217,7 @@ private void replace(FileStoreTable newTable) throws Exception {
         write.restore((List) states);
     }
 
-    protected void updateWriteWithNewSchema() {
-        // the configs to be refreshed
-        String refreshedConfigs =
-                Options.fromMap(table.options())
-                        .get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS);
-
-        if (!StringUtils.isNullOrWhitespaceOnly(refreshedConfigs)) {
-            Optional<TableSchema> lastestSchema = table.schemaManager().latest();
-            if (lastestSchema.isPresent() && lastestSchema.get().id() > table.schema().id()) {
-                LOG.info(
-                        "table schema has changed, current schema-id:{}, try to update write with new schema-id:{}.",
-                        table.schema().id(),
-                        lastestSchema.get().id());
-                try {
-                    CoreOptions newCoreOptions = new CoreOptions(lastestSchema.get().options());
-                    table =
-                            table.copy(
-                                    newCoreOptions.getSpecificOptions(refreshedConfigs.split(",")));
-                    replace(table);
-                } catch (Exception e) {
-                    throw new RuntimeException("update write failed.", e);
-                }
-            }
-        }
+    protected void refreshWrite() {
+        WriterRefresher.doRefresh(table, write, (newTable, writer) -> replace(newTable));
    }
 }

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
             throws IOException {
         List<Committable> committables = super.prepareCommit(waitCompaction, checkpointId);
 
-        updateWriteWithNewSchema(table, write, state.getSubtaskId());
+        refreshWrite(table, write);
         return committables;
     }
 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java

Lines changed: 7 additions & 31 deletions
@@ -18,15 +18,11 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.memory.FlinkMemorySegmentPool;
 import org.apache.paimon.flink.memory.MemorySegmentAllocator;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.graph.StreamConfig;

@@ -46,7 +42,6 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
 import static org.apache.paimon.flink.utils.ManagedMemoryUtils.computeManagedMemory;

@@ -111,32 +106,13 @@ private void emitCommittables(boolean waitCompaction, long checkpointId) throws
                 .forEach(committable -> output.collect(new StreamRecord<>(committable)));
     }
 
-    protected void updateWriteWithNewSchema(
-            FileStoreTable table, StoreSinkWrite write, int taskId) {
-        // the configs to be refreshed
-        String refreshedConfigs =
-                Options.fromMap(table.options())
-                        .get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS);
-
-        if (!StringUtils.isNullOrWhitespaceOnly(refreshedConfigs)) {
-            Optional<TableSchema> lastestSchema = table.schemaManager().latest();
-            if (lastestSchema.isPresent() && lastestSchema.get().id() > table.schema().id()) {
-                LOG.info(
-                        "task#{}: table schema has changed, current schema-id:{}, try to update write with new schema-id:{}.",
-                        taskId,
-                        table.schema().id(),
-                        lastestSchema.get().id());
-                try {
-                    CoreOptions newCoreOptions = new CoreOptions(lastestSchema.get().options());
-                    table =
-                            table.copy(
-                                    newCoreOptions.getSpecificOptions(refreshedConfigs.split(",")));
-                    write.replace(table);
-                } catch (Exception e) {
-                    throw new RuntimeException("update write failed.", e);
-                }
-            }
-        }
+    protected void refreshWrite(FileStoreTable table, StoreSinkWrite write) {
+        WriterRefresher.doRefresh(
+                table,
+                write,
+                (newTable, writer) -> {
+                    writer.replace(newTable);
+                });
     }
 
     protected abstract List<OUT> prepareCommit(boolean waitCompaction, long checkpointId)
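By this point both callback shapes introduced by the commit are visible: AppendTableCompactor ignores the `writer` argument and routes through its own `replace(newTable)`, which also reassigns its `table` field, while PrepareCommitOperator calls `writer.replace(newTable)` on the StoreSinkWrite directly. A self-contained sketch of why one generic `doRefresh` can serve both shapes (hypothetical types, not Paimon code):

public class RefresherShapesDemo {
    // Mirrors WriterRefresher.Refresher<T>: the callback is generic over the
    // writer type, so each call site supplies its own swap logic.
    interface Refresher<T> {
        void refresh(String newTable, T writer) throws Exception;
    }

    static <T> void doRefresh(String table, T writer, Refresher<T> refresher) throws Exception {
        refresher.refresh(table + "-v2", writer); // stands in for table.copy(newOptions)
    }

    // Compactor-style holder: the lambda ignores the writer arg and lets the
    // enclosing object perform the swap.
    static class Holder {
        String table = "T";
        void replace(String newTable) { this.table = newTable; }
    }

    public static void main(String[] args) throws Exception {
        Holder holder = new Holder();
        doRefresh(holder.table, "unusedWriter", (newTable, w) -> holder.replace(newTable));

        // Operator-style: the lambda acts on the writer itself.
        StringBuilder writer = new StringBuilder("T");
        doRefresh(writer.toString(), writer, (newTable, w) -> w.replace(0, w.length(), newTable));

        System.out.println(holder.table + " / " + writer); // T-v2 / T-v2
    }
}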

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java

Lines changed: 1 addition & 1 deletion
@@ -221,7 +221,7 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
                                             new LogOffsetCommittable(k, v))));
         }
 
-        updateWriteWithNewSchema(table, write, state.getSubtaskId());
+        refreshWrite(table, write);
 
         return committables;
     }

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java

Lines changed: 1 addition & 1 deletion
@@ -170,7 +170,7 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
 
         List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
 
-        updateWriteWithNewSchema(table, write, state.getSubtaskId());
+        refreshWrite(table, write);
         return committables;
     }

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Writer refresher that refreshes the write when watched configs change. */
+public class WriterRefresher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WriterRefresher.class);
+
+    public static <T> void doRefresh(FileStoreTable table, T write, Refresher<T> refresher) {
+
+        String refreshedConfigs =
+                Options.fromMap(table.options())
+                        .get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS);
+
+        if (!StringUtils.isNullOrWhitespaceOnly(refreshedConfigs)) {
+            Optional<TableSchema> latestSchema = table.schemaManager().latest();
+            if (latestSchema.isPresent() && latestSchema.get().id() > table.schema().id()) {
+                try {
+                    Map<String, String> currentOptions =
+                            CoreOptions.fromMap(table.schema().options())
+                                    .getSpecificOptions(refreshedConfigs.split(","));
+                    Map<String, String> newOptions =
+                            CoreOptions.fromMap(latestSchema.get().options())
+                                    .getSpecificOptions(refreshedConfigs.split(","));
+
+                    if (!Objects.equals(newOptions, currentOptions)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "table schema has changed, current schema-id:{}, try to update write with new schema-id:{}. "
+                                            + "current options:{}, new options:{}.",
+                                    table.schema().id(),
+                                    latestSchema.get().id(),
+                                    currentOptions,
+                                    newOptions);
+                        }
+                        FileStoreTable newTable = table.copy(newOptions);
+                        refresher.refresh(newTable, write);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException("update write failed.", e);
+                }
+            }
+        }
+    }
+
+    public interface Refresher<T> {
+        void refresh(FileStoreTable table, T writer) throws Exception;
+    }
+}
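The guard logic above only fires when all three conditions hold: the detect-options config is non-empty, a newer schema exists, and the watched options actually differ between the writer's current schema and the latest one. A runnable sketch of that last comparison under assumed option keys (`project` stands in for `getSpecificOptions`; not Paimon code):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class DetectOptionsDemo {
    // Restrict both schemas' options to the watched keys and refresh only when
    // those projections differ, mirroring the doRefresh guard.
    static Map<String, String> project(Map<String, String> options, String... keys) {
        Map<String, String> result = new HashMap<>();
        for (String key : keys) {
            if (options.containsKey(key)) {
                result.put(key, options.get(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] watched = "data-file.external-paths,data-file.external-paths.strategy".split(",");

        Map<String, String> current = new HashMap<>();
        current.put("data-file.external-paths", "path1");
        current.put("snapshot.num-retained.min", "5"); // not watched

        Map<String, String> latest = new HashMap<>(current);
        latest.put("snapshot.num-retained.min", "10"); // unwatched change

        System.out.println(!Objects.equals(project(current, watched), project(latest, watched)));
        // false -> writer is NOT replaced; only watched options trigger a refresh

        latest.put("data-file.external-paths", "path2"); // watched change
        System.out.println(!Objects.equals(project(current, watched), project(latest, watched)));
        // true -> writer would be replaced with table.copy(newOptions)
    }
}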
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java

Lines changed: 118 additions & 0 deletions

@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link WriterRefresher}. */
+public class WriterRefresherTest {
+    @TempDir public java.nio.file.Path tempDir;
+
+    Catalog catalog;
+
+    @BeforeEach
+    public void before() throws Exception {
+        Options options = new Options();
+        options.set(WAREHOUSE, tempDir.toString());
+        options.set(CACHE_ENABLED, false);
+        CatalogContext context = CatalogContext.create(options);
+        catalog = CatalogFactory.createCatalog(context);
+        catalog.createDatabase("default", true);
+    }
+
+    @Test
+    public void testDoRefresh() throws Exception {
+        String detectOptions =
+                "data-file.external-paths,data-file.external-paths.strategy,data-file.external-paths.specific-fs";
+        Map<String, String> options = new HashMap<>();
+        options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS.key(), detectOptions);
+        createTable(options);
+
+        FileStoreTable table1 = getTable();
+
+        table1.schemaManager()
+                .commitChanges(
+                        SchemaChange.setOption(
+                                FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECT_OPTIONS.key(),
+                                detectOptions),
+                        SchemaChange.setOption(
+                                CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "external-path1"),
+                        SchemaChange.setOption(
+                                CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+                                "round-robin"));
+        FileStoreTable table2 = getTable();
+
+        Map<String, String> refreshedOptions = new HashMap<>();
+        WriterRefresher.doRefresh(
+                table1, refreshedOptions, new TestWriteRefresher(detectOptions.split(",")));
+        assertThat(refreshedOptions)
+                .isEqualTo(table2.coreOptions().getSpecificOptions(detectOptions.split(",")));
+    }
+
+    private void createTable(Map<String, String> options) throws Exception {
+        catalog.createTable(
+                Identifier.create("default", "T"),
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .options(options)
+                        .build(),
+                false);
+    }
+
+    private FileStoreTable getTable() throws Exception {
+        return (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
+    }
+
+    private static class TestWriteRefresher
+            implements WriterRefresher.Refresher<Map<String, String>> {
+        String[] specificOptions;
+
+        TestWriteRefresher(String[] specificOptions) {
+            this.specificOptions = specificOptions;
+        }
+
+        @Override
+        public void refresh(FileStoreTable table, Map<String, String> options) throws Exception {
+            options.clear();
+            options.putAll(table.coreOptions().getSpecificOptions(specificOptions));
+        }
+    }
+}
