Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ private void tryLock() {
return;
}

// Try to recreate the cluster data directory in case it got removed
if (!Files.exists(leaderLockPath.getParent())) {
Files.createDirectories(leaderLockPath.getParent());
}

// Attempt to obtain cluster leadership
LOGGER.debug("Try to acquire a lock on {} (cluster-member-id={})", leaderLockPath, localMember.getUuid());
leaderLockFile = new RandomAccessFile(leaderLockPath.toFile(), "rw");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.camel.component.file.cluster;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -24,9 +25,11 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.apache.camel.CamelContext;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.util.FileUtil;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
Expand Down Expand Up @@ -178,7 +181,7 @@ void multipleClusterMembersReelectLeaderIfLockFileDeleted() throws Exception {
}

@Test
void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clusterMovedLocation) throws Exception {
void multipleClusterMembersReelectLeaderIfClusterDataDirectoryDeleted() throws Exception {
ClusterConfig leaderConfig = new ClusterConfig();
leaderConfig.setTimerRepeatCount(-1);

Expand All @@ -192,7 +195,7 @@ void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clu

try {
MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
mockEndpointLeader.expectedMessageCount(5);
mockEndpointLeader.expectedMinimumMessageCount(1);

clusterLeader.start();
clusterFollower.start();
Expand Down Expand Up @@ -222,16 +225,113 @@ void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clu
mockEndpointLeader.reset();
mockEndpointLeader.expectedMinimumMessageCount(1);

// Simulate the file system becoming detached by moving the cluster data directory
Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING);
// Delete the cluster data directory
FileUtil.removeDir(clusterDir.toFile());

// Wait for leadership to be relinquished
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
assertFalse(getClusterMember(clusterLeader).isLeader());
});

// Wait for leadership to be gained by one of the members
CamelContext oldLeader = clusterLeader;
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
boolean newLeaderElected = false;

// Original cluster leader regained leadership
if (getClusterMember(oldLeader).isLeader()) {
newLeaderElected = true;
mockEndpointLeader.assertIsSatisfied();
}

// A different cluster member gained leadership
if (getClusterMember(clusterFollower).isLeader()) {
newLeaderElected = true;
mockEndpointFollower.assertIsSatisfied();
}

assertTrue(newLeaderElected);
});
} finally {
clusterLeader.stop();
clusterFollower.stop();
}
}

@Test
void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clusterMovedLocation) throws Exception {
ClusterConfig leaderConfig = new ClusterConfig();
leaderConfig.setTimerRepeatCount(-1);

CamelContext clusterLeader = createCamelContext(leaderConfig);

ClusterConfig followerConfig = new ClusterConfig();
followerConfig.setTimerRepeatCount(-1);
followerConfig.setAcquireLockDelay(2);

CamelContext clusterFollower = createCamelContext(followerConfig);

try {
MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
mockEndpointLeader.expectedMessageCount(5);

clusterLeader.start();
clusterFollower.start();

mockEndpointLeader.assertIsSatisfied();

AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new AtomicReference<>();
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
assertTrue(Files.exists(lockFile));
assertTrue(Files.exists(dataFile));
assertTrue(getClusterMember(clusterLeader).isLeader());

FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
assertNotNull(clusterLeaderInfo);
leaderInfo.set(clusterLeaderInfo);

String leaderId = clusterLeaderInfo.getId();
assertNotNull(leaderId);
assertDoesNotThrow(() -> UUID.fromString(leaderId));
});

// Wait enough time for the follower to have run its lock acquisition scheduled task
Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());

// The follower should not have produced any messages
MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
assertTrue(mockEndpointFollower.getExchanges().isEmpty());

mockEndpointLeader.reset();
mockEndpointLeader.expectedMinimumMessageCount(1);

// Simulate the file system becoming detached by moving the cluster data directory
Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING);

// Simulate reattaching the file system by moving the cluster directory back to the original location
Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING);
try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
stream.forEach(path -> {
try {
Path destination = clusterDir.resolve(clusterMovedLocation.relativize(path));
if (Files.isDirectory(path)) {
Files.createDirectories(destination);
} else {
Files.copy(path, destination, StandardCopyOption.REPLACE_EXISTING);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

FileLockClusterLeaderInfo updatedInfo
= new FileLockClusterLeaderInfo(
leaderInfo.get().getId(), TimeUnit.MILLISECONDS.toMillis(2), System.currentTimeMillis());
Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
try (RandomAccessFile file = new RandomAccessFile(data.toFile(), "rw")) {
FileLockClusterUtils.writeClusterLeaderInfo(data, file.getChannel(), updatedInfo,
true);
}

// Since the lock file is not considered 'stale', the original leader should resume leadership
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
Expand Down Expand Up @@ -309,15 +409,28 @@ void staleLockFileForRestoredFileSystemElectsNewLeader(@TempDir Path clusterMove

FileLockClusterLeaderInfo updatedInfo
= new FileLockClusterLeaderInfo(
leaderInfo.get().getId(), TimeUnit.NANOSECONDS.toNanos(2), staleHeartbeatTimestamp);
leaderInfo.get().getId(), TimeUnit.MILLISECONDS.toMillis(2), staleHeartbeatTimestamp);
Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
try (RandomAccessFile file = new RandomAccessFile(data.toFile(), "rw")) {
FileLockClusterUtils.writeClusterLeaderInfo(data, file.getChannel(), updatedInfo,
true);
}

// Simulate reattaching the file system by moving the cluster directory back to the original location
Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING);
try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
stream.forEach(path -> {
try {
Path destination = clusterDir.resolve(clusterMovedLocation.relativize(path));
if (Files.isDirectory(path)) {
Files.createDirectories(destination);
} else {
Files.copy(path, destination, StandardCopyOption.REPLACE_EXISTING);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

mockEndpointFollower.expectedMinimumMessageCount(1);

Expand Down
Loading