Skip to content

Commit bf59480

Browse files
committed
address baiye's comments
1 parent 477d822 commit bf59480

File tree

29 files changed

+285
-324
lines changed

29 files changed

+285
-324
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,14 +456,15 @@ CompletableFuture<Void> releaseKvSnapshotLease(
456456
String leaseId, Set<TableBucket> bucketsToRelease);
457457

458458
/**
459-
* Releases the entire lease asynchronously.
459+
* Asynchronously releases the entire lease, i.e. all leased snapshots of all table buckets held
460+
* under the specified {@code leaseId}.
460461
*
461462
* <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
462463
* equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
463464
*
464465
* @param leaseId The lease id to release.
465466
*/
466-
CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
467+
CompletableFuture<Void> releaseAllKvSnapshotLease(String leaseId);
467468

468469
/**
469470
* Get table lake snapshot info of the given table asynchronously.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
6363
import org.apache.fluss.rpc.messages.DropAclsRequest;
6464
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
65-
import org.apache.fluss.rpc.messages.DropKvSnapshotLeaseRequest;
6665
import org.apache.fluss.rpc.messages.DropTableRequest;
6766
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
6867
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
@@ -85,6 +84,7 @@
8584
import org.apache.fluss.rpc.messages.PbTablePath;
8685
import org.apache.fluss.rpc.messages.RebalanceRequest;
8786
import org.apache.fluss.rpc.messages.RebalanceResponse;
87+
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
8888
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
8989
import org.apache.fluss.rpc.messages.TableExistsRequest;
9090
import org.apache.fluss.rpc.messages.TableExistsResponse;
@@ -111,8 +111,8 @@
111111
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
112112
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
113113
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
114-
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeReleaseKvSnapshotLeaseRequest;
115114
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterProducerOffsetsRequest;
115+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeReleaseKvSnapshotLeaseRequest;
116116
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
117117
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
118118
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -418,9 +418,10 @@ public CompletableFuture<Void> releaseKvSnapshotLease(
418418
}
419419

420420
@Override
421-
public CompletableFuture<Void> dropKvSnapshotLease(String leaseId) {
422-
DropKvSnapshotLeaseRequest request = new DropKvSnapshotLeaseRequest().setLeaseId(leaseId);
423-
return gateway.dropKvSnapshotLease(request).thenApply(r -> null);
421+
public CompletableFuture<Void> releaseAllKvSnapshotLease(String leaseId) {
422+
ReleaseKvSnapshotLeaseRequest request =
423+
new ReleaseKvSnapshotLeaseRequest().setLeaseId(leaseId);
424+
return gateway.releaseKvSnapshotLease(request).thenApply(r -> null);
424425
}
425426

426427
@Override

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.HashMap;
5252
import java.util.List;
5353
import java.util.Map;
54+
import java.util.Optional;
5455

5556
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
5657
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
@@ -203,7 +204,7 @@ public void testKvSnapshotLease() throws Exception {
203204

204205
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
205206
String remoteDataDir = FLUSS_CLUSTER_EXTENSION.getRemoteDataDir();
206-
KvSnapshotLeaseMetadataManager metadataHelper =
207+
KvSnapshotLeaseMetadataManager metadataManager =
207208
new KvSnapshotLeaseMetadataManager(zkClient, remoteDataDir);
208209

209210
assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty();
@@ -219,7 +220,7 @@ public void testKvSnapshotLease() throws Exception {
219220
kvSnapshotLease1, consumeBuckets, Duration.ofDays(1).toMillis())
220221
.get();
221222
checkKvSnapshotLeaseEquals(
222-
metadataHelper, kvSnapshotLease1, tableId, new Long[] {0L, 0L, 0L});
223+
metadataManager, kvSnapshotLease1, tableId, new Long[] {0L, 0L, 0L});
223224

224225
expectedRowByBuckets = putRows(tableId, tablePath, 10);
225226
// wait snapshot2 finish
@@ -236,7 +237,7 @@ public void testKvSnapshotLease() throws Exception {
236237
kvSnapshotLease2, consumeBuckets, Duration.ofDays(1).toMillis())
237238
.get();
238239
checkKvSnapshotLeaseEquals(
239-
metadataHelper, kvSnapshotLease2, tableId, new Long[] {1L, 1L, 1L});
240+
metadataManager, kvSnapshotLease2, tableId, new Long[] {1L, 1L, 1L});
240241
// check that even though snapshot1 is generated, snapshot0 is still retained as its lease exists.
241242
for (TableBucket tb : expectedRowByBuckets.keySet()) {
242243
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isTrue();
@@ -252,14 +253,14 @@ public void testKvSnapshotLease() throws Exception {
252253
kvSnapshotLease1, Collections.singleton(new TableBucket(tableId, 0)))
253254
.get();
254255
checkKvSnapshotLeaseEquals(
255-
metadataHelper, kvSnapshotLease1, tableId, new Long[] {-1L, 0L, 0L});
256+
metadataManager, kvSnapshotLease1, tableId, new Long[] {-1L, 0L, 0L});
256257

257258
// release lease2.
258259
admin.releaseKvSnapshotLease(kvSnapshotLease2, consumeBuckets.keySet()).get();
259260
assertThat(zkClient.getKvSnapshotLeasesList()).doesNotContain(kvSnapshotLease2);
260261

261-
// drop lease1
262-
admin.dropKvSnapshotLease(kvSnapshotLease1).get();
262+
// release all kv snapshot lease of lease1
263+
admin.releaseAllKvSnapshotLease(kvSnapshotLease1).get();
263264
assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty();
264265

265266
expectedRowByBuckets = putRows(tableId, tablePath, 10);
@@ -272,7 +273,7 @@ public void testKvSnapshotLease() throws Exception {
272273
assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isFalse();
273274
}
274275

275-
assertThatThrownBy(() -> admin.dropKvSnapshotLease("no-exist-lease").get())
276+
assertThatThrownBy(() -> admin.releaseAllKvSnapshotLease("no-exist-lease").get())
276277
.rootCause()
277278
.isInstanceOf(KvSnapshotLeaseNotExistException.class)
278279
.hasMessageContaining("kv snapshot lease 'no-exist-lease' not exits");
@@ -338,13 +339,13 @@ private static int getBucketId(InternalRow row) {
338339
}
339340

340341
private void checkKvSnapshotLeaseEquals(
341-
KvSnapshotLeaseMetadataManager metadataHelper,
342+
KvSnapshotLeaseMetadataManager metadataManager,
342343
String leaseId,
343344
long tableId,
344345
Long[] expectedBucketIndex)
345346
throws Exception {
346-
assertThat(metadataHelper.getLeasesList()).contains(leaseId);
347-
Optional<KvSnapshotLease> leaseOpt = metadataHelper.getLease(leaseId);
347+
assertThat(metadataManager.getLeasesList()).contains(leaseId);
348+
Optional<KvSnapshotLease> leaseOpt = metadataManager.getLease(leaseId);
348349
assertThat(leaseOpt).isPresent();
349350
KvSnapshotLease actualLease = leaseOpt.get();
350351
Map<Long, KvSnapshotTableLease> tableIdToTableLease = actualLease.getTableIdToTableLease();

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ private enum ProcedureEnum {
7878
REBALANCE("sys.rebalance", RebalanceProcedure.class),
7979
CANCEL_REBALANCE("sys.cancel_rebalance", CancelRebalanceProcedure.class),
8080
LIST_REBALANCE_PROGRESS("sys.list_rebalance", ListRebalanceProcessProcedure.class),
81-
DROP_KV_SNAPSHOT_LEASE("sys.drop_kv_snapshot_lease", DropKvSnapshotLeaseProcedure.class);
81+
RELEASE_ALL_KV_SNAPSHOT_LEASE(
82+
"sys.release_all_kv_snapshot_lease", ReleaseAllKvSnapshotLeaseProcedure.class);
8283

8384
private final String path;
8485
private final Class<? extends ProcedureBase> procedureClass;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ReleaseAllKvSnapshotLeaseProcedure.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,25 @@
2222
import org.apache.flink.table.annotation.ProcedureHint;
2323
import org.apache.flink.table.procedure.ProcedureContext;
2424

25-
/** Procedure to release kv snapshot lease. */
26-
public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
25+
/**
26+
* Procedure to release all kv snapshots leased under the specified leaseId. See {@link
27+
* org.apache.fluss.client.admin.Admin#releaseAllKvSnapshotLease(String)} for more details.
28+
*
29+
* <p>Usage examples:
30+
*
31+
* <pre>
32+
* -- Release all kv snapshots leased under the specified leaseId
33+
* CALL sys.release_all_kv_snapshot_lease('test-lease-id');
34+
* </pre>
35+
*/
36+
public class ReleaseAllKvSnapshotLeaseProcedure extends ProcedureBase {
2737

2838
@ProcedureHint(
2939
argument = {
3040
@ArgumentHint(name = "leaseId", type = @DataTypeHint("STRING")),
3141
})
3242
public String[] call(ProcedureContext context, String leaseId) throws Exception {
33-
admin.dropKvSnapshotLease(leaseId).get();
43+
admin.releaseAllKvSnapshotLease(leaseId).get();
3444
return new String[] {"success"};
3545
}
3646
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.initializer.OffsetsInitializer;
2222
import org.apache.fluss.config.Configuration;
2323
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
24+
import org.apache.fluss.flink.source.reader.LeaseContext;
2425
import org.apache.fluss.metadata.TablePath;
2526
import org.apache.fluss.types.RowType;
2627

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.config.Configuration;
2626
import org.apache.fluss.flink.FlinkConnectorOptions;
2727
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
28+
import org.apache.fluss.flink.source.reader.LeaseContext;
2829
import org.apache.fluss.metadata.TableInfo;
2930
import org.apache.fluss.metadata.TablePath;
3031
import org.apache.fluss.types.RowType;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 56 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
import org.apache.fluss.client.initializer.OffsetsInitializer;
2626
import org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever;
2727
import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
28-
import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent;
2928
import org.apache.fluss.client.metadata.KvSnapshots;
3029
import org.apache.fluss.config.ConfigOptions;
3130
import org.apache.fluss.config.Configuration;
3231
import org.apache.fluss.flink.lake.LakeSplitGenerator;
3332
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
3433
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
34+
import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent;
3535
import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
3636
import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
3737
import org.apache.fluss.flink.source.reader.LeaseContext;
@@ -62,7 +62,6 @@
6262
import org.slf4j.LoggerFactory;
6363

6464
import javax.annotation.Nullable;
65-
import javax.annotation.concurrent.GuardedBy;
6665

6766
import java.io.IOException;
6867
import java.util.ArrayList;
@@ -79,13 +78,10 @@
7978
import java.util.OptionalLong;
8079
import java.util.Set;
8180
import java.util.TreeMap;
82-
import java.util.concurrent.locks.ReadWriteLock;
83-
import java.util.concurrent.locks.ReentrantReadWriteLock;
8481
import java.util.stream.Collectors;
8582

8683
import static org.apache.fluss.utils.Preconditions.checkNotNull;
8784
import static org.apache.fluss.utils.Preconditions.checkState;
88-
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
8985

9086
/**
9187
* An implementation of {@link SplitEnumerator} for the data of Fluss.
@@ -142,9 +138,7 @@ public class FlinkSourceEnumerator
142138

143139
private final LeaseContext leaseContext;
144140

145-
private final ReadWriteLock lock = new ReentrantReadWriteLock();
146141
/** checkpointId -> table buckets that finished consuming their kv snapshots. */
147-
@GuardedBy("lock")
148142
private final TreeMap<Long, Set<TableBucket>> consumedKvSnapshotMap = new TreeMap<>();
149143

150144
// Lazily instantiated or mutable fields.
@@ -556,66 +550,58 @@ private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String partitionNa
556550
Map<Integer, Long> snapshotIds = new HashMap<>();
557551
Map<Integer, Long> logOffsets = new HashMap<>();
558552

559-
// retry to get the latest kv snapshots and acquire kvSnapshot lease util all buckets
560-
// acquire success. The reason is that getLatestKvSnapshots and acquireKvSnapshotLease
561-
// are not atomic operations, the latest kv snapshot obtained via get may become outdated by
562-
// the time it is passed to acquire. Therefore, this logic must implement a retry
563-
// mechanism: the unavailable tableBuckets in the AcquiredKvSnapshotLeaseResult returned by
564-
// acquireKvSnapshotLease must be retried repeatedly until all buckets are successfully
565-
// acquired.
553+
// get the latest kv snapshots and try to acquire the kvSnapshot lease (no retry loop here).
566554
try {
567-
Set<TableBucket> remainingTableBuckets;
568-
do {
569-
KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
570-
remainingTableBuckets = new HashSet<>(kvSnapshots.getTableBuckets());
571-
572-
tableId = kvSnapshots.getTableId();
573-
partitionId = kvSnapshots.getPartitionId();
574-
575-
Set<TableBucket> ignoreBuckets = new HashSet<>();
576-
Map<TableBucket, Long> bucketsToLease = new HashMap<>();
577-
for (TableBucket tb : remainingTableBuckets) {
578-
int bucket = tb.getBucket();
579-
OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucket);
580-
OptionalLong logOffsetOpt = kvSnapshots.getLogOffset(bucket);
581-
if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
582-
bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
583-
} else {
584-
ignoreBuckets.add(tb);
585-
}
586-
587-
snapshotIds.put(
588-
bucket, snapshotIdOpt.isPresent() ? snapshotIdOpt.getAsLong() : null);
589-
logOffsets.put(
590-
bucket, logOffsetOpt.isPresent() ? logOffsetOpt.getAsLong() : null);
555+
KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
556+
Set<TableBucket> remainingTableBuckets = new HashSet<>(kvSnapshots.getTableBuckets());
557+
558+
tableId = kvSnapshots.getTableId();
559+
partitionId = kvSnapshots.getPartitionId();
560+
561+
Set<TableBucket> ignoreBuckets = new HashSet<>();
562+
Map<TableBucket, Long> bucketsToLease = new HashMap<>();
563+
for (TableBucket tb : remainingTableBuckets) {
564+
int bucket = tb.getBucket();
565+
OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucket);
566+
OptionalLong logOffsetOpt = kvSnapshots.getLogOffset(bucket);
567+
if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
568+
bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
569+
} else {
570+
ignoreBuckets.add(tb);
591571
}
592572

593-
if (!ignoreBuckets.isEmpty()) {
594-
remainingTableBuckets.removeAll(ignoreBuckets);
595-
}
573+
snapshotIds.put(
574+
bucket, snapshotIdOpt.isPresent() ? snapshotIdOpt.getAsLong() : null);
575+
logOffsets.put(bucket, logOffsetOpt.isPresent() ? logOffsetOpt.getAsLong() : null);
576+
}
596577

