Skip to content

Commit f00936f

Browse files
committed
[flink] Optimize lookup table refresh with full load for snapshot backlog.
1 parent 5b9999a commit f00936f

File tree

6 files changed

+139
-11
lines changed

6 files changed

+139
-11
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,15 @@ public class FlinkConnectorOptions {
292292
+ "cache refreshing is forbidden. Blacklist format is start1->end1,start2->end2,... , "
293293
+ "and the time format is yyyy-MM-dd HH:mm. Only used when lookup table is FULL cache mode.");
294294

295+
public static final ConfigOption<Integer> LOOKUP_REFRESH_FULL_LOAD_THRESHOLD =
296+
ConfigOptions.key("lookup.refresh.full-load-threshold")
297+
.intType()
298+
.defaultValue(Integer.MAX_VALUE)
299+
.withDescription(
300+
"If the pending snapshot count exceeds this threshold, lookup table will discard incremental updates "
301+
+ "and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. "
302+
+ "Set to a reasonable value (e.g., 10) to enable this optimization. Default is Integer.MAX_VALUE (disabled). ");
303+
295304
public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
296305
ConfigOptions.key("sink.savepoint.auto-tag")
297306
.booleanType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868

6969
import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
7070
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
71+
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
7172
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
7273
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
7374
import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
@@ -98,6 +99,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
9899
private transient Duration refreshInterval;
99100
// timestamp when refreshing lookup table
100101
private transient long nextRefreshTime;
102+
// pending-snapshot threshold for switching from incremental refresh to a full table reload
103+
private transient int refreshFullThreshold;
101104

102105
protected FunctionContext functionContext;
103106

@@ -175,6 +178,7 @@ private void open() throws Exception {
175178
this.refreshInterval =
176179
options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
177180
.orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
181+
this.refreshFullThreshold = options.get(LOOKUP_REFRESH_FULL_LOAD_THRESHOLD);
178182

179183
List<String> fieldNames = table.rowType().getFieldNames();
180184
int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
@@ -332,18 +336,56 @@ void tryRefresh() throws Exception {
332336
partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
333337
lookupTable.close();
334338
lookupTable.open();
339+
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
335340
// no need to refresh the lookup table because it is reopened
336341
return;
337342
}
338343
}
339344

340345
// 3. refresh lookup table
341346
if (shouldRefreshLookupTable()) {
342-
lookupTable.refresh();
347+
// Check if we should do full load (close and reopen table) instead of incremental
348+
// refresh
349+
boolean doFullLoad = shouldDoFullLoad();
350+
351+
if (doFullLoad) {
352+
LOG.info(
353+
"Doing full load for table {} instead of incremental refresh",
354+
table.name());
355+
lookupTable.close();
356+
lookupTable.open();
357+
} else {
358+
lookupTable.refresh();
359+
}
360+
343361
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
344362
}
345363
}
346364

