Skip to content

Commit 792cd12

Browse files
committed
fix comments
1 parent fea7560 commit 792cd12

File tree

8 files changed

+101
-80
lines changed

8 files changed

+101
-80
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.paimon.data.InternalRow;
2626
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
2727
import org.apache.paimon.flink.sink.Committable;
28-
import org.apache.paimon.flink.sink.CompactWriterRefresher;
28+
import org.apache.paimon.flink.sink.CompactRefresher;
2929
import org.apache.paimon.io.DataFileMeta;
3030
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
3131
import org.apache.paimon.operation.FileStoreWrite.State;
@@ -66,7 +66,7 @@ public class AppendTableCompactor {
6666
@Nullable private final CompactionMetrics compactionMetrics;
6767
@Nullable private final CompactionMetrics.Reporter metricsReporter;
6868

69-
@Nullable protected final CompactWriterRefresher compactWriteRefresher;
69+
@Nullable protected final CompactRefresher compactRefresher;
7070

7171
public AppendTableCompactor(
7272
FileStoreTable table,
@@ -92,8 +92,7 @@ public AppendTableCompactor(
9292
? null
9393
// partition and bucket fields are no use.
9494
: this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
95-
this.compactWriteRefresher =
96-
CompactWriterRefresher.create(isStreaming, table, this::replace);
95+
this.compactRefresher = CompactRefresher.create(isStreaming, table, this::replace);
9796
}
9897

9998
public void processElement(AppendCompactTask task) throws Exception {
@@ -225,8 +224,8 @@ public void tryRefreshWrite(List<DataFileMeta> files) {
225224
if (commitUser == null) {
226225
return;
227226
}
228-
if (compactWriteRefresher != null && (!files.isEmpty())) {
229-
compactWriteRefresher.tryRefresh(files);
227+
if (compactRefresher != null && (!files.isEmpty())) {
228+
compactRefresher.tryRefresh(files);
230229
}
231230
}
232231
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactWriterRefresher.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactRefresher.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,30 @@
3131
import java.util.List;
3232
import java.util.Optional;
3333

34-
import static org.apache.paimon.flink.sink.WriterRefresher.configGroups;
34+
import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
3535

36-
/** Writer refresher for dedicated compaction. */
37-
public class CompactWriterRefresher {
36+
/** refresh write when table schema changed. This is for dedicated compaction. */
37+
public class CompactRefresher {
3838

39-
private static final Logger LOG = LoggerFactory.getLogger(CompactWriterRefresher.class);
39+
private static final Logger LOG = LoggerFactory.getLogger(CompactRefresher.class);
4040

4141
private FileStoreTable table;
42-
private final WriterRefresher.Refresher refresher;
43-
private final WriterRefresher writerRefresher;
42+
private final WriteRefresher refresher;
43+
private final ConfigRefresher configRefresher;
4444

45-
private CompactWriterRefresher(FileStoreTable table, WriterRefresher.Refresher refresher) {
45+
private CompactRefresher(FileStoreTable table, WriteRefresher refresher) {
4646
this.table = table;
4747
this.refresher = refresher;
48-
this.writerRefresher = WriterRefresher.create(true, table, refresher);
48+
this.configRefresher = ConfigRefresher.create(true, table, refresher);
4949
}
5050

5151
@Nullable
52-
public static CompactWriterRefresher create(
53-
boolean isStreaming, FileStoreTable table, WriterRefresher.Refresher refresher) {
52+
public static CompactRefresher create(
53+
boolean isStreaming, FileStoreTable table, WriteRefresher refresher) {
5454
if (!isStreaming) {
5555
return null;
5656
}
57-
return new CompactWriterRefresher(table, refresher);
57+
return new CompactRefresher(table, refresher);
5858
}
5959

6060
/**
@@ -79,13 +79,13 @@ public void tryRefresh(List<DataFileMeta> files) {
7979
table = table.copyWithLatestSchema();
8080

8181
// refresh configs allowed to be updated by the way
82-
if (writerRefresher != null) {
82+
if (configRefresher != null) {
8383
table =
8484
table.copy(
8585
configGroups(
86-
writerRefresher.configGroups(),
86+
configRefresher.configGroups(),
8787
CoreOptions.fromMap(latest.options())));
88-
writerRefresher.updateTable(table);
88+
configRefresher.updateTable(table);
8989
}
9090

9191
refresher.refresh(table);
@@ -98,8 +98,8 @@ public void tryRefresh(List<DataFileMeta> files) {
9898

9999
} else {
100100
// try refresh for configs
101-
if (writerRefresher != null) {
102-
writerRefresher.tryRefresh();
101+
if (configRefresher != null) {
102+
configRefresher.tryRefresh();
103103
}
104104
}
105105
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ConfigRefresher.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,25 @@
4242
import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY;
4343
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
4444

45-
/** Writer refresher for refresh write when configs changed. */
46-
public class WriterRefresher {
45+
/** refresh write when configs changed. */
46+
public class ConfigRefresher {
4747

48-
private static final Logger LOG = LoggerFactory.getLogger(WriterRefresher.class);
48+
private static final Logger LOG = LoggerFactory.getLogger(ConfigRefresher.class);
4949

5050
private FileStoreTable table;
51-
private final Refresher refresher;
51+
private final WriteRefresher refresher;
5252
private final Set<String> configGroups;
5353

54-
private WriterRefresher(FileStoreTable table, Refresher refresher, Set<String> configGroups) {
54+
private ConfigRefresher(
55+
FileStoreTable table, WriteRefresher refresher, Set<String> configGroups) {
5556
this.table = table;
5657
this.refresher = refresher;
5758
this.configGroups = configGroups;
5859
}
5960

6061
@Nullable
61-
public static WriterRefresher create(
62-
boolean isStreaming, FileStoreTable table, Refresher refresher) {
62+
public static ConfigRefresher create(
63+
boolean isStreaming, FileStoreTable table, WriteRefresher refresher) {
6364
if (!isStreaming) {
6465
return null;
6566
}
@@ -74,7 +75,7 @@ public static WriterRefresher create(
7475
if (configGroups == null || configGroups.isEmpty()) {
7576
return null;
7677
}
77-
return new WriterRefresher(table, refresher, configGroups);
78+
return new ConfigRefresher(table, refresher, configGroups);
7879
}
7980

8081
/**
@@ -117,11 +118,6 @@ public Set<String> configGroups() {
117118
return configGroups;
118119
}
119120

120-
/** Refresher when configs changed. */
121-
public interface Refresher {
122-
void refresh(FileStoreTable table) throws Exception;
123-
}
124-
125121
public static Map<String, String> configGroups(Set<String> groups, CoreOptions options) {
126122
Map<String, String> configs = new HashMap<>();
127123
// external-paths config group

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
7070
private transient DataFileMetaSerializer dataFileMetaSerializer;
7171
private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
7272

73-
protected transient @Nullable CompactWriterRefresher compactWriterRefresher;
73+
protected transient @Nullable CompactRefresher compactRefresher;
7474

7575
public StoreCompactOperator(
7676
StreamOperatorParameters<Committable> parameters,
@@ -119,8 +119,8 @@ public void initializeState(StateInitializationContext context) throws Exception
119119
getContainingTask().getEnvironment().getIOManager(),
120120
memoryPoolFactory,
121121
getMetricGroup());
122-
this.compactWriterRefresher =
123-
CompactWriterRefresher.create(write.streamingMode(), table, write::replace);
122+
this.compactRefresher =
123+
CompactRefresher.create(write.streamingMode(), table, write::replace);
124124
}
125125

126126
@Override
@@ -197,8 +197,8 @@ public Set<Pair<BinaryRow, Integer>> compactionWaitingSet() {
197197
}
198198

199199
private void tryRefreshWrite(List<DataFileMeta> files) {
200-
if (compactWriterRefresher != null) {
201-
compactWriterRefresher.tryRefresh(files);
200+
if (compactRefresher != null) {
201+
compactRefresher.tryRefresh(files);
202202
}
203203
}
204204

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
5858
protected transient StoreSinkWriteState state;
5959
protected transient StoreSinkWrite write;
6060

61-
protected transient @Nullable WriterRefresher writeRefresher;
61+
protected transient @Nullable ConfigRefresher configRefresher;
6262

6363
public TableWriteOperator(
6464
StreamOperatorParameters<Committable> parameters,
@@ -99,7 +99,7 @@ public void initializeState(StateInitializationContext context) throws Exception
9999
if (writeRestore != null) {
100100
write.setWriteRestore(writeRestore);
101101
}
102-
this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
102+
this.configRefresher = ConfigRefresher.create(write.streamingMode(), table, write::replace);
103103
}
104104

105105
public void setWriteRestore(@Nullable WriteRestore writeRestore) {
@@ -157,8 +157,8 @@ public StoreSinkWrite getWrite() {
157157
}
158158

159159
protected void tryRefreshWrite() {
160-
if (writeRefresher != null) {
161-
writeRefresher.tryRefresh();
160+
if (configRefresher != null) {
161+
configRefresher.tryRefresh();
162162
}
163163
}
164164

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.sink;
20+
21+
import org.apache.paimon.table.FileStoreTable;
22+
23+
/** refresher for refreshing write in streaming mode. */
24+
public interface WriteRefresher {
25+
void refresh(FileStoreTable table) throws Exception;
26+
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactWriterRefresherTest.java renamed to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactRefresherTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@
4747
import java.util.Set;
4848
import java.util.stream.Collectors;
4949

50-
import static org.apache.paimon.flink.sink.WriterRefresher.configGroups;
50+
import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
5151
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
5252
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
5353
import static org.assertj.core.api.Assertions.assertThat;
5454

55-
/** Test for {@link CompactWriterRefresher}. */
56-
public class CompactWriterRefresherTest {
55+
/** Test for {@link CompactRefresher}. */
56+
public class CompactRefresherTest {
5757

5858
@TempDir public java.nio.file.Path tempDir;
5959

@@ -93,11 +93,11 @@ public void testRefreshFieldsAndConfigs() throws Exception {
9393
List<DataField> dataFields = new ArrayList<>();
9494
Map<String, String> refreshedOptions = new HashMap<>();
9595
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
96-
CompactWriterRefresher writerRefresher =
97-
CompactWriterRefresher.create(
96+
CompactRefresher writerRefresher =
97+
CompactRefresher.create(
9898
true,
9999
table1,
100-
new WriterRefresherTest.TestWriteRefresher(
100+
new ConfigRefresherTest.TestWriteRefresher(
101101
groups, refreshedOptions, dataFields));
102102
writerRefresher.tryRefresh(
103103
table2.newSnapshotReader().read().dataSplits().get(0).dataFiles());
@@ -123,11 +123,11 @@ public void testRefreshOnlyFields() throws Exception {
123123
}
124124

125125
List<DataField> dataFields = new ArrayList<>();
126-
CompactWriterRefresher writerRefresher =
127-
CompactWriterRefresher.create(
126+
CompactRefresher writerRefresher =
127+
CompactRefresher.create(
128128
true,
129129
table1,
130-
new WriterRefresherTest.TestWriteRefresher(
130+
new ConfigRefresherTest.TestWriteRefresher(
131131
null, Collections.emptyMap(), dataFields));
132132
writerRefresher.tryRefresh(
133133
table2.newSnapshotReader().read().dataSplits().get(0).dataFiles());
@@ -151,11 +151,11 @@ public void testRefreshOnlyConfigs() throws Exception {
151151

152152
Map<String, String> refreshedOptions = new HashMap<>();
153153
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
154-
CompactWriterRefresher writerRefresher =
155-
CompactWriterRefresher.create(
154+
CompactRefresher writerRefresher =
155+
CompactRefresher.create(
156156
true,
157157
table1,
158-
new WriterRefresherTest.TestWriteRefresher(groups, refreshedOptions, null));
158+
new ConfigRefresherTest.TestWriteRefresher(groups, refreshedOptions, null));
159159
writerRefresher.tryRefresh(Collections.emptyList());
160160
assertThat(refreshedOptions).isEqualTo(configGroups(groups, table2.coreOptions()));
161161
}

0 commit comments

Comments
 (0)