
Commit 83eee6f

try refresh if streaming
1 parent 64aea6f commit 83eee6f

12 files changed: +81 −38 lines

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 2 additions & 2 deletions
@@ -327,10 +327,10 @@
             <td>Sink writer memory to control heap memory of writer.</td>
         </tr>
         <tr>
-            <td><h5>sink.writer-refresh-detect-options</h5></td>
+            <td><h5>sink.writer-refresh-detectors</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>the options which are expected to be refreshed when streaming writing, multiple options separated by commas.</td>
+            <td>The option groups which are expected to be refreshed when streaming writing, multiple option group separated by commas. Now only 'external-paths' is supported.</td>
         </tr>
         <tr>
             <td><h5>source.checkpoint-align.enabled</h5></td>
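For context on using the renamed option, here is a minimal sketch of passing it as a dynamic table option. The key `sink.writer-refresh-detectors` and the `external-paths` group come from the doc change above; the class name, the map-based wiring, and the SQL hint in the comment are illustrative assumptions rather than Paimon's prescribed API.

```java
import java.util.HashMap;
import java.util.Map;

public class WriterRefreshOptionExample {
    public static void main(String[] args) {
        // Dynamic options for a streaming write; only the 'external-paths' detector
        // group is supported by this commit.
        Map<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put("sink.writer-refresh-detectors", "external-paths");

        // Typically applied via table.copy(dynamicOptions) in the Table API, or as a
        // Flink SQL hint (illustrative):
        //   INSERT INTO t /*+ OPTIONS('sink.writer-refresh-detectors'='external-paths') */ ...
        System.out.println(dynamicOptions);
    }
}
```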

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 20 additions & 9 deletions
@@ -64,13 +64,14 @@ public class AppendTableCompactor {
     @Nullable private final transient CompactionMetrics compactionMetrics;
     @Nullable private final transient CompactionMetrics.Reporter metricsReporter;
 
-    protected final transient WriterRefresher<BaseAppendFileStoreWrite> writeRefresher;
+    @Nullable protected final transient WriterRefresher<BaseAppendFileStoreWrite> writeRefresher;
 
     public AppendTableCompactor(
             FileStoreTable table,
             String commitUser,
             Supplier<ExecutorService> lazyCompactExecutor,
-            @Nullable MetricGroup metricGroup) {
+            @Nullable MetricGroup metricGroup,
+            boolean isStreaming) {
         this.table = table;
         this.commitUser = commitUser;
         this.write = (BaseAppendFileStoreWrite) table.store().newWrite(commitUser);
@@ -85,8 +86,12 @@ public AppendTableCompactor(
                         ? null
                         // partition and bucket fields are no use.
                         : this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
-        this.writeRefresher =
-                new WriterRefresher<>(table, write, (newTable, writer) -> replace(newTable));
+        if (isStreaming) {
+            this.writeRefresher =
+                    new WriterRefresher<>(table, write, (newTable, writer) -> replace(newTable));
+        } else {
+            this.writeRefresher = null;
+        }
     }
 
     public void processElement(AppendCompactTask task) throws Exception {
@@ -196,8 +201,6 @@ public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
                     tempList.stream()
                             .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s))
                             .collect(Collectors.toList());
-
-            writeRefresher.tryRefresh();
             return committables;
         } catch (InterruptedException e) {
             throw new RuntimeException("Interrupted while waiting tasks done.", e);
@@ -211,13 +214,21 @@ public Iterable<Future<CommitMessage>> result() {
     }
 
     private void replace(FileStoreTable newTable) throws Exception {
-        if (commitUser == null) {
-            return;
-        }
 
         List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
        write.close();
        write = (BaseAppendFileStoreWrite) newTable.store().newWrite(commitUser);
        write.restore((List) states);
    }
+
+    public void tryRefreshWrite() {
+        if (commitUser == null) {
+            return;
+        }
+
+        if (writeRefresher != null) {
+            writeRefresher.tryRefresh();
+            table = writeRefresher.updatedTable();
+        }
+    }
 }
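The net effect of this file's change: the refresher is constructed only for streaming jobs, the refresh no longer happens inside prepareCommit, and the new tryRefreshWrite() guards against a null refresher and picks up the refreshed table. Below is a hypothetical, reduced sketch of the collaborator those calls rely on, inferred only from the call sites visible here (the three-argument constructor, tryRefresh(), updatedTable()); the suppliers for the latest table and the watched option group are assumptions, and Paimon's real WriterRefresher will differ.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical reduction of WriterRefresher as used above; not Paimon's actual class.
class WriterRefresherSketch<T, W> {

    interface Refresh<A, B> {
        void apply(A newTable, B writer) throws Exception;
    }

    private final Supplier<T> latestTable;         // assumed source of the freshest table snapshot
    private final Supplier<Object> watchedOptions; // assumed view of the detector group, e.g. external paths
    private final W writer;
    private final Refresh<T, W> refresh;

    private T current;
    private Object lastSeen;

    WriterRefresherSketch(
            T table,
            W writer,
            Refresh<T, W> refresh,
            Supplier<T> latestTable,
            Supplier<Object> watchedOptions) {
        this.current = table;
        this.writer = writer;
        this.refresh = refresh;
        this.latestTable = latestTable;
        this.watchedOptions = watchedOptions;
        this.lastSeen = watchedOptions.get();
    }

    /** Rebuild the writer only when the watched option group actually changed. */
    void tryRefresh() throws Exception {
        Object now = watchedOptions.get();
        if (!Objects.equals(lastSeen, now)) {
            T latest = latestTable.get();
            refresh.apply(latest, writer); // plays the role of AppendTableCompactor#replace(newTable)
            current = latest;
            lastSeen = now;
        }
    }

    /** The table the writer currently reflects; read back via updatedTable() above. */
    T updatedTable() {
        return current;
    }
}
```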

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java

Lines changed: 8 additions & 5 deletions
@@ -34,8 +34,9 @@ public class AppendBypassCompactWorkerOperator
     private AppendBypassCompactWorkerOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
-            String commitUser) {
-        super(parameters, table, commitUser);
+            String commitUser,
+            boolean isStreaming) {
+        super(parameters, table, commitUser, isStreaming);
     }
 
     @Override
@@ -57,15 +58,17 @@ public void processElement(StreamRecord<Either<Committable, AppendCompactTask>>
     public static class Factory
             extends AppendCompactWorkerOperator.Factory<Either<Committable, AppendCompactTask>> {
 
-        public Factory(FileStoreTable table, String initialCommitUser) {
-            super(table, initialCommitUser);
+        public Factory(FileStoreTable table, String initialCommitUser, boolean isStreaming) {
+            super(table, initialCommitUser, isStreaming);
         }
 
         @Override
         @SuppressWarnings("unchecked")
         public <T extends StreamOperator<Committable>> T createStreamOperator(
                 StreamOperatorParameters<Committable> parameters) {
-            return (T) new AppendBypassCompactWorkerOperator(parameters, table, commitUser);
+            return (T)
+                    new AppendBypassCompactWorkerOperator(
+                            parameters, table, commitUser, isStreaming);
         }
 
         @Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java

Lines changed: 14 additions & 4 deletions
@@ -55,13 +55,17 @@ public abstract class AppendCompactWorkerOperator<IN>
 
     private transient ExecutorService lazyCompactExecutor;
 
+    private final boolean isStreaming;
+
     public AppendCompactWorkerOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
-            String commitUser) {
+            String commitUser,
+            boolean isStreaming) {
         super(parameters, Options.fromMap(table.options()));
         this.table = table;
         this.commitUser = commitUser;
+        this.isStreaming = isStreaming;
     }
 
     @VisibleForTesting
@@ -73,13 +77,17 @@ Iterable<Future<CommitMessage>> result() {
     public void open() throws Exception {
         LOG.debug("Opened a append-only table compaction worker.");
         this.unawareBucketCompactor =
-                new AppendTableCompactor(table, commitUser, this::workerExecutor, getMetricGroup());
+                new AppendTableCompactor(
+                        table, commitUser, this::workerExecutor, getMetricGroup(), isStreaming);
     }
 
     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
-        return this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
+        List<Committable> committables =
+                this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
+        this.unawareBucketCompactor.tryRefreshWrite();
+        return committables;
     }
 
     private ExecutorService workerExecutor() {
@@ -111,11 +119,13 @@ protected abstract static class Factory<IN>
             extends PrepareCommitOperator.Factory<IN, Committable> {
         protected final FileStoreTable table;
         protected final String commitUser;
+        protected final boolean isStreaming;
 
-        protected Factory(FileStoreTable table, String commitUser) {
+        protected Factory(FileStoreTable table, String commitUser, boolean isStreaming) {
             super(Options.fromMap(table.options()));
             this.table = table;
             this.commitUser = commitUser;
+            this.isStreaming = isStreaming;
         }
     }
 }
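Note the ordering prepareCommit now enforces: the committables for the current checkpoint are produced by the existing writer before the compactor is allowed to swap it. A schematic driver with hypothetical interface names (the real types are AppendTableCompactor and the operator above) illustrating that contract:

```java
import java.util.List;

// Hypothetical names for illustration only; not Paimon classes.
interface RefreshableCompactor {
    List<String> prepareCommit(boolean waitCompaction, long checkpointId) throws Exception;
    void tryRefreshWrite();
}

final class CommitThenRefresh {
    private CommitThenRefresh() {}

    static List<String> run(RefreshableCompactor compactor, long checkpointId) throws Exception {
        // 1. Finish the current checkpoint's work with the writer that produced it.
        List<String> committables = compactor.prepareCommit(false, checkpointId);
        // 2. Only afterwards try to rebuild the writer; in batch jobs this is a no-op
        //    because the refresher was never created.
        compactor.tryRefreshWrite();
        return committables;
    }
}
```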

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java

Lines changed: 15 additions & 4 deletions
@@ -65,14 +65,18 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
 
     private transient Catalog catalog;
 
+    private final boolean isStreaming;
+
     private AppendOnlyMultiTableCompactionWorkerOperator(
             StreamOperatorParameters<MultiTableCommittable> parameters,
             CatalogLoader catalogLoader,
             String commitUser,
-            Options options) {
+            Options options,
+            boolean isStreaming) {
         super(parameters, options);
         this.commitUser = commitUser;
         this.catalogLoader = catalogLoader;
+        this.isStreaming = isStreaming;
     }
 
     @Override
@@ -119,7 +123,8 @@ private AppendTableCompactor compactor(Identifier tableId) {
                     (FileStoreTable) catalog.getTable(tableId).copy(options.toMap()),
                     commitUser,
                     this::workerExecutor,
-                    getMetricGroup());
+                    getMetricGroup(),
+                    isStreaming);
         } catch (Catalog.TableNotExistException e) {
             throw new RuntimeException(e);
         }
@@ -188,11 +193,17 @@ public static class Factory
 
         private final String commitUser;
         private final CatalogLoader catalogLoader;
+        private final boolean isStreaming;
 
-        public Factory(CatalogLoader catalogLoader, String commitUser, Options options) {
+        public Factory(
+                CatalogLoader catalogLoader,
+                String commitUser,
+                Options options,
+                boolean isStreaming) {
             super(options);
             this.commitUser = commitUser;
             this.catalogLoader = catalogLoader;
+            this.isStreaming = isStreaming;
         }
 
         @Override
@@ -201,7 +212,7 @@ public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
                 StreamOperatorParameters<MultiTableCommittable> parameters) {
             return (T)
                     new AppendOnlyMultiTableCompactionWorkerOperator(
-                            parameters, catalogLoader, commitUser, options);
+                            parameters, catalogLoader, commitUser, options, isStreaming);
         }
 
         @Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java

Lines changed: 6 additions & 5 deletions
@@ -37,8 +37,9 @@ public class AppendOnlySingleTableCompactionWorkerOperator
     private AppendOnlySingleTableCompactionWorkerOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
-            String commitUser) {
-        super(parameters, table, commitUser);
+            String commitUser,
+            boolean isStreaming) {
+        super(parameters, table, commitUser, isStreaming);
     }
 
     @Override
@@ -49,8 +50,8 @@ public void processElement(StreamRecord<AppendCompactTask> element) throws Excep
     /** {@link StreamOperatorFactory} of {@link AppendOnlySingleTableCompactionWorkerOperator}. */
     public static class Factory extends AppendCompactWorkerOperator.Factory<AppendCompactTask> {
 
-        public Factory(FileStoreTable table, String initialCommitUser) {
-            super(table, initialCommitUser);
+        public Factory(FileStoreTable table, String initialCommitUser, boolean isStreaming) {
+            super(table, initialCommitUser, isStreaming);
         }
 
         @Override
@@ -59,7 +60,7 @@ public <T extends StreamOperator<Committable>> T createStreamOperator(
                 StreamOperatorParameters<Committable> parameters) {
            return (T)
                    new AppendOnlySingleTableCompactionWorkerOperator(
-                            parameters, table, commitUser);
+                            parameters, table, commitUser, isStreaming);
        }
 
        @Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableCompactSink.java

Lines changed: 7 additions & 3 deletions
@@ -30,21 +30,25 @@
 public class AppendTableCompactSink extends FlinkSink<AppendCompactTask> {
 
     private final FileStoreTable table;
+    private final boolean isStreaming;
 
-    public AppendTableCompactSink(FileStoreTable table) {
+    public AppendTableCompactSink(FileStoreTable table, boolean isStreaming) {
         super(table, true);
         this.table = table;
+        this.isStreaming = isStreaming;
     }
 
     public static DataStreamSink<?> sink(
             FileStoreTable table, DataStream<AppendCompactTask> input) {
-        return new AppendTableCompactSink(table).sinkFrom(input);
+        boolean isStreaming = isStreaming(input);
+        return new AppendTableCompactSink(table, isStreaming).sinkFrom(input);
     }
 
     @Override
     protected OneInputStreamOperatorFactory<AppendCompactTask, Committable>
             createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new AppendOnlySingleTableCompactionWorkerOperator.Factory(table, commitUser);
+        return new AppendOnlySingleTableCompactionWorkerOperator.Factory(
+                table, commitUser, isStreaming);
     }
 
     @Override
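The static sink(...) entry point now derives the flag from the input stream via isStreaming(input), a helper in FlinkSink whose body is not part of this diff. As a rough illustration only, one plausible way such a check can be written against Flink's public configuration is sketched below; treat the exact API usage as an assumption, not a quote of Paimon's implementation.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;

final class ExecutionModeProbe {

    private ExecutionModeProbe() {}

    // Hypothetical stand-in for FlinkSink#isStreaming(input): read the configured
    // runtime mode from the stream's execution environment. AUTOMATIC mode would
    // need extra handling (e.g. checking source boundedness), which is omitted here.
    static boolean isStreaming(DataStream<?> input) {
        RuntimeExecutionMode mode =
                input.getExecutionEnvironment()
                        .getConfiguration()
                        .get(ExecutionOptions.RUNTIME_MODE);
        return mode == RuntimeExecutionMode.STREAMING;
    }
}
```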

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java

Lines changed: 1 addition & 1 deletion
@@ -115,7 +115,7 @@ public DataStream<Committable> doWrite(
                                 "Compact Worker: " + table.name(),
                                 new CommittableTypeInfo(),
                                 new AppendBypassCompactWorkerOperator.Factory(
-                                        table, initialCommitUser))
+                                        table, initialCommitUser, true))
                         .startNewChain();
         setParallelism(newWritten, written.getParallelism(), false);
         written = newWritten;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ public DataStream<MultiTableCommittable> doWrite(
                                 String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME),
                                 new MultiTableCommittableTypeInfo(),
                                 new AppendOnlyMultiTableCompactionWorkerOperator.Factory(
-                                        catalogLoader, commitUser, options));
+                                        catalogLoader, commitUser, options, isStreaming));
         forwardParallelism(unawareBucketTableRewriter, unawareBucketTableSource);
 
         if (!isStreaming) {

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendTableCompactorTest.java

Lines changed: 2 additions & 1 deletion
@@ -100,7 +100,8 @@ public Counter counter(String name) {
                                 counterMap.put(name, counter);
                                 return counter;
                             }
-                        });
+                        },
+                        false);
 
         for (int i = 0; i < 320; i++) {
             unawareBucketCompactor.processElement(new MockCompactTask());
