Skip to content

Commit f4f12fc

Browse files
committed
[flink] Optimize lookup table refresh with full load for snapshot backlog.
1 parent 5b9999a commit f4f12fc

File tree

7 files changed

+145
-11
lines changed

7 files changed

+145
-11
lines changed

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@
9292
<td>Integer</td>
9393
<td>If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync.</td>
9494
</tr>
95+
<tr>
96+
<td><h5>lookup.refresh.full-load-threshold</h5></td>
97+
<td style="word-wrap: break-word;">2147483647</td>
98+
<td>Integer</td>
99+
<td>If the pending snapshot count exceeds this threshold, lookup table will discard incremental updates and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. Set to a reasonable value (e.g., 10) to enable this optimization. Default is Integer.MAX_VALUE (disabled). </td>
100+
</tr>
95101
<tr>
96102
<td><h5>lookup.refresh.time-periods-blacklist</h5></td>
97103
<td style="word-wrap: break-word;">(none)</td>

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,15 @@ public class FlinkConnectorOptions {
292292
+ "cache refreshing is forbidden. Blacklist format is start1->end1,start2->end2,... , "
293293
+ "and the time format is yyyy-MM-dd HH:mm. Only used when lookup table is FULL cache mode.");
294294

295+
public static final ConfigOption<Integer> LOOKUP_REFRESH_FULL_LOAD_THRESHOLD =
296+
ConfigOptions.key("lookup.refresh.full-load-threshold")
297+
.intType()
298+
.defaultValue(Integer.MAX_VALUE)
299+
.withDescription(
300+
"If the pending snapshot count exceeds this threshold, lookup table will discard incremental updates "
301+
+ "and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. "
302+
+ "Set to a reasonable value (e.g., 10) to enable this optimization. Default is Integer.MAX_VALUE (disabled). ");
303+
295304
public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
296305
ConfigOptions.key("sink.savepoint.auto-tag")
297306
.booleanType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868

6969
import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
7070
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
71+
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
7172
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
7273
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
7374
import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
@@ -98,6 +99,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
9899
private transient Duration refreshInterval;
99100
// timestamp when refreshing lookup table
100101
private transient long nextRefreshTime;
102+
// threshold for triggering full table reload when snapshots are pending
103+
private transient int refreshFullThreshold;
101104

102105
protected FunctionContext functionContext;
103106

@@ -175,6 +178,7 @@ private void open() throws Exception {
175178
this.refreshInterval =
176179
options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
177180
.orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
181+
this.refreshFullThreshold = options.get(LOOKUP_REFRESH_FULL_LOAD_THRESHOLD);
178182

179183
List<String> fieldNames = table.rowType().getFieldNames();
180184
int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
@@ -332,18 +336,56 @@ void tryRefresh() throws Exception {
332336
partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
333337
lookupTable.close();
334338
lookupTable.open();
339+
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
335340
// no need to refresh the lookup table because it is reopened
336341
return;
337342
}
338343
}
339344

340345
// 3. refresh lookup table
341346
if (shouldRefreshLookupTable()) {
342-
lookupTable.refresh();
347+
// Check if we should do full load (close and reopen table) instead of incremental
348+
// refresh
349+
boolean doFullLoad = shouldDoFullLoad();
350+
351+
if (doFullLoad) {
352+
LOG.info(
353+
"Doing full load for table {} instead of incremental refresh",
354+
table.name());
355+
lookupTable.close();
356+
lookupTable.open();
357+
} else {
358+
lookupTable.refresh();
359+
}
360+
343361
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
344362
}
345363
}
346364

