Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
<td>Integer</td>
<td>If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync.</td>
</tr>
<tr>
<td><h5>lookup.refresh.full-load-threshold</h5></td>
<td style="word-wrap: break-word;">2147483647</td>
<td>Integer</td>
<td>If the pending snapshot count exceeds this threshold, lookup table will discard incremental updates and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. Set to a reasonable value (e.g., 10) to enable this optimization. Default is Integer.MAX_VALUE (disabled). </td>
</tr>
<tr>
<td><h5>lookup.refresh.time-periods-blacklist</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ public class FlinkConnectorOptions {
+ "cache refreshing is forbidden. Blacklist format is start1->end1,start2->end2,... , "
+ "and the time format is yyyy-MM-dd HH:mm. Only used when lookup table is FULL cache mode.");

/**
 * Option {@code lookup.refresh.full-load-threshold}: an integer threshold on the number of
 * pending snapshots. Its default is {@link Integer#MAX_VALUE}, which the description below
 * documents as "disabled".
 *
 * <p>NOTE(review): the description text appears verbatim in the generated options
 * documentation; presumably the two must be kept in sync -- confirm before rewording.
 */
public static final ConfigOption<Integer> LOOKUP_REFRESH_FULL_LOAD_THRESHOLD =
ConfigOptions.key("lookup.refresh.full-load-threshold")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDescription(
"If the pending snapshot count exceeds this threshold, lookup table will discard incremental updates "
+ "and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. "
+ "Set to a reasonable value (e.g., 10) to enable this optimization. Default is Integer.MAX_VALUE (disabled). ");

public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
ConfigOptions.key("sink.savepoint.auto-tag")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
Expand Down Expand Up @@ -98,6 +99,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private transient Duration refreshInterval;
// timestamp when refreshing lookup table
private transient long nextRefreshTime;
// threshold for triggering full table reload when snapshots are pending
private transient int refreshFullThreshold;

protected FunctionContext functionContext;

Expand Down Expand Up @@ -175,6 +178,7 @@ private void open() throws Exception {
this.refreshInterval =
options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
.orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
this.refreshFullThreshold = options.get(LOOKUP_REFRESH_FULL_LOAD_THRESHOLD);

List<String> fieldNames = table.rowType().getFieldNames();
int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
Expand Down Expand Up @@ -332,18 +336,56 @@ void tryRefresh() throws Exception {
partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
lookupTable.close();
lookupTable.open();
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
// no need to refresh the lookup table because it is reopened
return;
}
}

// 3. refresh lookup table
if (shouldRefreshLookupTable()) {
lookupTable.refresh();
// Check if we should do full load (close and reopen table) instead of incremental
// refresh
boolean doFullLoad = shouldDoFullLoad();

if (doFullLoad) {
LOG.info(
"Doing full load for table {} instead of incremental refresh",
table.name());
lookupTable.close();
lookupTable.open();
} else {
lookupTable.refresh();
}

nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
}
}

/**
 * Decides whether the lookup table should be reloaded from scratch rather than refreshed
 * incrementally.
 *
 * <p>The check is switched off when the configured threshold is non-positive or equal to
 * {@link Integer#MAX_VALUE}. Otherwise the number of pending snapshots is computed as
 * {@code latestSnapshotId - nextSnapshotId + 1} and compared against the threshold.
 *
 * @return {@code true} when the pending snapshot count is greater than or equal to the
 *     threshold; {@code false} when the check is disabled or either snapshot id is unknown
 */
@VisibleForTesting
public boolean shouldDoFullLoad() {
    boolean thresholdEnabled =
            refreshFullThreshold > 0 && refreshFullThreshold != Integer.MAX_VALUE;
    if (!thresholdEnabled) {
        return false;
    }

    FileStoreTable fileStoreTable = (FileStoreTable) table;
    Long latest = fileStoreTable.snapshotManager().latestSnapshotId();
    Long next = lookupTable.nextSnapshotId();
    // Without both ids the pending count cannot be computed, so stay incremental.
    if (latest == null || next == null) {
        return false;
    }

    LOG.info(
            "Check if should do full load, latestSnapshotId: {}, nextSnapshotId: {}, refreshFullThreshold: {}",
            latest,
            next,
            refreshFullThreshold);

    long pendingSnapshots = latest - next + 1;
    return pendingSnapshots >= refreshFullThreshold;
}

