Skip to content

Commit 575db63

Browse files
authored
[hotfix][flink] Fix acquire kv snapshot lease cannot work issue for legacy clusters (#2714)
1 parent 43f76a5 commit 575db63

File tree

2 files changed

+393
-21
lines changed

2 files changed

+393
-21
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.fluss.client.metadata.KvSnapshots;
2929
import org.apache.fluss.config.ConfigOptions;
3030
import org.apache.fluss.config.Configuration;
31+
import org.apache.fluss.exception.UnsupportedVersionException;
3132
import org.apache.fluss.flink.lake.LakeSplitGenerator;
3233
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
3334
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
@@ -607,17 +608,33 @@ private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String partitionNa
607608
kvSnapshotLeaseId,
608609
PhysicalTablePath.of(tablePath, partitionName));
609610
long kvSnapshotLeaseDurationMs = leaseContext.getKvSnapshotLeaseDurationMs();
610-
Set<TableBucket> unavailableTableBucketSet =
611-
flussAdmin
612-
.createKvSnapshotLease(kvSnapshotLeaseId, kvSnapshotLeaseDurationMs)
613-
.acquireSnapshots(bucketsToLease)
614-
.get()
615-
.getUnavailableTableBucketSet();
616-
if (!unavailableTableBucketSet.isEmpty()) {
617-
LOG.error(
618-
"Failed to acquire kv snapshot lease for table {}: {}.",
619-
tablePath,
620-
unavailableTableBucketSet);
611+
try {
612+
Set<TableBucket> unavailableTableBucketSet =
613+
flussAdmin
614+
.createKvSnapshotLease(
615+
kvSnapshotLeaseId, kvSnapshotLeaseDurationMs)
616+
.acquireSnapshots(bucketsToLease)
617+
.get()
618+
.getUnavailableTableBucketSet();
619+
if (!unavailableTableBucketSet.isEmpty()) {
620+
LOG.error(
621+
"Failed to acquire kv snapshot lease for table {}: {}.",
622+
tablePath,
623+
unavailableTableBucketSet);
624+
}
625+
} catch (Exception e) {
626+
if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class)
627+
.isPresent()) {
628+
LOG.warn(
629+
"Failed to acquire kv snapshot lease for table {} because the "
630+
+ "server does not support kv snapshot lease API. "
631+
+ "Snapshots may be cleaned up earlier than expected. "
632+
+ "Please upgrade the Fluss server to version 0.9 or later.",
633+
tablePath,
634+
e);
635+
} else {
636+
throw e;
637+
}
621638
}
622639
}
623640
} catch (Exception e) {
@@ -1054,10 +1071,21 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
10541071
.releaseSnapshots(consumedKvSnapshots)
10551072
.get();
10561073
} catch (Exception e) {
1057-
LOG.error("Failed to release kv snapshot lease. These snapshot need to re-enqueue", e);
1058-
// use the current checkpoint id to re-enqueue the buckets
1059-
consumedKvSnapshots.forEach(
1060-
tableBucket -> addConsumedBucket(checkpointId, tableBucket));
1074+
if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
1075+
LOG.warn(
1076+
"Failed to release kv snapshot lease because the server does not support "
1077+
+ "kv snapshot lease API. Snapshots may remain in storage longer "
1078+
+ "than necessary. Please upgrade the Fluss server to version 0.9 "
1079+
+ "or later.",
1080+
e);
1081+
} else {
1082+
LOG.error(
1083+
"Failed to release kv snapshot lease. These snapshots need to re-enqueue",
1084+
e);
1085+
// use the current checkpoint id to re-enqueue the buckets
1086+
consumedKvSnapshots.forEach(
1087+
tableBucket -> addConsumedBucket(checkpointId, tableBucket));
1088+
}
10611089
}
10621090
}
10631091

@@ -1114,12 +1142,25 @@ private void maybeDropKvSnapshotLease() throws Exception {
11141142
"Dropping kv snapshot lease {} when source enumerator close. isStreaming {}",
11151143
leaseContext.getKvSnapshotLeaseId(),
11161144
streaming);
1117-
flussAdmin
1118-
.createKvSnapshotLease(
1119-
leaseContext.getKvSnapshotLeaseId(),
1120-
leaseContext.getKvSnapshotLeaseDurationMs())
1121-
.dropLease()
1122-
.get();
1145+
try {
1146+
flussAdmin
1147+
.createKvSnapshotLease(
1148+
leaseContext.getKvSnapshotLeaseId(),
1149+
leaseContext.getKvSnapshotLeaseDurationMs())
1150+
.dropLease()
1151+
.get();
1152+
} catch (Exception e) {
1153+
if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class)
1154+
.isPresent()) {
1155+
LOG.warn(
1156+
"Failed to drop kv snapshot lease because the server does not support "
1157+
+ "kv snapshot lease API. Please upgrade the Fluss server to "
1158+
+ "version 0.9 or later.",
1159+
e);
1160+
} else {
1161+
throw e;
1162+
}
1163+
}
11231164
}
11241165
}
11251166

0 commit comments

Comments
 (0)