597-
if (!bucketsToLease.isEmpty()) {
598-
String kvSnapshotLeaseId = leaseContext.getKvSnapshotLeaseId();
578+
if (!ignoreBuckets.isEmpty()) {
579+
remainingTableBuckets.removeAll(ignoreBuckets);
580+
}
581+
582+
if (!bucketsToLease.isEmpty()) {
583+
String kvSnapshotLeaseId = leaseContext.getKvSnapshotLeaseId();
584+
LOG.info(
585+
"Try to acquire kv snapshot lease {} for table {}",
586+
kvSnapshotLeaseId,
587+
tablePath);
588+
Long kvSnapshotLeaseDurationMs = leaseContext.getKvSnapshotLeaseDurationMs();
589+
checkNotNull(kvSnapshotLeaseDurationMs, "kv snapshot lease duration is null.");
590+
remainingTableBuckets =
591+
flussAdmin
592+
.acquireKvSnapshotLease(
593+
kvSnapshotLeaseId,
594+
bucketsToLease,
595+
kvSnapshotLeaseDurationMs)
596+
.get()
597+
.getUnavailableTableBucketSet();
598+
if (!remainingTableBuckets.isEmpty()) {
599599
LOG.info(
600-
"Try to acquire kv snapshot lease {} for table {}",
601-
kvSnapshotLeaseId,
602-
tablePath);
603-
remainingTableBuckets =
604-
flussAdmin
605-
.acquireKvSnapshotLease(
606-
kvSnapshotLeaseId,
607-
bucketsToLease,
608-
leaseContext.getKvSnapshotLeaseDurationMs())
609-
.get()
610-
.getUnavailableTableBucketSet();
611-
if (!remainingTableBuckets.isEmpty()) {
612-
LOG.info(
613-
"Failed to acquire kv snapshot lease for table {}: {}. Retry to re-acquire",
614-
tablePath,
615-
remainingTableBuckets);
616-
}
600+
"Failed to acquire kv snapshot lease for table {}: {}. Retry to re-acquire",
601+
tablePath,
602+
remainingTableBuckets);
617603
}
618-
} while (!remainingTableBuckets.isEmpty());
604+
}
619605
} catch (Exception e) {
620606
throw new FlinkRuntimeException(
621607
String.format("Failed to get table snapshot for %s", tablePath),
@@ -1055,29 +1041,19 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
10551041

10561042
/** Adds a bucket whose kv snapshot has been consumed to the consumedKvSnapshotMap. */
10571043
public void addConsumedBucket(long checkpointId, TableBucket tableBucket) {
1058-
inWriteLock(
1059-
lock,
1060-
() -> {
1061-
consumedKvSnapshotMap
1062-
.computeIfAbsent(checkpointId, k -> new HashSet<>())
1063-
.add(tableBucket);
1064-
});
1044+
consumedKvSnapshotMap.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(tableBucket);
10651045
}
10661046

10671047
/** Gets and removes the buckets whose kv snapshots were consumed before the checkpoint id. */
10681048
public Set<TableBucket> getAndRemoveConsumedBucketsUpTo(long checkpointId) {
1069-
return inWriteLock(
1070-
lock,
1071-
() -> {
1072-
NavigableMap<Long, Set<TableBucket>> toRemove =
1073-
consumedKvSnapshotMap.headMap(checkpointId, false);
1074-
Set<TableBucket> result = new HashSet<>();
1075-
for (Set<TableBucket> snapshots : toRemove.values()) {
1076-
result.addAll(snapshots);
1077-
}
1078-
toRemove.clear();
1079-
return result;
1080-
});
1049+
NavigableMap<Long, Set<TableBucket>> toRemove =
1050+
consumedKvSnapshotMap.headMap(checkpointId, false);
1051+
Set<TableBucket> result = new HashSet<>();
1052+
for (Set<TableBucket> snapshots : toRemove.values()) {
1053+
result.addAll(snapshots);
1054+
}
1055+
toRemove.clear();
1056+
return result;
10811057
}
10821058

10831059
@Override

0 commit comments

Comments
 (0)