private boolean shouldRefreshLookupTable() {
if (nextRefreshTime > System.currentTimeMillis()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ public void specifyCacheRowFilter(Filter<InternalRow> filter) {
this.cacheRowFilter = filter;
}

/** Returns the next snapshot id as reported by this table's reader. */
@Override
public Long nextSnapshotId() {
    return reader.nextSnapshotId();
}

protected void init() throws Exception {
this.stateFactory = createStateFactory();
this.refreshExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ public interface LookupTable extends Closeable {
void refresh() throws Exception;

void specifyCacheRowFilter(Filter<InternalRow> filter);

/**
 * Returns the id of the next snapshot this lookup table will consume.
 *
 * <p>NOTE(review): the boxed return type allows {@code null}; presumably that means the
 * position is not known yet -- confirm against the implementations.
 */
Long nextSnapshotId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ public void specifyCacheRowFilter(Filter<InternalRow> filter) {
this.cacheRowFilter = filter;
}

/** Returns the next snapshot id as reported by the underlying query executor. */
@Override
public Long nextSnapshotId() {
    return queryExecutor.nextSnapshotId();
}

@Override
public void close() throws IOException {
if (queryExecutor != null) {
Expand Down Expand Up @@ -243,6 +248,10 @@ interface QueryExecutor extends Closeable {
InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;

void refresh();

/**
 * Default next snapshot id for executors that do not override this method: always {@link
 * Long#MAX_VALUE}.
 */
default Long nextSnapshotId() {
    return Long.valueOf(Long.MAX_VALUE);
}
}

static class LocalQueryExecutor implements QueryExecutor {
Expand Down Expand Up @@ -334,6 +343,11 @@ void refreshSplit(DataSplit split) {
numBuckets.put(partition, totalBuckets);
}

/** Returns the value of {@code scan.checkpoint()} as the next snapshot id. */
@Override
public Long nextSnapshotId() {
    return scan.checkpoint();
}

@Override
public void close() throws IOException {
tableQuery.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -91,16 +92,19 @@ public void before() throws Exception {
}

private void createLookupFunction(boolean refreshAsync) throws Exception {
createLookupFunction(true, false, false, refreshAsync);
createLookupFunction(true, false, false, refreshAsync, null);
}

private void createLookupFunction(
boolean isPartition,
boolean joinEqualPk,
boolean dynamicPartition,
boolean refreshAsync)
boolean refreshAsync,
Integer fullLoadThreshold)
throws Exception {
table = createFileStoreTable(isPartition, dynamicPartition, refreshAsync);
table =
createFileStoreTable(
isPartition, dynamicPartition, refreshAsync, fullLoadThreshold);
lookupFunction = createLookupFunction(table, joinEqualPk);
lookupFunction.open(tempDir.toString());
}
Expand All @@ -116,7 +120,11 @@ private FileStoreLookupFunction createLookupFunction(
}

private FileStoreTable createFileStoreTable(
boolean isPartition, boolean dynamicPartition, boolean refreshAsync) throws Exception {
boolean isPartition,
boolean dynamicPartition,
boolean refreshAsync,
Integer fullLoadThreshold)
throws Exception {
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
Options conf = new Options();
conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
Expand All @@ -128,6 +136,10 @@ private FileStoreTable createFileStoreTable(
conf.set(FlinkConnectorOptions.SCAN_PARTITIONS, "max_pt()");
}

if (fullLoadThreshold != null) {
conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD, fullLoadThreshold);
}

RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
Expand All @@ -153,7 +165,7 @@ public void close() throws Exception {

@Test
public void testCompatibilityForOldVersion() throws Exception {
createLookupFunction(false, true, false, false);
createLookupFunction(false, true, false, false, null);
commit(writeCommit(1));
PrimaryKeyPartialLookupTable lookupTable =
(PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
Expand All @@ -174,7 +186,7 @@ public void testCompatibilityForOldVersion() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
createLookupFunction(false, true, false, refreshAsync);
createLookupFunction(false, true, false, refreshAsync, null);
assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
QueryExecutor queryExecutor =
((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
Expand All @@ -184,7 +196,7 @@ public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDefaultRemotePartial(boolean refreshAsync) throws Exception {
createLookupFunction(false, true, false, refreshAsync);
createLookupFunction(false, true, false, refreshAsync, null);
ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
serviceManager.resetService(
PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new InetSocketAddress(1)});
Expand Down Expand Up @@ -232,7 +244,7 @@ public void testLookupExpiredSnapshot(boolean refreshAsync) throws Exception {

@Test
public void testLookupDynamicPartition() throws Exception {
createLookupFunction(true, false, true, false);
createLookupFunction(true, false, true, false, null);
commit(writeCommit(1));
lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
assertThat(
Expand All @@ -252,7 +264,7 @@ public void testLookupDynamicPartition() throws Exception {

@Test
public void testParseWrongTimePeriodsBlacklist() throws Exception {
FileStoreTable table = createFileStoreTable(false, false, false);
FileStoreTable table = createFileStoreTable(false, false, false, null);

FileStoreTable table1 =
table.copy(
Expand Down Expand Up @@ -299,7 +311,7 @@ public void testCheckRefreshInBlacklist() throws Exception {
String right = end.atZone(ZoneId.systemDefault()).format(formatter);

FileStoreTable table =
createFileStoreTable(false, false, false)
createFileStoreTable(false, false, false, null)
.copy(
Collections.singletonMap(
LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
Expand All @@ -312,6 +324,50 @@ public void testCheckRefreshInBlacklist() throws Exception {
assertThat(lookupFunction.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli() + 1);
}

/**
 * Checks that, with a full-load threshold of 3, the lookup function reports that a full load
 * is due once several commits are pending, and that a lookup after {@code tryRefresh()}
 * returns the most recently written row. Runs against both the partial lookup table ({@code
 * joinEqualPk = true}) and the full-cache lookup table ({@code joinEqualPk = false}).
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLookupTableWithFullLoad(boolean joinEqualPk) throws Exception {
    createLookupFunction(false, joinEqualPk, false, false, 3);

    if (joinEqualPk) {
        assertThat(lookupFunction.lookupTable())
                .isInstanceOf(PrimaryKeyPartialLookupTable.class);
    } else {
        assertThat(lookupFunction.lookupTable()).isInstanceOf(FullCacheLookupTable.class);
    }

    GenericRow firstRow = GenericRow.of(1, 1, 1L);
    StreamTableWrite writer = table.newStreamWriteBuilder().newWrite();
    // Close the writer even when a commit or an assertion below fails.
    try {
        writer.write(firstRow);
        commit(writer.prepareCommit(true, 1));

        List<RowData> initial =
                new ArrayList<>(
                        lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 1L))));
        assertThat(initial).hasSize(1);
        RowData initialRow = initial.get(0);
        assertThat(initialRow.getInt(0)).isEqualTo(firstRow.getInt(0));
        assertThat(initialRow.getInt(1)).isEqualTo(firstRow.getInt(1));

        // Commit four more times (i = 2..5) so that the number of pending snapshots reaches
        // the configured threshold of 3.
        for (int i = 2; i < 6; i++) {
            writer.write(GenericRow.of(i, i, (long) i));
            commit(writer.prepareCommit(true, i));
        }
    } finally {
        writer.close();
    }

    // Wait before refreshing; presumably the refresh interval of this test table is shorter
    // than two seconds -- TODO confirm against createFileStoreTable.
    Thread.sleep(2000);

    GenericRow lastRow = GenericRow.of(5, 5, 5L);
    assertThat(lookupFunction.shouldDoFullLoad()).isTrue();
    lookupFunction.tryRefresh();

    List<RowData> refreshed =
            new ArrayList<>(lookupFunction.lookup(new FlinkRowData(GenericRow.of(5, 5, 5L))));
    assertThat(refreshed).hasSize(1);
    RowData refreshedRow = refreshed.get(0);
    assertThat(refreshedRow.getInt(0)).isEqualTo(lastRow.getInt(0));
    assertThat(refreshedRow.getInt(1)).isEqualTo(lastRow.getInt(1));
}

private void commit(List<CommitMessage> messages) throws Exception {
TableCommitImpl commit = table.newCommit(commitUser);
commit.commit(messages);
Expand Down
Loading