Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,51 @@
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.fabric8.leader.election;
package org.springframework.cloud.kubernetes.commons.leader.election;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;

import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties;
import org.springframework.core.log.LogAccessor;

final class Fabric8LeaderElectionInitiatorUtil {
/**
* @author wind57
*/
public final class LeaderElectionInitiatorUtil {

private static final LogAccessor LOG = new LogAccessor(LeaderElectionInitiatorUtil.class);

private static final LogAccessor LOG = new LogAccessor(Fabric8LeaderElectionInitiatorUtil.class);
private LeaderElectionInitiatorUtil() {

private Fabric8LeaderElectionInitiatorUtil() {
}

public static void blockReadinessCheck(CompletableFuture<?> ready) {
try {
ready.get();
}
catch (Exception e) {
LOG.error(e, () -> "block readiness check failed with : " + e.getMessage());
throw new RuntimeException(e);
}
}

public static void shutDownExecutor(ExecutorService podReadyWaitingExecutor, String candidateIdentity) {
LOG.debug(() -> "podReadyWaitingExecutor will be shutdown for : " + candidateIdentity);
podReadyWaitingExecutor.shutdownNow();
try {
podReadyWaitingExecutor.awaitTermination(3, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* if 'ready' is already completed at this point, thread will run this, otherwise it
* will attach the pipeline and move on to 'blockReadinessCheck'.
*/
static CompletableFuture<?> attachReadinessLoggerPipeline(CompletableFuture<?> innerPodReadyFuture,
public static CompletableFuture<?> attachReadinessLoggerPipeline(CompletableFuture<?> innerPodReadyFuture,
String candidateIdentity) {
return innerPodReadyFuture.whenComplete((ok, error) -> {
if (error != null) {
Expand All @@ -51,7 +70,7 @@ static CompletableFuture<?> attachReadinessLoggerPipeline(CompletableFuture<?> i
});
}

static void sleep(LeaderElectionProperties leaderElectionProperties) {
public static void sleep(LeaderElectionProperties leaderElectionProperties) {
try {
TimeUnit.SECONDS.sleep(leaderElectionProperties.waitAfterRenewalFailure().toSeconds());
}
Expand All @@ -60,29 +79,4 @@ static void sleep(LeaderElectionProperties leaderElectionProperties) {
}
}

static LeaderElector leaderElector(LeaderElectionConfig config, KubernetesClient fabric8KubernetesClient) {
return fabric8KubernetesClient.leaderElector().withConfig(config).build();
}

static void blockReadinessCheck(CompletableFuture<?> ready) {
try {
ready.get();
}
catch (Exception e) {
LOG.error(e, () -> "block readiness check failed with : " + e.getMessage());
throw new RuntimeException(e);
}
}

static void shutDownExecutor(ExecutorService podReadyWaitingExecutor, String candidateIdentity) {
LOG.debug(() -> "podReadyWaitingExecutor will be shutdown for : " + candidateIdentity);
podReadyWaitingExecutor.shutdownNow();
try {
podReadyWaitingExecutor.awaitTermination(3, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ LeaderElectionConfig fabric8LeaderElectionConfig(LeaderElectionProperties proper

@Bean
@ConditionalOnMissingBean
Lock fabric8LeaderElectionLock(KubernetesClient fabric8KubernetesClient, LeaderElectionProperties properties, String candidateIdentity) {
Lock fabric8LeaderElectionLock(KubernetesClient fabric8KubernetesClient, LeaderElectionProperties properties,
String candidateIdentity) {
boolean leaseSupported = fabric8KubernetesClient.getApiGroups()
.getGroups()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
import org.springframework.core.log.LogAccessor;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.attachReadinessLoggerPipeline;
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.blockReadinessCheck;
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.leaderElector;
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.shutDownExecutor;
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.sleep;
import static org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionInitiatorUtil.attachReadinessLoggerPipeline;
import static org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionInitiatorUtil.blockReadinessCheck;
import static org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionInitiatorUtil.shutDownExecutor;
import static org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionInitiatorUtil.sleep;

/**
* @author wind57
Expand Down Expand Up @@ -152,7 +151,7 @@ CompletableFuture<?> leaderFeature() {

private void startLeaderElection() {

leaderFuture = leaderElector(leaderElectionConfig, fabric8KubernetesClient).start();
leaderFuture = fabric8KubernetesClient.leaderElector().withConfig(leaderElectionConfig).build().start();

leaderFuture.whenComplete((ok, error) -> {

Expand Down
Loading