365+
/**
366+
* Check if we should do full load instead of incremental refresh. This can improve performance
367+
* when there are many pending snapshots.
*
* <p>Returns {@code false} when the configured threshold is non-positive or equal to
* {@code Integer.MAX_VALUE}, or when either the table's latest snapshot id or the lookup table's
* next snapshot id is {@code null}. Otherwise returns {@code true} when
* {@code latestSnapshotId - nextSnapshotId + 1} is greater than or equal to the threshold.
*
* @return whether the caller should reload the whole lookup table rather than refresh it
368+
*/
369+
@VisibleForTesting
370+
public boolean shouldDoFullLoad() {
371+
// a non-positive threshold or Integer.MAX_VALUE means the full-load check is skipped
if (refreshFullThreshold <= 0 || refreshFullThreshold == Integer.MAX_VALUE) {
372+
return false;
373+
}
374+
375+
Long latestSnapshotId = ((FileStoreTable) table).snapshotManager().latestSnapshotId();
376+
Long nextSnapshotId = lookupTable.nextSnapshotId();
377+
// without both ids the pending count cannot be computed, so keep the incremental path
if (latestSnapshotId == null || nextSnapshotId == null) {
378+
return false;
379+
}
380+
381+
LOG.info(
382+
"Check if should do full load, latestSnapshotId: {}, nextSnapshotId: {}, refreshFullThreshold: {}",
383+
latestSnapshotId,
384+
nextSnapshotId,
385+
refreshFullThreshold);
386+
// inclusive count of ids from nextSnapshotId up to latestSnapshotId; note the comparison is >=
return latestSnapshotId - nextSnapshotId + 1 >= refreshFullThreshold;
387+
}
388+
347389
private boolean shouldRefreshLookupTable() {
348390
if (nextRefreshTime > System.currentTimeMillis()) {
349391
return false;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ public void specifyCacheRowFilter(Filter<InternalRow> filter) {
151151
this.cacheRowFilter = filter;
152152
}
153153

154+
@Override
155+
public Long nextSnapshotId() {
156+
return this.reader.nextSnapshotId();
157+
}
158+
154159
protected void init() throws Exception {
155160
this.stateFactory = createStateFactory();
156161
this.refreshExecutor =

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@ public interface LookupTable extends Closeable {
4141
void refresh() throws Exception;
4242

4343
void specifyCacheRowFilter(Filter<InternalRow> filter);
44+
45+
Long nextSnapshotId();
4446
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ public void specifyCacheRowFilter(Filter<InternalRow> filter) {
197197
this.cacheRowFilter = filter;
198198
}
199199

200+
@Override
201+
public Long nextSnapshotId() {
202+
return this.queryExecutor.nextSnapshotId();
203+
}
204+
200205
@Override
201206
public void close() throws IOException {
202207
if (queryExecutor != null) {
@@ -243,6 +248,10 @@ interface QueryExecutor extends Closeable {
243248
InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;
244249

245250
void refresh();
251+
252+
// Default for executors that do not override this method. With Long.MAX_VALUE, the pending count
// computed by FileStoreLookupFunction#shouldDoFullLoad (latest - next + 1) is at most 0 for any
// latest snapshot id below Long.MAX_VALUE, so it never reaches a positive threshold.
default Long nextSnapshotId() {
253+
return Long.MAX_VALUE;
254+
}
246255
}
247256

248257
static class LocalQueryExecutor implements QueryExecutor {
@@ -334,6 +343,11 @@ void refreshSplit(DataSplit split) {
334343
numBuckets.put(partition, totalBuckets);
335344
}
336345

346+
@Override
347+
public Long nextSnapshotId() {
348+
return this.scan.checkpoint();
349+
}
350+
337351
@Override
338352
public void close() throws IOException {
339353
tableQuery.close();

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.paimon.types.RowType;
4444
import org.apache.paimon.utils.TraceableFileIO;
4545

46+
import org.apache.flink.table.data.RowData;
4647
import org.junit.jupiter.api.AfterEach;
4748
import org.junit.jupiter.api.BeforeEach;
4849
import org.junit.jupiter.api.Test;
@@ -91,16 +92,19 @@ public void before() throws Exception {
9192
}
9293

9394
private void createLookupFunction(boolean refreshAsync) throws Exception {
94-
createLookupFunction(true, false, false, refreshAsync);
95+
createLookupFunction(true, false, false, refreshAsync, null);
9596
}
9697

9798
private void createLookupFunction(
9899
boolean isPartition,
99100
boolean joinEqualPk,
100101
boolean dynamicPartition,
101-
boolean refreshAsync)
102+
boolean refreshAsync,
103+
Integer fullLoadThreshold)
102104
throws Exception {
103-
table = createFileStoreTable(isPartition, dynamicPartition, refreshAsync);
105+
table =
106+
createFileStoreTable(
107+
isPartition, dynamicPartition, refreshAsync, fullLoadThreshold);
104108
lookupFunction = createLookupFunction(table, joinEqualPk);
105109
lookupFunction.open(tempDir.toString());
106110
}
@@ -116,7 +120,11 @@ private FileStoreLookupFunction createLookupFunction(
116120
}
117121

118122
private FileStoreTable createFileStoreTable(
119-
boolean isPartition, boolean dynamicPartition, boolean refreshAsync) throws Exception {
123+
boolean isPartition,
124+
boolean dynamicPartition,
125+
boolean refreshAsync,
126+
Integer fullLoadThreshold)
127+
throws Exception {
120128
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
121129
Options conf = new Options();
122130
conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
@@ -128,6 +136,10 @@ private FileStoreTable createFileStoreTable(
128136
conf.set(FlinkConnectorOptions.SCAN_PARTITIONS, "max_pt()");
129137
}
130138

139+
if (fullLoadThreshold != null) {
140+
conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD, fullLoadThreshold);
141+
}
142+
131143
RowType rowType =
132144
RowType.of(
133145
new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
@@ -153,7 +165,7 @@ public void close() throws Exception {
153165

154166
@Test
155167
public void testCompatibilityForOldVersion() throws Exception {
156-
createLookupFunction(false, true, false, false);
168+
createLookupFunction(false, true, false, false, null);
157169
commit(writeCommit(1));
158170
PrimaryKeyPartialLookupTable lookupTable =
159171
(PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
@@ -174,7 +186,7 @@ public void testCompatibilityForOldVersion() throws Exception {
174186
@ParameterizedTest
175187
@ValueSource(booleans = {false, true})
176188
public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
177-
createLookupFunction(false, true, false, refreshAsync);
189+
createLookupFunction(false, true, false, refreshAsync, null);
178190
assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
179191
QueryExecutor queryExecutor =
180192
((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
@@ -184,7 +196,7 @@ public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
184196
@ParameterizedTest
185197
@ValueSource(booleans = {false, true})
186198
public void testDefaultRemotePartial(boolean refreshAsync) throws Exception {
187-
createLookupFunction(false, true, false, refreshAsync);
199+
createLookupFunction(false, true, false, refreshAsync, null);
188200
ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
189201
serviceManager.resetService(
190202
PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new InetSocketAddress(1)});
@@ -232,7 +244,7 @@ public void testLookupExpiredSnapshot(boolean refreshAsync) throws Exception {
232244

233245
@Test
234246
public void testLookupDynamicPartition() throws Exception {
235-
createLookupFunction(true, false, true, false);
247+
createLookupFunction(true, false, true, false, null);
236248
commit(writeCommit(1));
237249
lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
238250
assertThat(
@@ -252,7 +264,7 @@ public void testLookupDynamicPartition() throws Exception {
252264

253265
@Test
254266
public void testParseWrongTimePeriodsBlacklist() throws Exception {
255-
FileStoreTable table = createFileStoreTable(false, false, false);
267+
FileStoreTable table = createFileStoreTable(false, false, false, null);
256268

257269
FileStoreTable table1 =
258270
table.copy(
@@ -299,7 +311,7 @@ public void testCheckRefreshInBlacklist() throws Exception {
299311
String right = end.atZone(ZoneId.systemDefault()).format(formatter);
300312

301313
FileStoreTable table =
302-
createFileStoreTable(false, false, false)
314+
createFileStoreTable(false, false, false, null)
303315
.copy(
304316
Collections.singletonMap(
305317
LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
@@ -312,6 +324,50 @@ public void testCheckRefreshInBlacklist() throws Exception {
312324
assertThat(lookupFunction.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli() + 1);
313325
}
314326

327+
@ParameterizedTest
328+
@ValueSource(booleans = {false, true})
329+
public void testLookupTableWithFullLoad(boolean joinEqualPk) throws Exception {
330+
createLookupFunction(false, joinEqualPk, false, false, 3);
331+
332+
if (joinEqualPk) {
333+
assertThat(lookupFunction.lookupTable())
334+
.isInstanceOf(PrimaryKeyPartialLookupTable.class);
335+
} else {
336+
assertThat(lookupFunction.lookupTable()).isInstanceOf(FullCacheLookupTable.class);
337+
}
338+
339+
GenericRow expectedRow = GenericRow.of(1, 1, 1L);
340+
StreamTableWrite writer = table.newStreamWriteBuilder().newWrite();
341+
writer.write(expectedRow);
342+
commit(writer.prepareCommit(true, 1));
343+
344+
List<RowData> result =
345+
new ArrayList<>(lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 1L))));
346+
assertThat(result).size().isEqualTo(1);
347+
RowData resultRow = result.get(0);
348+
assertThat(resultRow.getInt(0)).isEqualTo(expectedRow.getInt(0));
349+
assertThat(resultRow.getInt(1)).isEqualTo(expectedRow.getInt(1));
350+
351+
// Create 4 more commits so the pending snapshot count passes the full-load threshold (3)
352+
for (int i = 2; i < 6; i++) {
353+
writer.write(GenericRow.of(i, i, (long) i));
354+
commit(writer.prepareCommit(true, i));
355+
}
356+
writer.close();
357+
358+
// wait refresh
359+
Thread.sleep(2000);
360+
361+
expectedRow = GenericRow.of(5, 5, 5L);
362+
assertThat(lookupFunction.shouldDoFullLoad()).isTrue();
363+
lookupFunction.tryRefresh();
364+
result = new ArrayList<>(lookupFunction.lookup(new FlinkRowData(GenericRow.of(5, 5, 5L))));
365+
assertThat(result).size().isEqualTo(1);
366+
resultRow = result.get(0);
367+
assertThat(resultRow.getInt(0)).isEqualTo(expectedRow.getInt(0));
368+
assertThat(resultRow.getInt(1)).isEqualTo(expectedRow.getInt(1));
369+
}
370+
315371
private void commit(List<CommitMessage> messages) throws Exception {
316372
TableCommitImpl commit = table.newCommit(commitUser);
317373
commit.commit(messages);

0 commit comments

Comments
 (0)