365+
/**
366+
* Check if we should do full load instead of incremental refresh. This can improve performance
367+
* when there are many pending snapshots.
368+
*/
369+
@VisibleForTesting
370+
public boolean shouldDoFullLoad() {
371+
if (refreshFullThreshold <= 0 || refreshFullThreshold == Integer.MAX_VALUE) {
372+
return false;
373+
}
374+
375+
Long latestSnapshotId = ((FileStoreTable) table).snapshotManager().latestSnapshotId();
376+
Long nextSnapshotId = lookupTable.nextSnapshotId();
377+
if (latestSnapshotId == null || nextSnapshotId == null) {
378+
return false;
379+
}
380+
381+
LOG.info(
382+
"Check if should do full load, latestSnapshotId: {}, nextSnapshotId: {}, refreshFullThreshold: {}",
383+
latestSnapshotId,
384+
nextSnapshotId,
385+
refreshFullThreshold);
386+
return latestSnapshotId - nextSnapshotId + 1 >= refreshFullThreshold;
387+
}
388+
347389
private boolean shouldRefreshLookupTable() {
348390
if (nextRefreshTime > System.currentTimeMillis()) {
349391
return false;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ public void specifyCacheRowFilter(Filter<InternalRow> filter) {
151151
this.cacheRowFilter = filter;
152152
}
153153

154+
@Override
155+
public Long nextSnapshotId() {
156+
return this.reader.nextSnapshotId();
157+
}
158+
154159
protected void init() throws Exception {
155160
this.stateFactory = createStateFactory();
156161
this.refreshExecutor =

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@ public interface LookupTable extends Closeable {
4141
void refresh() throws Exception;
4242

4343
void specifyCacheRowFilter(Filter<InternalRow> filter);
44+
45+
/**
 * Returns the snapshot id this lookup table will read next.
 *
 * <p>NOTE(review): presumably this is the first snapshot not yet applied to the table — confirm
 * against the implementations. Callers compare it with the latest snapshot id to estimate how
 * many snapshots are pending, and treat {@code null} as "unknown".
 *
 * @return the next snapshot id, possibly {@code null}
 */
Long nextSnapshotId();
4446
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ public void specifyCacheRowFilter(Filter<InternalRow> filter) {
197197
this.cacheRowFilter = filter;
198198
}
199199

200+
@Override
201+
public Long nextSnapshotId() {
202+
return this.queryExecutor.nextSnapshotId();
203+
}
204+
200205
@Override
201206
public void close() throws IOException {
202207
if (queryExecutor != null) {
@@ -243,6 +248,10 @@ interface QueryExecutor extends Closeable {
243248
InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;
244249

245250
void refresh();
251+
252+
default Long nextSnapshotId() {
253+
return Long.MAX_VALUE;
254+
}
246255
}
247256

248257
static class LocalQueryExecutor implements QueryExecutor {
@@ -334,6 +343,11 @@ void refreshSplit(DataSplit split) {
334343
numBuckets.put(partition, totalBuckets);
335344
}
336345

346+
@Override
347+
public Long nextSnapshotId() {
348+
return this.scan.checkpoint();
349+
}
350+
337351
@Override
338352
public void close() throws IOException {
339353
tableQuery.close();

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.paimon.types.RowType;
4444
import org.apache.paimon.utils.TraceableFileIO;
4545

46+
import org.apache.flink.table.data.RowData;
4647
import org.junit.jupiter.api.AfterEach;
4748
import org.junit.jupiter.api.BeforeEach;
4849
import org.junit.jupiter.api.Test;
@@ -91,16 +92,19 @@ public void before() throws Exception {
9192
}
9293

9394
private void createLookupFunction(boolean refreshAsync) throws Exception {
94-
createLookupFunction(true, false, false, refreshAsync);
95+
createLookupFunction(true, false, false, refreshAsync, null);
9596
}
9697

9798
private void createLookupFunction(
9899
boolean isPartition,
99100
boolean joinEqualPk,
100101
boolean dynamicPartition,
101-
boolean refreshAsync)
102+
boolean refreshAsync,
103+
Integer fullLoadThreshold)
102104
throws Exception {
103-
table = createFileStoreTable(isPartition, dynamicPartition, refreshAsync);
105+
table =
106+
createFileStoreTable(
107+
isPartition, dynamicPartition, refreshAsync, fullLoadThreshold);
104108
lookupFunction = createLookupFunction(table, joinEqualPk);
105109
lookupFunction.open(tempDir.toString());
106110
}
@@ -116,7 +120,11 @@ private FileStoreLookupFunction createLookupFunction(
116120
}
117121

118122
private FileStoreTable createFileStoreTable(
119-
boolean isPartition, boolean dynamicPartition, boolean refreshAsync) throws Exception {
123+
boolean isPartition,
124+
boolean dynamicPartition,
125+
boolean refreshAsync,
126+
Integer fullLoadThreshold)
127+
throws Exception {
120128
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
121129
Options conf = new Options();
122130
conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
@@ -128,6 +136,10 @@ private FileStoreTable createFileStoreTable(
128136
conf.set(FlinkConnectorOptions.SCAN_PARTITIONS, "max_pt()");
129137
}
130138

139+
if (fullLoadThreshold != null) {
140+
conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD, fullLoadThreshold);
141+
}
142+
131143
RowType rowType =
132144
RowType.of(
133145
new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
@@ -153,7 +165,7 @@ public void close() throws Exception {
153165

154166
@Test
155167
public void testCompatibilityForOldVersion() throws Exception {
156-
createLookupFunction(false, true, false, false);
168+
createLookupFunction(false, true, false, false, null);
157169
commit(writeCommit(1));
158170
PrimaryKeyPartialLookupTable lookupTable =
159171
(PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
@@ -174,7 +186,7 @@ public void testCompatibilityForOldVersion() throws Exception {
174186
@ParameterizedTest
175187
@ValueSource(booleans = {false, true})
176188
public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
177-
createLookupFunction(false, true, false, refreshAsync);
189+
createLookupFunction(false, true, false, refreshAsync, null);
178190
assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
179191
QueryExecutor queryExecutor =
180192
((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
@@ -184,7 +196,7 @@ public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
184196
@ParameterizedTest
185197
@ValueSource(booleans = {false, true})
186198
public void testDefaultRemotePartial(boolean refreshAsync) throws Exception {
187-
createLookupFunction(false, true, false, refreshAsync);
199+
createLookupFunction(false, true, false, refreshAsync, null);
188200
ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
189201
serviceManager.resetService(
190202
PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new InetSocketAddress(1)});
@@ -232,7 +244,7 @@ public void testLookupExpiredSnapshot(boolean refreshAsync) throws Exception {
232244

233245
@Test
234246
public void testLookupDynamicPartition() throws Exception {
235-
createLookupFunction(true, false, true, false);
247+
createLookupFunction(true, false, true, false, null);
236248
commit(writeCommit(1));
237249
lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
238250
assertThat(
@@ -252,7 +264,7 @@ public void testLookupDynamicPartition() throws Exception {
252264

253265
@Test
254266
public void testParseWrongTimePeriodsBlacklist() throws Exception {
255-
FileStoreTable table = createFileStoreTable(false, false, false);
267+
FileStoreTable table = createFileStoreTable(false, false, false, null);
256268

257269
FileStoreTable table1 =
258270
table.copy(
@@ -299,7 +311,7 @@ public void testCheckRefreshInBlacklist() throws Exception {
299311
String right = end.atZone(ZoneId.systemDefault()).format(formatter);
300312

301313
FileStoreTable table =
302-
createFileStoreTable(false, false, false)
314+
createFileStoreTable(false, false, false, null)
303315
.copy(
304316
Collections.singletonMap(
305317
LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
@@ -312,6 +324,50 @@ public void testCheckRefreshInBlacklist() throws Exception {
312324
assertThat(lookupFunction.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli() + 1);
313325
}
314326

327+
@ParameterizedTest
328+
@ValueSource(booleans = {false, true})
329+
public void testLookupTableWithFullLoad(boolean joinEqualPk) throws Exception {
330+
createLookupFunction(false, joinEqualPk, false, false, 3);
331+
332+
if (joinEqualPk) {
333+
assertThat(lookupFunction.lookupTable())
334+
.isInstanceOf(PrimaryKeyPartialLookupTable.class);
335+
} else {
336+
assertThat(lookupFunction.lookupTable()).isInstanceOf(FullCacheLookupTable.class);
337+
}
338+
339+
GenericRow expectedRow = GenericRow.of(1, 1, 1L);
340+
StreamTableWrite writer = table.newStreamWriteBuilder().newWrite();
341+
writer.write(expectedRow);
342+
commit(writer.prepareCommit(true, 1));
343+
344+
List<RowData> result =
345+
new ArrayList<>(lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 1L))));
346+
assertThat(result).size().isEqualTo(1);
347+
RowData resultRow = result.get(0);
348+
assertThat(resultRow.getInt(0)).isEqualTo(expectedRow.getInt(0));
349+
assertThat(resultRow.getInt(1)).isEqualTo(expectedRow.getInt(1));
350+
351+
// Create more commits to exceed threshold (3 more to have gap > 3)
352+
for (int i = 2; i < 6; i++) {
353+
writer.write(GenericRow.of(i, i, (long) i));
354+
commit(writer.prepareCommit(true, i));
355+
}
356+
writer.close();
357+
358+
// wait refresh
359+
Thread.sleep(2000);
360+
361+
expectedRow = GenericRow.of(5, 5, 5L);
362+
assertThat(lookupFunction.shouldDoFullLoad()).isTrue();
363+
lookupFunction.tryRefresh();
364+
result = new ArrayList<>(lookupFunction.lookup(new FlinkRowData(GenericRow.of(5, 5, 5L))));
365+
assertThat(result).size().isEqualTo(1);
366+
resultRow = result.get(0);
367+
assertThat(resultRow.getInt(0)).isEqualTo(expectedRow.getInt(0));
368+
assertThat(resultRow.getInt(1)).isEqualTo(expectedRow.getInt(1));
369+
}
370+
315371
private void commit(List<CommitMessage> messages) throws Exception {
316372
TableCommitImpl commit = table.newCommit(commitUser);
317373
commit.commit(messages);

0 commit comments

Comments
 (0)