Skip to content

Commit 42a0542

Browse files
committed
fix tests
1 parent e5d3313 commit 42a0542

File tree

4 files changed

+112
-58
lines changed

4 files changed

+112
-58
lines changed

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -307,39 +307,50 @@ public Optional<KvTablet> getKv(TableBucket tableBucket) {
307307
return Optional.ofNullable(currentKvs.get(tableBucket));
308308
}
309309

310-
public void dropKv(TableBucket tableBucket) {
311-
KvTablet dropKvTablet =
310+
public void closeOrDropKv(TableBucket tableBucket, boolean needDrop) {
311+
KvTablet removeTablet =
312312
inLock(tabletCreationOrDeletionLock, () -> currentKvs.remove(tableBucket));
313313

314-
if (dropKvTablet != null) {
315-
TablePath tablePath = dropKvTablet.getTablePath();
314+
if (removeTablet != null) {
315+
TablePath tablePath = removeTablet.getTablePath();
316316
try {
317-
dropKvTablet.drop();
318-
if (dropKvTablet.getPartitionName() == null) {
319-
LOG.info(
320-
"Deleted kv bucket {} for table {} in file path {}.",
321-
tableBucket.getBucket(),
322-
tablePath,
323-
dropKvTablet.getKvTabletDir().getAbsolutePath());
317+
if (needDrop) {
318+
removeTablet.drop();
319+
if (removeTablet.getPartitionName() == null) {
320+
LOG.info(
321+
"Deleted kv bucket {} for table {} in file path {}.",
322+
tableBucket.getBucket(),
323+
tablePath,
324+
removeTablet.getKvTabletDir().getAbsolutePath());
325+
} else {
326+
LOG.info(
327+
"Deleted kv bucket {} for the partition {} of table {} in file path {}.",
328+
tableBucket.getBucket(),
329+
removeTablet.getPartitionName(),
330+
tablePath,
331+
removeTablet.getKvTabletDir().getAbsolutePath());
332+
}
324333
} else {
334+
removeTablet.close();
325335
LOG.info(
326-
"Deleted kv bucket {} for the partition {} of table {} in file path {}.",
327-
tableBucket.getBucket(),
328-
dropKvTablet.getPartitionName(),
329-
tablePath,
330-
dropKvTablet.getKvTabletDir().getAbsolutePath());
336+
"Closed kvTablet and released RocksDB lock for bucket {} without deleting files",
337+
tableBucket);
331338
}
332339
} catch (Exception e) {
333340
throw new KvStorageException(
334341
String.format(
335-
"Exception while deleting kv for table %s, bucket %s in dir %s.",
342+
"Exception while deleting or closing kv for table %s, bucket %s in dir %s, needDrop: %s.",
336343
tablePath,
337344
tableBucket.getBucket(),
338-
dropKvTablet.getKvTabletDir().getAbsolutePath()),
345+
removeTablet.getKvTabletDir().getAbsolutePath(),
346+
needDrop),
339347
e);
340348
}
341349
} else {
342-
LOG.warn("Fail to delete kv bucket {}.", tableBucket.getBucket());
350+
LOG.warn(
351+
"Fail to delete or close kv bucket {}, needDrop: {}.",
352+
tableBucket.getBucket(),
353+
needDrop);
343354
}
344355
}
345356

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotManager.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@
4040
import java.nio.file.Path;
4141
import java.nio.file.Paths;
4242
import java.util.ArrayList;
43+
import java.util.HashMap;
4344
import java.util.HashSet;
4445
import java.util.List;
46+
import java.util.Map;
4547
import java.util.Optional;
4648
import java.util.Set;
4749
import java.util.concurrent.CompletableFuture;
@@ -133,6 +135,8 @@ public static KvSnapshotManager create(
133135

134136
public void becomeLeader() {
135137
isLeader = true;
138+
// Clear standby download cache when leaving standby role
139+
clearStandbyDownloadCache();
136140
}
137141

138142
public void becomeFollower() {
@@ -143,6 +147,14 @@ public void becomeFollower() {
143147

144148
public void becomeStandby() {
145149
isLeader = false;
150+
// Clear standby download cache when newly added to the standby role
151+
clearStandbyDownloadCache();
152+
153+
// make db dir.
154+
Path kvDbPath = tabletDir.toPath().resolve(RocksDBKvBuilder.DB_INSTANCE_DIR_STRING);
155+
if (!kvDbPath.toFile().exists()) {
156+
kvDbPath.toFile().mkdirs();
157+
}
146158
}
147159

148160
@VisibleForTesting
@@ -228,18 +240,19 @@ private void incrementalDownloadSnapshot(CompletedSnapshot completedSnapshot) th
228240
completedSnapshot, downloadedSstFiles, sstFilesToDelete);
229241
CompletedSnapshot incrementalSnapshot =
230242
completedSnapshot.getIncrementalSnapshot(incrementalKvSnapshotHandle);
231-
downloadKvSnapshots(incrementalSnapshot, new CloseableRegistry());
232243

233244
// delete the sst files that are not needed.
234245
for (Path sstFileToDelete : sstFilesToDelete) {
235246
FileUtils.deleteFileOrDirectory(sstFileToDelete.toFile());
236247
}
237248

238-
// delete the misc files that are not needed.
249+
// delete all the misc files that are not needed.
239250
for (Path miscFileToDelete : downloadedMiscFiles) {
240251
FileUtils.deleteFileOrDirectory(miscFileToDelete.toFile());
241252
}
242253

254+
downloadKvSnapshots(incrementalSnapshot, new CloseableRegistry());
255+
243256
KvSnapshotHandle kvSnapshotHandle = completedSnapshot.getKvSnapshotHandle();
244257
downloadedSstFiles =
245258
kvSnapshotHandle.getSharedKvFileHandles().stream()
@@ -483,7 +496,8 @@ private void scheduleNextSnapshot(Executor guardedExecutor) {
483496
private void loadKvLocalFiles(Set<Path> downloadedSstFiles, Set<Path> downloadedMiscFiles)
484497
throws Exception {
485498
if (tabletDir.exists()) {
486-
Path[] files = FileUtils.listDirectory(tabletDir.toPath());
499+
Path kvDbPath = tabletDir.toPath().resolve(RocksDBKvBuilder.DB_INSTANCE_DIR_STRING);
500+
Path[] files = FileUtils.listDirectory(kvDbPath);
487501
for (Path filePath : files) {
488502
final String fileName = filePath.getFileName().toString();
489503
if (fileName.endsWith(SST_FILE_SUFFIX)) {
@@ -499,27 +513,33 @@ private KvSnapshotHandle getIncrementalKvSnapshotHandle(
499513
CompletedSnapshot completedSnapshot,
500514
Set<Path> downloadedSstFiles,
501515
Set<Path> sstFilesToDelete) {
516+
// Map each downloaded sst file name to its local path.
517+
Map<String, Path> downloadedSstFilesMap = new HashMap<>();
518+
for (Path sstPath : downloadedSstFiles) {
519+
downloadedSstFilesMap.put(sstPath.getFileName().toString(), sstPath);
520+
}
521+
502522
KvSnapshotHandle completedSnapshotHandler = completedSnapshot.getKvSnapshotHandle();
503523
List<KvFileHandleAndLocalPath> sstFileHandles =
504524
completedSnapshotHandler.getSharedKvFileHandles();
505525
List<KvFileHandleAndLocalPath> privateFileHandles =
506526
completedSnapshotHandler.getPrivateFileHandles();
507527

508528
List<KvFileHandleAndLocalPath> incrementalSstFileHandles = new ArrayList<>();
529+
Set<String> downloadedSstFileNames = downloadedSstFilesMap.keySet();
509530
for (KvFileHandleAndLocalPath sstFileHandle : sstFileHandles) {
510-
Path sstPath = Paths.get(sstFileHandle.getLocalPath());
511-
if (!downloadedSstFiles.contains(sstPath)) {
531+
if (!downloadedSstFileNames.contains(sstFileHandle.getLocalPath())) {
512532
incrementalSstFileHandles.add(sstFileHandle);
513533
}
514534
}
515535

516-
Set<Path> newSstFiles =
536+
Set<String> newSstFileNames =
517537
completedSnapshotHandler.getSharedKvFileHandles().stream()
518-
.map(handler -> Paths.get(handler.getLocalPath()))
538+
.map(KvFileHandleAndLocalPath::getLocalPath)
519539
.collect(Collectors.toSet());
520-
for (Path sstPath : downloadedSstFiles) {
521-
if (!newSstFiles.contains(sstPath)) {
522-
sstFilesToDelete.add(sstPath);
540+
for (String sstFileName : downloadedSstFileNames) {
541+
if (!newSstFileNames.contains(sstFileName)) {
542+
sstFilesToDelete.add(downloadedSstFilesMap.get(sstFileName));
523543
}
524544
}
525545

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,7 @@ public Replica(
254254
this.kvSnapshotManager =
255255
KvSnapshotManager.create(
256256
tableBucket,
257-
kvManager.getOrCreateTabletDir(
258-
physicalPath, tableBucket),
257+
kvManager.getOrCreateTabletDir(physicalPath, tableBucket),
259258
snapshotContext,
260259
clock);
261260
}
@@ -550,7 +549,18 @@ private void onBecomeNewLeader() {
550549
registerLakeTieringMetrics();
551550
}
552551

553-
if (!isLeader() && isKvTable()) {
552+
if (isKvTable()) {
553+
// If kvTablet already exists (e.g., Leader -> Leader scenario),
554+
// we need to close it first to release RocksDB lock before re-initializing.
555+
// Set need drop as false to avoid deleting files.
556+
if (kvTablet != null) {
557+
LOG.info(
558+
"Closing existing kvTablet before re-initializing as leader for bucket {}",
559+
tableBucket);
560+
kvManager.closeOrDropKv(tableBucket, false);
561+
kvTablet = null;
562+
}
563+
554564
// 1. If there is no sst files in local, download the latest kv snapshot and apply log.
555565
// 2. If there is already sst files in local, check the diff with the latest snapshot
556566
// and download the diff and delete the deleted sst files. And then apply log.
@@ -584,7 +594,10 @@ private void onBecomeNewFollower(int standbyReplica) {
584594
checkNotNull(kvSnapshotManager);
585595
if (isNowStandby) {
586596
kvSnapshotManager.becomeStandby();
587-
becomeStandby();
597+
// Mark as standby immediately to ensure coordinator's state is consistent.
598+
// The snapshot download is done asynchronously to avoid blocking makeFollower.
599+
isStandbyReplica = true;
600+
becomeStandbyAsync();
588601
} else {
589602
// to be new follower.
590603
kvSnapshotManager.becomeFollower();
@@ -637,24 +650,39 @@ public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) {
637650
isDataLakeEnabled);
638651
}
639652

640-
private void becomeStandby() {
641-
try {
642-
checkNotNull(kvSnapshotManager);
643-
kvSnapshotManager.downloadLatestSnapshot();
644-
isStandbyReplica = true;
645-
LOG.info(
646-
"TabletServer {} becomes standby for bucket {}",
647-
localTabletServerId,
648-
tableBucket);
649-
} catch (Exception e) {
650-
// Mark as standby anyway to ensure coordinator's state is consistent.
651-
// The snapshot download can be retried via NotifyKvSnapshotOffsetRequest.
652-
isStandbyReplica = true;
653-
LOG.warn(
654-
"Failed to download snapshot when becoming standby replica for bucket {}.",
655-
tableBucket,
656-
e);
657-
}
653+
/**
654+
* Asynchronously download the latest snapshot for standby replica.
655+
*
656+
* <p>This method submits the snapshot download task to an async thread pool to avoid blocking
657+
* the makeFollower operation. The download can be retried later via
658+
* NotifyKvSnapshotOffsetRequest if it fails.
659+
*/
660+
private void becomeStandbyAsync() {
661+
checkNotNull(snapshotContext);
662+
snapshotContext
663+
.getAsyncOperationsThreadPool()
664+
.execute(
665+
() -> {
666+
try {
667+
checkNotNull(kvSnapshotManager);
668+
kvSnapshotManager.downloadLatestSnapshot();
669+
LOG.info(
670+
"TabletServer {} successfully downloaded snapshot and becomes standby for bucket {}",
671+
localTabletServerId,
672+
tableBucket);
673+
} catch (Exception e) {
674+
// The snapshot download can be retried via
675+
// NotifyKvSnapshotOffsetRequest.
676+
LOG.warn(
677+
"Failed to download snapshot when becoming standby replica for bucket {}. Will retry later.",
678+
tableBucket,
679+
e);
680+
}
681+
});
682+
LOG.info(
683+
"TabletServer {} is becoming standby for bucket {}, snapshot download started asynchronously",
684+
localTabletServerId,
685+
tableBucket);
658686
}
659687

660688
private void createKv() {
@@ -700,7 +728,7 @@ private void dropKv() {
700728

701729
// drop the kv tablet
702730
checkNotNull(kvManager);
703-
kvManager.dropKv(tableBucket);
731+
kvManager.closeOrDropKv(tableBucket, true);
704732
kvTablet = null;
705733
} else {
706734
// For standby replicas, kvTablet is null, so we need to manually delete the
@@ -760,11 +788,6 @@ private Optional<CompletedSnapshot> initKvTablet() {
760788
checkNotNull(kvTablet, "kv tablet should not be null.");
761789
restoreStartOffset = optCompletedSnapshot.get().getLogOffset();
762790
} else {
763-
LOG.info(
764-
"No snapshot found for {} of {}, restore from log.",
765-
tableBucket,
766-
physicalPath);
767-
kvManager.dropKv(tableBucket);
768791
// actually, kv manager always creates a kv tablet since we will drop the kv
769792
// if it exists before init kv tablet
770793
kvTablet =

fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ void testSameTableNameInDifferentDb(String partitionName) throws Exception {
289289
void testDropKv(String partitionName) throws Exception {
290290
initTableBuckets(partitionName);
291291
KvTablet kv1 = getOrCreateKv(tablePath1, partitionName, tableBucket1);
292-
kvManager.dropKv(kv1.getTableBucket());
292+
kvManager.closeOrDropKv(kv1.getTableBucket(), true);
293293

294294
assertThat(kv1.getKvTabletDir()).doesNotExist();
295295
assertThat(kvManager.getKv(tableBucket1)).isNotPresent();

0 commit comments

Comments
 (0)