diff --git a/docs/modules/ROOT/pages/leader-election.adoc b/docs/modules/ROOT/pages/leader-election.adoc index bc0d920a77..da3a961f04 100644 --- a/docs/modules/ROOT/pages/leader-election.adoc +++ b/docs/modules/ROOT/pages/leader-election.adoc @@ -25,3 +25,93 @@ To specify the name of the configmap used for leader election use the following ---- spring.cloud.kubernetes.leader.config-map-name=leader ---- + +''' + +There is another way you can configure leader election, and it comes with native support in the fabric8 library (k8s native client support is not yet implemented). In the long run, this will be the default way to configure leader election, while the previous one will be dropped. You can treat this one much like the JDKs "preview" features. + +To be able to use it, you need to set the property: + +[source] +---- +spring.cloud.kubernetes.leader.election.enabled=true +---- + +Unlike the old implementation, this one will use either the `Lease` _or_ `ConfigMap` as the lock, depending on your cluster version. You can force using configMap still, even if leases are supported, via : + +[source] +---- +spring.cloud.kubernetes.leader.election.use-config-map-as-lock=true +---- + +The name of that `Lease` or `ConfigMap` can be defined using the property (default value is `spring-k8s-leader-election-lock`): + +[source] +---- +spring.cloud.kubernetes.leader.election.lockName=other-name +---- + +The namespace where the lock is created (`default` being set if no explicit one exists) can be set also: + +[source] +---- +spring.cloud.kubernetes.leader.election.lockNamespace=other-namespace +---- + +Before the leader election process kicks in, you can wait until the pod is ready (via the readiness check). This is enabled by default, but you can disable it if needed: + +[source] +---- +spring.cloud.kubernetes.leader.election.waitForPodReady=false +---- + +Like with the old implementation, we will publish events by default, but this can be disabled: + +[source] +---- +spring.cloud.kubernetes.leader.election.publishEvents=false +---- + +There are a few parameters that control how the leader election process will happen. To explain them, we need to look at the high-level implementation of this process. All the candidate pods try to become the leader, or they try to _acquire_ the lock. If the lock is already taken, they will continue to retry to acquire it every `spring.cloud.kubernetes.leader.election.retryPeriod` (value is specified as `java.time.Duration`, and by default it is 2 seconds). + +If the lock is not taken, current pod becomes the leader. It does so by inserting a so-called "record" into the lock (`Lease` or `ConfigMap`). Among the things that the "record" contains, is the `leaseDuration` (that you can specify via `spring.cloud.kubernetes.leader.election.leaseDuration`; by default it is 15 seconds and is of type `java.time.Duration`). This acts like a TTL on the lock: no other candidate can acquire the lock, unless this period has expired (from the last renewal time). + +Once a certain pod establishes itself as the leader (by acquiring the lock), it will continuously (every `spring.cloud.kubernetes.leader.election.retryPeriod`) try to renew its lease, or in other words: it will try to extend its leadership. When a renewal happens, the "record" that is stored inside the lock, is updated. For example, `renewTime` is updated inside the record, to denote when the last renewal happened. (You can always peek inside these fields by using `kubectl describe lease...` for example). + +Renewal must happen within a certain interval, specified by `spring.cloud.kubernetes.leader.election.renewDeadline`. By default, it is equal to 10 seconds, and it means that the leader pod has a maximum of 10 seconds to renew its leadership. If that does not happen, this pod loses its leadership and leader election starts again. Because other pods try to become leaders every 2 seconds (by default), it could mean that the pod that just lost leadership, will become leader again. If you want other pods to have a higher chance of becoming leaders, you can set the property (specified in seconds, by default it is 0) : + +[source] +---- +spring.cloud.kubernetes.leader.election.wait-after-renewal-failure=3 +---- + +This will mean that the pod (that could not renew its lease) and lost leadership, will wait this many seconds, before trying to become leader again. + +Let's try to explain these settings based on an example: there are two pods that participate in leader election. For simplicity let's call them `podA` and `podB`. They both start at the same time: `12:00:00`, but `podA` establishes itself as the leader. This means that every two seconds (`retryPeriod`), `podB` will try to become the new leader. So at `12:00:02`, then at `12:00:04` and so on, it will basically ask : "Can I become the leader?". In our simplified example, the answer to that question can be answered based on `podA` activity. + +After `podA` has become the leader, at every 2 seconds, it will try to "extend" or _renew_ its leadership. So at `12:00:02`, then at `12:00:04` and so on, `podA` goes to the lock and updates its record to reflect that it is still the leader. Between the last successful renewal and the next one, it has exactly 10 seconds (`renewalDeadline`). If it fails to renew its leadership (there is a connection problem or a big GC pause, etc.) within those 10 seconds, it stops leading and `podB` can acquire the leadership now. When `podA` stops being a leader in a graceful way, the lock record is "cleared", basically meaning that `podB` can acquire leadership immediately. + +A different story happens when `podA` dies with an OutOfMemory for example, without being able to gracefully update lock record and this is when `leaseDuration` argument matters. The easiest way to explain is via an example: + +`podA` has renewed its leadership at `12:00:04`, but at `12:00:05` it has been killed by the OOMKiller. At `12:00:06`, `podB` will try to become the leader. It will check if "now" (`12:00:06`) is _after_ last renewal + lease duration, essentially it will check: + +[source] +---- +12:00:06 > (12:00:04 + 00:00:10) +---- + +The condition is not fulfilled, so it can't become the leader. Same result will be at `12:00:08`, `12:00:10` and so on, until `12:00:16` and this is where the TTL (`leaseDuration`) of the lock will expire and `podB` can acquire it. As such, a lower value of `leaseDuration` will mean a faster acquiring of leadership by other pods. + +You might have to give proper RBAC to be able to use this functionality, for example: + +[source] +---- + - apiGroups: [ "coordination.k8s.io" ] + resources: [ "leases", "configmaps" ] + verbs: [ "get", "update", "create", "patch"] +---- + + + + + diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/CachedSingleThreadScheduler.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/CachedSingleThreadScheduler.java new file mode 100644 index 0000000000..0fd7541140 --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/CachedSingleThreadScheduler.java @@ -0,0 +1,104 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This is taken from fabric8 with some minor changes (we need it, so it could be placed + * in the common package). + * + * @author wind57 + */ +public final class CachedSingleThreadScheduler { + + private final ReentrantLock lock = new ReentrantLock(); + + private final long ttlMillis; + + private ScheduledThreadPoolExecutor executor; + + public CachedSingleThreadScheduler(long ttlMillis) { + this.ttlMillis = ttlMillis; + } + + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + try { + lock.lock(); + this.startExecutor(); + return this.executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + finally { + lock.unlock(); + } + } + + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + try { + lock.lock(); + this.startExecutor(); + return this.executor.schedule(command, delay, unit); + } + finally { + lock.unlock(); + } + } + + private void startExecutor() { + if (this.executor == null) { + this.executor = new ScheduledThreadPoolExecutor(1, threadFactory()); + this.executor.setRemoveOnCancelPolicy(true); + this.executor.scheduleWithFixedDelay(this::shutdownCheck, this.ttlMillis, this.ttlMillis, + TimeUnit.MILLISECONDS); + } + + } + + private void shutdownCheck() { + try { + lock.lock(); + if (this.executor.getQueue().isEmpty()) { + this.executor.shutdownNow(); + this.executor = null; + } + } + finally { + lock.unlock(); + } + + } + + private ThreadFactory threadFactory() { + return new ThreadFactory() { + ThreadFactory threadFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(Runnable runnable) { + Thread ret = threadFactory.newThread(runnable); + ret.setName("fabric8-leader-election" + "-" + ret.getName()); + ret.setDaemon(true); + return ret; + } + }; + } + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/PodReady.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/PodReady.java new file mode 100644 index 0000000000..c0fd8ebbe3 --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/PodReady.java @@ -0,0 +1,79 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; + +import org.springframework.core.log.LogAccessor; + +/** + * @author wind57 + */ +public final class PodReady { + + private static final LogAccessor LOG = new LogAccessor(PodReady.class); + + private final AtomicReference> podReadyTask = new AtomicReference<>(); + + private final CachedSingleThreadScheduler podReadyScheduler = new CachedSingleThreadScheduler( + TimeUnit.SECONDS.toMillis(10)); + + /** + * The resulting CompletableFuture is completed when pod is ready according to the + * BooleanSupplier. You are supposed to properly get rid of this task, but calling the + * other public method getPodReadyTask and cancel it, this will take care to shutdown + * the executor it was running in. + */ + public CompletableFuture podReady(BooleanSupplier isPodReady, String holderIdentity, String podNamespace) { + + CompletableFuture podReadyFuture = new CompletableFuture<>(); + + podReadyTask.set(podReadyScheduler.scheduleWithFixedDelay(() -> { + + try { + LOG.info(() -> "waiting for pod : " + holderIdentity + " in namespace : " + podNamespace + + " to be ready"); + if (isPodReady.getAsBoolean()) { + LOG.info(() -> "Pod : " + holderIdentity + " in namespace : " + podNamespace + " is ready"); + podReadyFuture.complete(null); + } + else { + LOG.debug(() -> "Pod : " + holderIdentity + " in namespace : " + podNamespace + " is not ready, " + + "will retry in one second"); + } + } + catch (Exception e) { + LOG.error(() -> "exception waiting for pod : " + e.getMessage()); + LOG.error(() -> "leader election for " + holderIdentity + " was not successful"); + podReadyFuture.completeExceptionally(e); + } + + }, 1, 1, TimeUnit.SECONDS)); + + return podReadyFuture; + + } + + public AtomicReference> getPodReadyTask() { + return podReadyTask; + } + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtils.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtils.java index 94771916d3..e015b5c3fa 100644 --- a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtils.java +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtils.java @@ -16,21 +16,43 @@ package org.springframework.cloud.kubernetes.commons.leader; +import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; import java.util.concurrent.locks.ReentrantLock; import org.springframework.cloud.kubernetes.commons.EnvReader; +import org.springframework.core.log.LogAccessor; import org.springframework.util.StringUtils; +import static org.springframework.cloud.kubernetes.commons.KubernetesClientProperties.SERVICE_ACCOUNT_NAMESPACE_PATH; + /** * @author wind57 */ public final class LeaderUtils { + /** + * Prefix for all properties related to leader election. + */ + public static final String LEADER_ELECTION_PROPERTY_PREFIX = "spring.cloud.kubernetes.leader.election"; + + /** + * Property that controls whether leader election is enabled. + */ + public static final String LEADER_ELECTION_ENABLED_PROPERTY = LEADER_ELECTION_PROPERTY_PREFIX + ".enabled"; + + private static final LogAccessor LOG = new LogAccessor(LeaderUtils.class); + // k8s environment variable responsible for host name private static final String HOSTNAME = "HOSTNAME"; + private static final String POD_NAMESPACE = "POD_NAMESPACE"; + private LeaderUtils() { } @@ -45,6 +67,27 @@ public static String hostName() throws UnknownHostException { } } + /** + * ideally, should always be present. If not, downward api must enable this one. + */ + public static Optional podNamespace() { + Path serviceAccountPath = new File(SERVICE_ACCOUNT_NAMESPACE_PATH).toPath(); + boolean serviceAccountNamespaceExists = Files.isRegularFile(serviceAccountPath); + if (serviceAccountNamespaceExists) { + try { + String namespace = new String(Files.readAllBytes(serviceAccountPath)).replace(System.lineSeparator(), + ""); + LOG.info(() -> "read namespace : " + namespace + " from service account " + serviceAccountPath); + return Optional.of(namespace); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + } + return Optional.ofNullable(EnvReader.getEnv(POD_NAMESPACE)); + } + public static void guarded(ReentrantLock lock, Runnable runnable) { try { lock.lock(); diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/ConditionalOnLeaderElectionDisabled.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/ConditionalOnLeaderElectionDisabled.java new file mode 100644 index 0000000000..2ce35e8805 --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/ConditionalOnLeaderElectionDisabled.java @@ -0,0 +1,43 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; + +import static org.springframework.cloud.kubernetes.commons.leader.LeaderUtils.LEADER_ELECTION_ENABLED_PROPERTY; + +/** + * Provides a more succinct conditional for: + * spring.cloud.kubernetes.leader.election.enabled. + * + * @author wind57 + */ +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +@ConditionalOnProperty(value = LEADER_ELECTION_ENABLED_PROPERTY, matchIfMissing = true, havingValue = "false") +public @interface ConditionalOnLeaderElectionDisabled { + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/ConditionalOnLeaderElectionEnabled.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/ConditionalOnLeaderElectionEnabled.java new file mode 100644 index 0000000000..6aba3cdf53 --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/ConditionalOnLeaderElectionEnabled.java @@ -0,0 +1,43 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; + +import static org.springframework.cloud.kubernetes.commons.leader.LeaderUtils.LEADER_ELECTION_ENABLED_PROPERTY; + +/** + * Provides a more succinct conditional for: + * spring.cloud.kubernetes.leader.election.enabled. + * + * @author wind57 + */ +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +@ConditionalOnProperty(value = LEADER_ELECTION_ENABLED_PROPERTY, matchIfMissing = false, havingValue = "true") +public @interface ConditionalOnLeaderElectionEnabled { + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionCallbacks.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionCallbacks.java new file mode 100644 index 0000000000..a63b41561a --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionCallbacks.java @@ -0,0 +1,87 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election; + +import java.net.UnknownHostException; +import java.util.function.Consumer; + +import org.springframework.cloud.kubernetes.commons.leader.LeaderUtils; +import org.springframework.cloud.kubernetes.commons.leader.election.events.NewLeaderEvent; +import org.springframework.cloud.kubernetes.commons.leader.election.events.StartLeadingEvent; +import org.springframework.cloud.kubernetes.commons.leader.election.events.StopLeadingEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.core.log.LogAccessor; + +/** + * common leader election callbacks that are supposed to be used in both fabric8 and + * k8s-native clients. + * + * @author wind57 + */ +public class LeaderElectionCallbacks { + + private static final LogAccessor LOG = new LogAccessor(LeaderElectionCallbacks.class); + + @Bean + public final String holderIdentity() throws UnknownHostException { + String podHostName = LeaderUtils.hostName(); + LOG.debug(() -> "using pod hostname : " + podHostName); + return podHostName; + } + + @Bean + public final String podNamespace() { + String podNamespace = LeaderUtils.podNamespace().orElse("default"); + LOG.debug(() -> "using pod namespace : " + podNamespace); + return podNamespace; + } + + @Bean + public final Runnable onStartLeadingCallback(ApplicationEventPublisher applicationEventPublisher, + String holderIdentity, LeaderElectionProperties properties) { + return () -> { + LOG.info(() -> "id : " + holderIdentity + " is now a leader"); + if (properties.publishEvents()) { + applicationEventPublisher.publishEvent(new StartLeadingEvent(holderIdentity)); + } + }; + } + + @Bean + public final Runnable onStopLeadingCallback(ApplicationEventPublisher applicationEventPublisher, + String holderIdentity, LeaderElectionProperties properties) { + return () -> { + LOG.info(() -> "id : " + holderIdentity + " stopped being a leader"); + if (properties.publishEvents()) { + applicationEventPublisher.publishEvent(new StopLeadingEvent(holderIdentity)); + } + }; + } + + @Bean + public final Consumer onNewLeaderCallback(ApplicationEventPublisher applicationEventPublisher, + LeaderElectionProperties properties) { + return holderIdentity -> { + LOG.info(() -> "id : " + holderIdentity + " is the new leader"); + if (properties.publishEvents()) { + applicationEventPublisher.publishEvent(new NewLeaderEvent(holderIdentity)); + } + }; + } + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionProperties.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionProperties.java new file mode 100644 index 0000000000..23900bbf32 --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionProperties.java @@ -0,0 +1,58 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election; + +import java.time.Duration; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.bind.DefaultValue; + +import static org.springframework.cloud.kubernetes.commons.leader.LeaderUtils.LEADER_ELECTION_PROPERTY_PREFIX; + +/** + * @author wind57 + */ +// @formatter:off +@ConfigurationProperties(LEADER_ELECTION_PROPERTY_PREFIX) +public record LeaderElectionProperties( + @DefaultValue("true") boolean waitForPodReady, + @DefaultValue("true") boolean publishEvents, + @DefaultValue("15s") Duration leaseDuration, + @DefaultValue("default") String lockNamespace, + @DefaultValue("spring-k8s-leader-election-lock") String lockName, + @DefaultValue("10s") Duration renewDeadline, + @DefaultValue("2s") Duration retryPeriod, + @DefaultValue("0s") Duration waitAfterRenewalFailure, + @DefaultValue("false") boolean useConfigMapAsLock) { +// @formatter:on + + /** + * Coordination group for leader election. + */ + public static final String COORDINATION_GROUP = "coordination.k8s.io"; + + /** + * Coordination version for leader election. + */ + public static final String COORDINATION_VERSION = "v1"; + + /** + * Lease constant. + */ + public static final String LEASE = "Lease"; + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/NewLeaderEvent.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/NewLeaderEvent.java new file mode 100644 index 0000000000..e51d26230c --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/NewLeaderEvent.java @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election.events; + +import org.springframework.context.ApplicationEvent; + +public final class NewLeaderEvent extends ApplicationEvent { + + private final String holderIdentity; + + public NewLeaderEvent(Object source) { + super(source); + holderIdentity = (String) source; + } + + public String holderIdentity() { + return holderIdentity; + } + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/StartLeadingEvent.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/StartLeadingEvent.java new file mode 100644 index 0000000000..8fee3db71e --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/StartLeadingEvent.java @@ -0,0 +1,37 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election.events; + +import org.springframework.context.ApplicationEvent; + +/** + * @author wind57 + */ +public final class StartLeadingEvent extends ApplicationEvent { + + private final String holderIdentity; + + public StartLeadingEvent(Object source) { + super(source); + holderIdentity = (String) source; + } + + public String holderIdentity() { + return holderIdentity; + } + +} diff --git a/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/StopLeadingEvent.java b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/StopLeadingEvent.java new file mode 100644 index 0000000000..c15405b1dc --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/events/StopLeadingEvent.java @@ -0,0 +1,37 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election.events; + +import org.springframework.context.ApplicationEvent; + +/** + * @author wind57 + */ +public final class StopLeadingEvent extends ApplicationEvent { + + private final String holderIdentity; + + public StopLeadingEvent(Object source) { + super(source); + holderIdentity = (String) source; + } + + public String holderIdentity() { + return holderIdentity; + } + +} diff --git a/spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtilsTests.java b/spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtilsTests.java index 25e6bac89b..af65321aea 100644 --- a/spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtilsTests.java +++ b/spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/LeaderUtilsTests.java @@ -18,6 +18,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Optional; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -62,4 +63,23 @@ void hostNameReadFromApiCall() throws UnknownHostException { inet4AddressMockedStatic.close(); } + @Test + void podNamespaceMissing() { + MockedStatic envReaderMockedStatic = Mockito.mockStatic(EnvReader.class); + // envReaderMockedStatic.when(() -> EnvReader.getEnv("")).thenReturn(""); + Optional podNamespace = LeaderUtils.podNamespace(); + Assertions.assertThat(podNamespace.isEmpty()).isTrue(); + envReaderMockedStatic.close(); + } + + @Test + void podNamespacePresent() { + MockedStatic envReaderMockedStatic = Mockito.mockStatic(EnvReader.class); + envReaderMockedStatic.when(() -> EnvReader.getEnv("POD_NAMESPACE")).thenReturn("podNamespace"); + Optional podNamespace = LeaderUtils.podNamespace(); + Assertions.assertThat(podNamespace.isPresent()).isTrue(); + Assertions.assertThat(podNamespace.get()).isEqualTo("podNamespace"); + envReaderMockedStatic.close(); + } + } diff --git a/spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionPropertiesTests.java b/spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionPropertiesTests.java new file mode 100644 index 0000000000..619182a817 --- /dev/null +++ b/spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionPropertiesTests.java @@ -0,0 +1,83 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.commons.leader.election; + +import java.time.Duration; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Configuration; + +/** + * @author wind57 + */ +class LeaderElectionPropertiesTests { + + @Test + void testDefaults() { + new ApplicationContextRunner().withUserConfiguration(Config.class).run(context -> { + LeaderElectionProperties properties = context.getBean(LeaderElectionProperties.class); + Assertions.assertThat(properties).isNotNull(); + Assertions.assertThat(properties.publishEvents()).isTrue(); + Assertions.assertThat(properties.waitForPodReady()).isTrue(); + Assertions.assertThat(properties.leaseDuration()).isEqualTo(Duration.ofSeconds(15)); + Assertions.assertThat(properties.lockNamespace()).isEqualTo("default"); + Assertions.assertThat(properties.lockName()).isEqualTo("spring-k8s-leader-election-lock"); + Assertions.assertThat(properties.renewDeadline()).isEqualTo(Duration.ofSeconds(10)); + Assertions.assertThat(properties.retryPeriod()).isEqualTo(Duration.ofSeconds(2)); + Assertions.assertThat(properties.waitAfterRenewalFailure()).isEqualTo(Duration.ofSeconds(0)); + Assertions.assertThat(properties.useConfigMapAsLock()).isFalse(); + }); + } + + @Test + void testNonDefaults() { + new ApplicationContextRunner().withUserConfiguration(Config.class) + .withPropertyValues("spring.cloud.kubernetes.leader.election.wait-for-pod-ready=false", + "spring.cloud.kubernetes.leader.election.publish-events=false", + "spring.cloud.kubernetes.leader.election.lease-duration=10s", + "spring.cloud.kubernetes.leader.election.lock-namespace=lock-namespace", + "spring.cloud.kubernetes.leader.election.lock-name=lock-name", + "spring.cloud.kubernetes.leader.election.renew-deadline=2d", + "spring.cloud.kubernetes.leader.election.retry-period=3m", + "spring.cloud.kubernetes.leader.election.wait-after-renewal-failure=13m", + "spring.cloud.kubernetes.leader.election.use-config-map-as-lock=true") + .run(context -> { + LeaderElectionProperties properties = context.getBean(LeaderElectionProperties.class); + Assertions.assertThat(properties).isNotNull(); + Assertions.assertThat(properties.waitForPodReady()).isFalse(); + Assertions.assertThat(properties.publishEvents()).isFalse(); + Assertions.assertThat(properties.leaseDuration()).isEqualTo(Duration.ofSeconds(10)); + Assertions.assertThat(properties.lockNamespace()).isEqualTo("lock-namespace"); + Assertions.assertThat(properties.lockName()).isEqualTo("lock-name"); + Assertions.assertThat(properties.renewDeadline()).isEqualTo(Duration.ofDays(2)); + Assertions.assertThat(properties.retryPeriod()).isEqualTo(Duration.ofMinutes(3)); + Assertions.assertThat(properties.waitAfterRenewalFailure()).isEqualTo(Duration.ofMinutes(13)); + Assertions.assertThat(properties.useConfigMapAsLock()).isTrue(); + }); + } + + @EnableConfigurationProperties(LeaderElectionProperties.class) + @Configuration + static class Config { + + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/Fabric8LeaderAutoConfiguration.java b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/Fabric8LeaderAutoConfiguration.java index 76288e5006..bb4cc1bd14 100644 --- a/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/Fabric8LeaderAutoConfiguration.java +++ b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/Fabric8LeaderAutoConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.cloud.kubernetes.commons.leader.LeaderInitiator; import org.springframework.cloud.kubernetes.commons.leader.LeaderProperties; import org.springframework.cloud.kubernetes.commons.leader.LeaderUtils; +import org.springframework.cloud.kubernetes.commons.leader.election.ConditionalOnLeaderElectionDisabled; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -44,7 +45,8 @@ @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(LeaderProperties.class) @ConditionalOnBean(KubernetesClient.class) -@ConditionalOnProperty(value = "spring.cloud.kubernetes.leader.enabled", matchIfMissing = true) +@ConditionalOnLeaderElectionDisabled +@ConditionalOnProperty(value = "spring.cloud.kubernetes.leader.enabled", havingValue = "true", matchIfMissing = true) public class Fabric8LeaderAutoConfiguration { /* diff --git a/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionAutoConfiguration.java b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionAutoConfiguration.java new file mode 100644 index 0000000000..a897c779b4 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionAutoConfiguration.java @@ -0,0 +1,127 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import io.fabric8.kubernetes.api.model.APIResource; +import io.fabric8.kubernetes.api.model.APIResourceList; +import io.fabric8.kubernetes.api.model.GroupVersionForDiscovery; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.Lock; + +import org.springframework.boot.actuate.info.InfoContributor; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnCloudPlatform; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.cloud.CloudPlatform; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.health.autoconfigure.contributor.ConditionalOnEnabledHealthIndicator; +import org.springframework.cloud.kubernetes.commons.leader.election.ConditionalOnLeaderElectionEnabled; +import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; + +import static org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties.COORDINATION_GROUP; +import static org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties.COORDINATION_VERSION; +import static org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties.LEASE; + +/** + * @author wind57 + */ +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties(LeaderElectionProperties.class) +@ConditionalOnBean(KubernetesClient.class) +@ConditionalOnLeaderElectionEnabled +@ConditionalOnCloudPlatform(CloudPlatform.KUBERNETES) +@AutoConfigureAfter(Fabric8LeaderElectionCallbacksAutoConfiguration.class) +class Fabric8LeaderElectionAutoConfiguration { + + private static final String COORDINATION_VERSION_GROUP = COORDINATION_GROUP + "/" + COORDINATION_VERSION; + + private static final LogAccessor LOG = new LogAccessor(Fabric8LeaderElectionAutoConfiguration.class); + + @Bean + @ConditionalOnClass(InfoContributor.class) + @ConditionalOnEnabledHealthIndicator("leader.election") + Fabric8LeaderElectionInfoContributor leaderElectionInfoContributor(String holderIdentity, + LeaderElectionConfig leaderElectionConfig, KubernetesClient fabric8KubernetesClient) { + return new Fabric8LeaderElectionInfoContributor(holderIdentity, leaderElectionConfig, fabric8KubernetesClient); + } + + @Bean + @ConditionalOnMissingBean + Fabric8LeaderElectionInitiator fabric8LeaderElectionInitiator(String holderIdentity, String podNamespace, + KubernetesClient fabric8KubernetesClient, LeaderElectionConfig fabric8LeaderElectionConfig, + LeaderElectionProperties leaderElectionProperties) { + return new Fabric8LeaderElectionInitiator(holderIdentity, podNamespace, fabric8KubernetesClient, + fabric8LeaderElectionConfig, leaderElectionProperties); + } + + @Bean + @ConditionalOnMissingBean + LeaderElectionConfig fabric8LeaderElectionConfig(LeaderElectionProperties properties, Lock lock, + Fabric8LeaderElectionCallbacks fabric8LeaderElectionCallbacks) { + return new LeaderElectionConfigBuilder().withReleaseOnCancel() + .withName("Spring k8s leader election") + .withLeaseDuration(properties.leaseDuration()) + .withLock(lock) + .withRenewDeadline(properties.renewDeadline()) + .withRetryPeriod(properties.retryPeriod()) + .withLeaderCallbacks(fabric8LeaderElectionCallbacks) + .build(); + } + + @Bean + @ConditionalOnMissingBean + Lock lock(KubernetesClient fabric8KubernetesClient, LeaderElectionProperties properties, String holderIdentity) { + boolean leaseSupported = fabric8KubernetesClient.getApiGroups() + .getGroups() + .stream() + .flatMap(x -> x.getVersions().stream()) + .map(GroupVersionForDiscovery::getGroupVersion) + .filter(COORDINATION_VERSION_GROUP::equals) + .findFirst() + .map(fabric8KubernetesClient::getApiResources) + .map(APIResourceList::getResources) + .map(x -> x.stream().map(APIResource::getKind)) + .flatMap(x -> x.filter(y -> y.equals(LEASE)).findFirst()) + .isPresent(); + + if (leaseSupported) { + if (properties.useConfigMapAsLock()) { + LOG.info(() -> "leases are supported on the cluster, but config map will be used " + + "(because 'spring.cloud.kubernetes.leader.election.use-config-map-as-lock=true')"); + return new ConfigMapLock(properties.lockNamespace(), properties.lockName(), holderIdentity); + } + else { + LOG.info(() -> "will use lease as the lock for leader election"); + return new LeaseLock(properties.lockNamespace(), properties.lockName(), holderIdentity); + } + } + else { + LOG.info(() -> "will use configmap as the lock for leader election"); + return new ConfigMapLock(properties.lockNamespace(), properties.lockName(), holderIdentity); + } + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacks.java b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacks.java new file mode 100644 index 0000000000..910d7fb3a7 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacks.java @@ -0,0 +1,32 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.util.function.Consumer; + +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks; + +/** + * @author wind57 + */ +final class Fabric8LeaderElectionCallbacks extends LeaderCallbacks { + + Fabric8LeaderElectionCallbacks(Runnable onStartLeading, Runnable onStopLeading, Consumer onNewLeader) { + super(onStartLeading, onStopLeading, onNewLeader); + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacksAutoConfiguration.java b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacksAutoConfiguration.java new file mode 100644 index 0000000000..cc06700357 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacksAutoConfiguration.java @@ -0,0 +1,52 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.util.function.Consumer; + +import io.fabric8.kubernetes.client.KubernetesClient; + +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnCloudPlatform; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.cloud.CloudPlatform; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.kubernetes.commons.KubernetesCommonsAutoConfiguration; +import org.springframework.cloud.kubernetes.commons.leader.election.ConditionalOnLeaderElectionEnabled; +import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionCallbacks; +import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties; +import org.springframework.cloud.kubernetes.fabric8.Fabric8AutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties(LeaderElectionProperties.class) +@ConditionalOnBean(KubernetesClient.class) +@ConditionalOnLeaderElectionEnabled +@ConditionalOnCloudPlatform(CloudPlatform.KUBERNETES) +@AutoConfigureAfter({ Fabric8AutoConfiguration.class, KubernetesCommonsAutoConfiguration.class }) +class Fabric8LeaderElectionCallbacksAutoConfiguration extends LeaderElectionCallbacks { + + @Bean + @ConditionalOnMissingBean + Fabric8LeaderElectionCallbacks fabric8LeaderElectionCallbacks(Runnable onStartLeadingCallback, + Runnable onStopLeadingCallback, Consumer onNewLeaderCallback) { + return new Fabric8LeaderElectionCallbacks(onStartLeadingCallback, onStopLeadingCallback, onNewLeaderCallback); + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributor.java b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributor.java new file mode 100644 index 0000000000..c6a977774b --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributor.java @@ -0,0 +1,60 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; + +import org.springframework.boot.actuate.info.Info; +import org.springframework.boot.actuate.info.InfoContributor; + +/** + * @author wind57 + */ +final class Fabric8LeaderElectionInfoContributor implements InfoContributor { + + private final String holderIdentity; + + private final LeaderElectionConfig leaderElectionConfig; + + private final KubernetesClient fabric8KubernetesClient; + + Fabric8LeaderElectionInfoContributor(String holderIdentity, LeaderElectionConfig leaderElectionConfig, + KubernetesClient fabric8KubernetesClient) { + this.holderIdentity = holderIdentity; + this.leaderElectionConfig = leaderElectionConfig; + this.fabric8KubernetesClient = fabric8KubernetesClient; + } + + @Override + public void contribute(Info.Builder builder) { + Map details = new HashMap<>(); + Optional.ofNullable(leaderElectionConfig.getLock().get(fabric8KubernetesClient)) + .ifPresentOrElse(leaderRecord -> { + boolean isLeader = holderIdentity.equals(leaderRecord.getHolderIdentity()); + details.put("leaderId", holderIdentity); + details.put("isLeader", isLeader); + }, () -> details.put("leaderId", "Unknown")); + + builder.withDetail("leaderElection", details); + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInitiator.java b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInitiator.java new file mode 100644 index 0000000000..236cb914ed --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInitiator.java @@ -0,0 +1,202 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; +import io.fabric8.kubernetes.client.readiness.Readiness; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +import org.springframework.cloud.kubernetes.commons.PodReady; +import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties; +import org.springframework.core.log.LogAccessor; + +/** + * @author wind57 + */ +final class Fabric8LeaderElectionInitiator { + + private static final LogAccessor LOG = new LogAccessor(Fabric8LeaderElectionInitiator.class); + + private final PodReady podReady = new PodReady(); + + private final String holderIdentity; + + private final String podNamespace; + + private final KubernetesClient fabric8KubernetesClient; + + private final LeaderElectionConfig leaderElectionConfig; + + private final LeaderElectionProperties leaderElectionProperties; + + private final AtomicReference executorService = new AtomicReference<>(); + + private final AtomicReference> leaderFutureReference = new AtomicReference<>(); + + private volatile boolean destroyCalled = false; + + Fabric8LeaderElectionInitiator(String holderIdentity, String podNamespace, KubernetesClient fabric8KubernetesClient, + LeaderElectionConfig leaderElectionConfig, LeaderElectionProperties leaderElectionProperties) { + this.holderIdentity = holderIdentity; + this.podNamespace = podNamespace; + this.fabric8KubernetesClient = fabric8KubernetesClient; + this.leaderElectionConfig = leaderElectionConfig; + this.leaderElectionProperties = leaderElectionProperties; + } + + /** + * in a CachedSingleThreadScheduler start pod readiness and keep it running 'forever', + * until it is successful or failed. That is run in a daemon thread. + * + * In a different pool ('executorService'), block until the above CompletableFuture is + * done. Only when it's done, start the leader election process. If pod readiness + * fails, leader election is not started. + * + */ + @PostConstruct + void postConstruct() { + LOG.info(() -> "starting leader initiator : " + holderIdentity); + executorService.set(Executors + .newSingleThreadExecutor(r -> new Thread(r, "Fabric8LeaderElectionInitiator-" + holderIdentity))); + CompletableFuture podReadyFuture; + + // wait until pod is ready + if (leaderElectionProperties.waitForPodReady()) { + LOG.info(() -> "will wait until pod " + holderIdentity + " is ready : "); + podReadyFuture = podReady.podReady(() -> { + Pod pod = fabric8KubernetesClient.pods().inNamespace(podNamespace).withName(holderIdentity).get(); + return Readiness.isPodReady(pod); + }, holderIdentity, podNamespace); + + } + else { + podReadyFuture = new CompletableFuture<>(); + } + + // wait in a different thread until the pod is ready + // and in the same thread start the leader election + executorService.get().submit(() -> { + if (leaderElectionProperties.waitForPodReady()) { + CompletableFuture ready = podReadyFuture.whenComplete((ok, error) -> { + if (error != null) { + LOG.error(() -> "readiness failed for : " + holderIdentity); + LOG.error(() -> "leader election for : " + holderIdentity + " will not start"); + } + else { + LOG.info(() -> holderIdentity + " is ready"); + } + // we cancel the future that checks readiness of the pod + // and thus also close the pool that was running it. + podReady.getPodReadyTask().get().cancel(true); + }); + try { + ready.get(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // readiness check passed, start leader election + if (!podReadyFuture.isCompletedExceptionally()) { + startLeaderElection(); + } + + } + else { + startLeaderElection(); + } + }); + + } + + @PreDestroy + void preDestroy() { + destroyCalled(); + LOG.info(() -> "preDestroy called in the leader initiator : " + holderIdentity); + if (podReady.getPodReadyTask().get() != null) { + // if the task is not running, this has no effect + // if the task is running, calling this will also make sure + // that the caching executor will shut down too. + podReady.getPodReadyTask().get().cancel(true); + } + + if (leaderFutureReference.get() != null) { + LOG.info(() -> "leader will be canceled : " + holderIdentity); + // needed to release the lock, fabric8 internally expects this one to be + // called + leaderFutureReference.get().cancel(true); + } + shutDownExecutor(); + } + + void destroyCalled() { + destroyCalled = true; + } + + void shutDownExecutor() { + executorService.get().shutdownNow(); + } + + private void startLeaderElection() { + try { + CompletableFuture leaderFuture = leaderElector(leaderElectionConfig, fabric8KubernetesClient).start(); + leaderFuture.whenCompleteAsync((ok, error) -> { + + if (ok != null) { + LOG.info(() -> "leaderFuture finished normally, will re-start it for : " + holderIdentity); + startLeaderElection(); + return; + } + + if (error instanceof CancellationException) { + if (!destroyCalled) { + LOG.warn(() -> "renewal failed for : " + holderIdentity + ", will re-start it after : " + + leaderElectionProperties.waitAfterRenewalFailure().toSeconds() + " seconds"); + try { + TimeUnit.SECONDS.sleep(leaderElectionProperties.waitAfterRenewalFailure().toSeconds()); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + startLeaderElection(); + } + } + }, executorService.get()); + leaderFutureReference.set(leaderFuture); + leaderFutureReference.get().get(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private LeaderElector leaderElector(LeaderElectionConfig config, KubernetesClient fabric8KubernetesClient) { + return fabric8KubernetesClient.leaderElector().withConfig(config).build(); + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-kubernetes-fabric8-leader/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index d612cac6dc..e50b5a10a3 100644 --- a/spring-cloud-kubernetes-fabric8-leader/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-kubernetes-fabric8-leader/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,4 @@ org.springframework.cloud.kubernetes.fabric8.leader.Fabric8LeaderAutoConfiguration +org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionCallbacksAutoConfiguration +org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionAutoConfiguration + diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/Fabric8LeaderApp.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/Fabric8LeaderApp.java new file mode 100644 index 0000000000..ee948718ce --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/Fabric8LeaderApp.java @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.dsl.internal.BaseOperation; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.Lock; +import org.mockito.Mockito; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +@Configuration +public class Fabric8LeaderApp { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + KubernetesClient kubernetesClient() { + KubernetesClient client = Mockito.mock(KubernetesClient.class); + Mockito.when(client.getNamespace()).thenReturn("a"); + + MixedOperation mixedOperation = Mockito.mock(MixedOperation.class); + Mockito.when(client.configMaps()).thenReturn(mixedOperation); + + PodResource podResource = Mockito.mock(PodResource.class); + Mockito.when(podResource.isReady()).thenReturn(true); + + Mockito.when(client.pods()).thenReturn(mixedOperation); + Mockito.when(mixedOperation.withName(Mockito.anyString())).thenReturn(podResource); + + Resource resource = Mockito.mock(Resource.class); + + BaseOperation baseOperation = Mockito.mock(BaseOperation.class); + Mockito.when(baseOperation.withName("leaders")).thenReturn(resource); + + Mockito.when(mixedOperation.inNamespace("a")).thenReturn(baseOperation); + return client; + } + + @Bean + @Primary + Lock lock() { + return Mockito.mock(Lock.class); + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderAutoConfigurationTests.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderAutoConfigurationTests.java new file mode 100644 index 0000000000..49227a3363 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderAutoConfigurationTests.java @@ -0,0 +1,117 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.kubernetes.fabric8.leader.Fabric8LeaderApp; +import org.springframework.cloud.kubernetes.fabric8.leader.Fabric8LeaderAutoConfiguration; +import org.springframework.cloud.kubernetes.fabric8.leader.Fabric8PodReadinessWatcher; + +/** + * tests that ensure 'spring.cloud.kubernetes.leader.election' enabled correct + * auto-configurations, when it is enabled/disabled. + * + * @author wind57 + */ +class Fabric8LeaderAutoConfigurationTests { + + /** + *
+	 *     - spring.cloud.kubernetes.leader.election is not present
+	 *
+	 *     As such:
+	 *
+	 *     - Fabric8LeaderAutoConfiguration must be picked up
+	 *     - Fabric8LeaderElectionAutoConfiguration must not be picked up
+	 * 
+ */ + @Test + void leaderElectionAnnotationMissing() { + new ApplicationContextRunner().withUserConfiguration(Fabric8LeaderApp.class) + .withConfiguration(AutoConfigurations.of(Fabric8LeaderAutoConfiguration.class, + Fabric8LeaderElectionAutoConfiguration.class, + Fabric8LeaderElectionCallbacksAutoConfiguration.class)) + .run(context -> { + + // this one comes from Fabric8LeaderElectionAutoConfiguration + Assertions.assertThat(context).doesNotHaveBean(Fabric8LeaderElectionInitiator.class); + + // this one comes from Fabric8LeaderAutoConfiguration + Assertions.assertThat(context).hasSingleBean(Fabric8PodReadinessWatcher.class); + }); + } + + /** + *
+	 *     - spring.cloud.kubernetes.leader.election = false
+	 *
+	 *     As such:
+	 *
+	 *     - Fabric8LeaderAutoConfiguration must be picked up
+	 *     - Fabric8LeaderElectionAutoConfiguration must not be picked up
+	 * 
+ */ + @Test + void leaderElectionAnnotationPresentEqualToFalse() { + new ApplicationContextRunner().withUserConfiguration(Fabric8LeaderApp.class) + .withConfiguration(AutoConfigurations.of(Fabric8LeaderAutoConfiguration.class, + Fabric8LeaderElectionAutoConfiguration.class, + Fabric8LeaderElectionCallbacksAutoConfiguration.class)) + .withPropertyValues("spring.cloud.kubernetes.leader.election.enabled=false") + .run(context -> { + + // this one comes from Fabric8LeaderElectionAutoConfiguration + Assertions.assertThat(context).doesNotHaveBean(Fabric8LeaderElectionInitiator.class); + + // this one comes from Fabric8LeaderAutoConfiguration + Assertions.assertThat(context).hasSingleBean(Fabric8PodReadinessWatcher.class); + }); + } + + /** + *
+	 *     - spring.cloud.kubernetes.leader.election = false
+	 *
+	 *     As such:
+	 *
+	 *     - Fabric8LeaderAutoConfiguration must not be picked up
+	 *     - Fabric8LeaderElectionAutoConfiguration must be picked up
+	 * 
+ */ + @Test + void leaderElectionAnnotationPresentEqualToTrue() { + new ApplicationContextRunner().withUserConfiguration(Fabric8LeaderApp.class) + .withConfiguration(AutoConfigurations.of(Fabric8LeaderAutoConfiguration.class, + Fabric8LeaderElectionAutoConfiguration.class, + Fabric8LeaderElectionCallbacksAutoConfiguration.class)) + .withPropertyValues("spring.cloud.kubernetes.leader.election.enabled=true", + "spring.main.cloud-platform=kubernetes") + .run(context -> { + + // this one comes from Fabric8LeaderElectionAutoConfiguration + Assertions.assertThat(context).hasSingleBean(Fabric8LeaderElectionInitiator.class); + + // this one comes from Fabric8LeaderAutoConfiguration + Assertions.assertThat(context).doesNotHaveBean(Fabric8PodReadinessWatcher.class); + }); + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionAutoConfigurationTests.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionAutoConfigurationTests.java new file mode 100644 index 0000000000..9ce9d48e9d --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionAutoConfigurationTests.java @@ -0,0 +1,118 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import io.fabric8.kubernetes.api.model.APIGroupList; +import io.fabric8.kubernetes.api.model.APIGroupListBuilder; +import io.fabric8.kubernetes.api.model.APIResourceBuilder; +import io.fabric8.kubernetes.api.model.APIResourceListBuilder; +import io.fabric8.kubernetes.api.model.GroupVersionForDiscoveryBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.Lock; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.kubernetes.commons.KubernetesCommonsAutoConfiguration; +import org.springframework.cloud.kubernetes.fabric8.Fabric8AutoConfiguration; +import org.springframework.context.annotation.Bean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author wind57 + */ +class Fabric8LeaderElectionAutoConfigurationTests { + + private ApplicationContextRunner applicationContextRunner; + + @Test + void allBeansPresent() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=true"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(Fabric8LeaderElectionInfoContributor.class); + assertThat(context).hasSingleBean(Fabric8LeaderElectionInitiator.class); + assertThat(context).hasSingleBean(LeaderElectionConfig.class); + assertThat(context).hasSingleBean(Lock.class); + assertThat(context).hasSingleBean(Fabric8LeaderElectionCallbacks.class); + }); + } + + @Test + void allBeansPresentWithoutHealthIndicator() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=true", + "management.health.leader.election.enabled=false"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionInfoContributor.class); + assertThat(context).hasSingleBean(Fabric8LeaderElectionInitiator.class); + assertThat(context).hasSingleBean(LeaderElectionConfig.class); + assertThat(context).hasSingleBean(Lock.class); + assertThat(context).hasSingleBean(Fabric8LeaderElectionCallbacks.class); + assertThat(context).hasBean("onStartLeadingCallback"); + assertThat(context).hasBean("onStopLeadingCallback"); + assertThat(context).hasBean("onNewLeaderCallback"); + }); + } + + @Test + void allBeansMissing() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=false"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionInfoContributor.class); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionInitiator.class); + assertThat(context).doesNotHaveBean(LeaderElectionConfig.class); + assertThat(context).doesNotHaveBean(Lock.class); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionCallbacks.class); + }); + } + + private void setup(String... properties) { + applicationContextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(Fabric8LeaderElectionCallbacksAutoConfiguration.class, + Fabric8AutoConfiguration.class, KubernetesCommonsAutoConfiguration.class, + Fabric8LeaderElectionAutoConfiguration.class)) + .withUserConfiguration(Configuration.class) + .withPropertyValues(properties); + } + + @TestConfiguration + static class Configuration { + + @Bean + KubernetesClient mockKubernetesClient() { + KubernetesClient client = Mockito.mock(KubernetesClient.class); + + Mockito.when(client.getApiResources("coordination.k8s.io/v1")) + .thenReturn( + new APIResourceListBuilder().withResources(new APIResourceBuilder().withKind("Lease").build()) + .build()); + + APIGroupList apiGroupList = new APIGroupListBuilder().addNewGroup() + .withVersions(new GroupVersionForDiscoveryBuilder().withGroupVersion("coordination.k8s.io/v1").build()) + .endGroup() + .build(); + + Mockito.when(client.getApiGroups()).thenReturn(apiGroupList); + return client; + } + + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacksAutoConfigurationTests.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacksAutoConfigurationTests.java new file mode 100644 index 0000000000..dbf744a441 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionCallbacksAutoConfigurationTests.java @@ -0,0 +1,68 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.kubernetes.commons.KubernetesCommonsAutoConfiguration; +import org.springframework.cloud.kubernetes.fabric8.Fabric8AutoConfiguration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author wind57 + */ +class Fabric8LeaderElectionCallbacksAutoConfigurationTests { + + private ApplicationContextRunner applicationContextRunner; + + @Test + void allBeansPresent() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=true"); + applicationContextRunner.run(context -> { + assertThat(context).hasBean("holderIdentity"); + assertThat(context).hasBean("podNamespace"); + assertThat(context).hasBean("onStartLeadingCallback"); + assertThat(context).hasBean("onStopLeadingCallback"); + assertThat(context).hasBean("onNewLeaderCallback"); + assertThat(context).hasSingleBean(Fabric8LeaderElectionCallbacks.class); + }); + } + + @Test + void allBeansMissing() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=false"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean("holderIdentity"); + assertThat(context).doesNotHaveBean("podNamespace"); + assertThat(context).doesNotHaveBean("onStartLeadingCallback"); + assertThat(context).doesNotHaveBean("onStopLeadingCallback"); + assertThat(context).doesNotHaveBean("onNewLeaderCallback"); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionCallbacks.class); + }); + } + + private void setup(String... properties) { + applicationContextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(Fabric8LeaderElectionCallbacksAutoConfiguration.class, + Fabric8AutoConfiguration.class, KubernetesCommonsAutoConfiguration.class)) + .withPropertyValues(properties); + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionConcurrentITTest.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionConcurrentITTest.java new file mode 100644 index 0000000000..057d4301d0 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionConcurrentITTest.java @@ -0,0 +1,237 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.time.Duration; +import java.util.function.Consumer; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.testcontainers.k3s.K3sContainer; + +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties; +import org.springframework.cloud.kubernetes.integration.tests.commons.Commons; + +/** + * @author wind57 + */ +@ExtendWith(OutputCaptureExtension.class) +class Fabric8LeaderElectionConcurrentITTest { + + private static final LeaderElectionProperties PROPERTIES = new LeaderElectionProperties(false, false, + Duration.ofSeconds(15), "default", "lease-lock", Duration.ofSeconds(5), Duration.ofSeconds(2), + Duration.ofSeconds(5), false); + + private static K3sContainer container; + + private static final String HOLDER_IDENTITY_ONE = "one"; + + private static final String HOLDER_IDENTITY_TWO = "two"; + + @BeforeAll + static void beforeAll() { + container = Commons.container(); + container.start(); + } + + @AfterAll + static void afterAll() { + container.stop(); + } + + @Test + void test(CapturedOutput output) { + + String kubeConfigYaml = container.getKubeConfigYaml(); + Config config = Config.fromKubeconfig(kubeConfigYaml); + KubernetesClient kubernetesClient = new KubernetesClientBuilder().withConfig(config).build(); + + LeaderElectionConfig leaderElectionConfigOne = leaderElectionConfig(HOLDER_IDENTITY_ONE); + Fabric8LeaderElectionInitiator one = new Fabric8LeaderElectionInitiator(HOLDER_IDENTITY_ONE, "default", + kubernetesClient, leaderElectionConfigOne, PROPERTIES); + one = Mockito.spy(one); + + LeaderElectionConfig leaderElectionConfigTwo = leaderElectionConfig(HOLDER_IDENTITY_TWO); + Fabric8LeaderElectionInitiator two = new Fabric8LeaderElectionInitiator(HOLDER_IDENTITY_TWO, "default", + kubernetesClient, leaderElectionConfigTwo, PROPERTIES); + two = Mockito.spy(two); + + one.postConstruct(); + two.postConstruct(); + + // both try to acquire the lock + awaitForMessage(output, "Attempting to acquire leader lease 'LeaseLock: default - lease-lock (two)'..."); + awaitForMessage(output, "Attempting to acquire leader lease 'LeaseLock: default - lease-lock (one)'..."); + awaitForMessage(output, "Leader changed from null to "); + + LeaderAndFollower leaderAndFollower = leaderAndFollower(leaderElectionConfigOne, kubernetesClient); + String leader = leaderAndFollower.leader(); + String follower = leaderAndFollower.follower(); + + awaitForMessage(output, "Leader changed from null to " + leader); + awaitForMessage(output, "id : " + leader + " is the new leader"); + awaitForMessage(output, + "Successfully Acquired leader lease 'LeaseLock: " + "default - lease-lock (" + leader + ")'"); + + // renewal happens for the current leader + awaitForMessage(output, + "Attempting to renew leader lease 'LeaseLock: " + "default - lease-lock (" + leader + ")'..."); + awaitForMessage(output, "Acquired lease 'LeaseLock: default - lease-lock (" + leader + ")'"); + + // the other elector says it can't acquire the lock + awaitForMessage(output, "Lock is held by " + leader + " and has not yet expired"); + awaitForMessage(output, + "Failed to acquire lease 'LeaseLock: " + "default - lease-lock (" + follower + ")' retrying..."); + + int beforeRelease = output.getOut().length(); + failLeaderRenewal(leader, one, two); + + /* + * we simulated above that renewal failed and leader future was canceled. In this + * case, the 'notLeader' picks up the leadership, the 'leader' is now a + * "follower", it re-tries to take leadership. + */ + awaitForMessageFromPosition(output, beforeRelease, "id : " + follower + " is the new leader"); + awaitForMessageFromPosition(output, beforeRelease, + "Attempting to renew leader lease 'LeaseLock: " + "default - lease-lock (" + follower + ")'..."); + awaitForMessageFromPosition(output, beforeRelease, + "Acquired lease 'LeaseLock: default - lease-lock (" + follower + ")'"); + + // proves that the canceled leader tries to acquire again the leadership + awaitForMessageFromPosition(output, beforeRelease, + "Attempting to acquire leader lease 'LeaseLock: default - lease-lock (" + leader + ")'..."); + awaitForMessageFromPosition(output, beforeRelease, "Lock is held by " + follower + " and has not yet expired"); + + /* + * we simulate the renewal failure one more time. we know that leader = 'follower' + */ + beforeRelease = output.getOut().length(); + failLeaderRenewal(follower, one, two); + + awaitForMessageFromPosition(output, beforeRelease, "id : " + leader + " is the new leader"); + awaitForMessageFromPosition(output, beforeRelease, + "Attempting to renew leader lease 'LeaseLock: " + "default - lease-lock (" + leader + ")'..."); + awaitForMessageFromPosition(output, beforeRelease, + "Acquired lease 'LeaseLock: default - lease-lock (" + leader + ")'"); + + // proves that the canceled leader tries to acquire again the leadership + awaitForMessageFromPosition(output, beforeRelease, + "Attempting to acquire leader lease 'LeaseLock: default - lease-lock (" + follower + ")'..."); + awaitForMessageFromPosition(output, beforeRelease, "Lock is held by " + leader + " and has not yet expired"); + + } + + /** + *
+	 * 		simulate that renewal failed, we do this by:
+	 * 			- calling preDestroy, thus calling future::cancel
+	 * 		      (same as fabric8 internals will do)
+	 * 		    - do not shutdown the executor
+	 * 
+ */ + private void assumeRenewalFailed(Fabric8LeaderElectionInitiator initiator) { + Mockito.doNothing().when(initiator).destroyCalled(); + Mockito.doNothing().when(initiator).shutDownExecutor(); + } + + private LeaderElectionConfig leaderElectionConfig(String holderIdentity) { + + LeaseLock lock = leaseLock(holderIdentity); + Fabric8LeaderElectionCallbacks callbacks = callbacks(holderIdentity); + + return new LeaderElectionConfigBuilder().withReleaseOnCancel() + .withName("leader-election-config") + .withLeaseDuration(PROPERTIES.leaseDuration()) + .withLock(lock) + .withRenewDeadline(PROPERTIES.renewDeadline()) + .withRetryPeriod(PROPERTIES.retryPeriod()) + .withLeaderCallbacks(callbacks) + .build(); + } + + private LeaseLock leaseLock(String holderIdentity) { + return new LeaseLock("default", "lease-lock", holderIdentity); + } + + private Fabric8LeaderElectionCallbacks callbacks(String holderIdentity) { + Fabric8LeaderElectionCallbacksAutoConfiguration configuration = new Fabric8LeaderElectionCallbacksAutoConfiguration(); + + Runnable onStartLeadingCallback = configuration.onStartLeadingCallback(null, holderIdentity, PROPERTIES); + Runnable onStopLeadingCallback = configuration.onStopLeadingCallback(null, holderIdentity, PROPERTIES); + Consumer onNewLeaderCallback = configuration.onNewLeaderCallback(null, PROPERTIES); + + return new Fabric8LeaderElectionCallbacks(onStartLeadingCallback, onStopLeadingCallback, onNewLeaderCallback); + } + + private void awaitForMessage(CapturedOutput output, String message) { + Awaitility.await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofSeconds(10)) + .until(() -> output.getOut().contains(message)); + } + + private void awaitForMessageFromPosition(CapturedOutput output, int from, String message) { + Awaitility.await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofSeconds(10)) + .until(() -> output.getOut().substring(from).contains(message)); + } + + private LeaderAndFollower leaderAndFollower(LeaderElectionConfig leaderElectionConfig, + KubernetesClient kubernetesClient) { + boolean oneIsLeader = leaderElectionConfig.getLock() + .get(kubernetesClient) + .getHolderIdentity() + .equals(HOLDER_IDENTITY_ONE); + + if (oneIsLeader) { + return new LeaderAndFollower("one", "two"); + } + else { + return new LeaderAndFollower("two", "one"); + } + } + + private void failLeaderRenewal(String currentLeader, Fabric8LeaderElectionInitiator one, + Fabric8LeaderElectionInitiator two) { + if (currentLeader.equals("one")) { + assumeRenewalFailed(one); + one.preDestroy(); + } + else { + assumeRenewalFailed(two); + two.preDestroy(); + } + } + + private record LeaderAndFollower(String leader, String follower) { + + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributorIsLeaderTest.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributorIsLeaderTest.java new file mode 100644 index 0000000000..00a965b38a --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributorIsLeaderTest.java @@ -0,0 +1,148 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.time.ZonedDateTime; + +import io.fabric8.kubernetes.api.model.APIGroupList; +import io.fabric8.kubernetes.api.model.APIGroupListBuilder; +import io.fabric8.kubernetes.api.model.APIResourceBuilder; +import io.fabric8.kubernetes.api.model.APIResourceListBuilder; +import io.fabric8.kubernetes.api.model.GroupVersionForDiscoveryBuilder; +import io.fabric8.kubernetes.api.model.coordination.v1.Lease; +import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder; +import io.fabric8.kubernetes.api.model.coordination.v1.LeaseSpecBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.server.test.LocalManagementPort; +import org.springframework.cloud.kubernetes.commons.leader.LeaderUtils; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.http.MediaType; +import org.springframework.test.web.reactive.server.WebTestClient; + +/** + * @author wind57 + */ +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { "spring.main.cloud-platform=KUBERNETES", "management.endpoints.web.exposure.include=info", + "management.endpoint.info.show-details=always", "management.info.kubernetes.enabled=true", + "spring.cloud.kubernetes.leader.election.enabled=true" }) +class Fabric8LeaderElectionInfoContributorIsLeaderTest { + + private static final String HOLDER_IDENTITY = "leader"; + + @LocalManagementPort + private int port; + + @Autowired + private WebTestClient webClient; + + private static MockedStatic leaderUtilsMockedStatic; + + @BeforeAll + static void beforeAll() { + leaderUtilsMockedStatic = Mockito.mockStatic(LeaderUtils.class); + leaderUtilsMockedStatic.when(LeaderUtils::hostName).thenReturn(HOLDER_IDENTITY); + } + + @AfterAll + static void afterAll() { + leaderUtilsMockedStatic.close(); + } + + @Test + void infoEndpointIsLeaderTest() { + webClient.get() + .uri("http://localhost:{port}/actuator/info", port) + .accept(MediaType.APPLICATION_JSON) + .exchange() + .expectStatus() + .isOk() + .expectBody() + .jsonPath("leaderElection.isLeader") + .isEqualTo(true) + .jsonPath("leaderElection.leaderId") + .isEqualTo(HOLDER_IDENTITY); + } + + @TestConfiguration + static class Configuration { + + @Bean + @Primary + KubernetesClient mockKubernetesClient() { + KubernetesClient client = Mockito.mock(KubernetesClient.class); + mockForLeaseSupport(client); + mockForLeaderSupport(client); + return client; + } + + private void mockForLeaseSupport(KubernetesClient client) { + Mockito.when(client.getApiResources("coordination.k8s.io/v1")) + .thenReturn( + new APIResourceListBuilder().withResources(new APIResourceBuilder().withKind("Lease").build()) + .build()); + + APIGroupList apiGroupList = new APIGroupListBuilder().addNewGroup() + .withVersions(new GroupVersionForDiscoveryBuilder().withGroupVersion("coordination.k8s.io/v1").build()) + .endGroup() + .build(); + + Mockito.when(client.getApiGroups()).thenReturn(apiGroupList); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void mockForLeaderSupport(KubernetesClient client) { + + Lease lease = new LeaseBuilder().withNewMetadata() + .withName("spring-k8s-leader-election-lock") + .endMetadata() + .withSpec(new LeaseSpecBuilder().withHolderIdentity(HOLDER_IDENTITY) + .withLeaseDurationSeconds(1) + .withAcquireTime(ZonedDateTime.now()) + .withRenewTime(ZonedDateTime.now()) + .withLeaseTransitions(1) + .build()) + .build(); + + MixedOperation mixedOperation = Mockito.mock(MixedOperation.class); + Mockito.when(client.resources(Lease.class)).thenReturn(mixedOperation); + + Resource resource = Mockito.mock(Resource.class); + Mockito.when(resource.get()).thenReturn(lease); + + NonNamespaceOperation nonNamespaceOperation = Mockito.mock(NonNamespaceOperation.class); + Mockito.when(mixedOperation.inNamespace("default")).thenReturn(nonNamespaceOperation); + Mockito.when(nonNamespaceOperation.withName("spring-k8s-leader-election-lock")).thenReturn(resource); + + } + + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributorIsNotLeaderTest.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributorIsNotLeaderTest.java new file mode 100644 index 0000000000..a0c749515b --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInfoContributorIsNotLeaderTest.java @@ -0,0 +1,148 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.time.ZonedDateTime; + +import io.fabric8.kubernetes.api.model.APIGroupList; +import io.fabric8.kubernetes.api.model.APIGroupListBuilder; +import io.fabric8.kubernetes.api.model.APIResourceBuilder; +import io.fabric8.kubernetes.api.model.APIResourceListBuilder; +import io.fabric8.kubernetes.api.model.GroupVersionForDiscoveryBuilder; +import io.fabric8.kubernetes.api.model.coordination.v1.Lease; +import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder; +import io.fabric8.kubernetes.api.model.coordination.v1.LeaseSpecBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.server.test.LocalManagementPort; +import org.springframework.cloud.kubernetes.commons.leader.LeaderUtils; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.http.MediaType; +import org.springframework.test.web.reactive.server.WebTestClient; + +/** + * @author wind57 + */ +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { "spring.main.cloud-platform=KUBERNETES", "management.endpoints.web.exposure.include=info", + "management.endpoint.info.show-details=always", "management.info.kubernetes.enabled=true", + "spring.cloud.kubernetes.leader.election.enabled=true" }) +class Fabric8LeaderElectionInfoContributorIsNotLeaderTest { + + private static final String HOLDER_IDENTITY = "leader"; + + @LocalManagementPort + private int port; + + @Autowired + private WebTestClient webClient; + + private static MockedStatic leaderUtilsMockedStatic; + + @BeforeAll + static void beforeAll() { + leaderUtilsMockedStatic = Mockito.mockStatic(LeaderUtils.class); + leaderUtilsMockedStatic.when(LeaderUtils::hostName).thenReturn("non-" + HOLDER_IDENTITY); + } + + @AfterAll + static void afterAll() { + leaderUtilsMockedStatic.close(); + } + + @Test + void infoEndpointIsNotLeaderTest() { + webClient.get() + .uri("http://localhost:{port}/actuator/info", port) + .accept(MediaType.APPLICATION_JSON) + .exchange() + .expectStatus() + .isOk() + .expectBody() + .jsonPath("leaderElection.isLeader") + .isEqualTo(false) + .jsonPath("leaderElection.leaderId") + .isEqualTo("non-" + HOLDER_IDENTITY); + } + + @TestConfiguration + static class Configuration { + + @Bean + @Primary + KubernetesClient mockKubernetesClient() { + KubernetesClient client = Mockito.mock(KubernetesClient.class); + mockForLeaseSupport(client); + mockForLeaderSupport(client); + return client; + } + + private void mockForLeaseSupport(KubernetesClient client) { + Mockito.when(client.getApiResources("coordination.k8s.io/v1")) + .thenReturn( + new APIResourceListBuilder().withResources(new APIResourceBuilder().withKind("Lease").build()) + .build()); + + APIGroupList apiGroupList = new APIGroupListBuilder().addNewGroup() + .withVersions(new GroupVersionForDiscoveryBuilder().withGroupVersion("coordination.k8s.io/v1").build()) + .endGroup() + .build(); + + Mockito.when(client.getApiGroups()).thenReturn(apiGroupList); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void mockForLeaderSupport(KubernetesClient client) { + + Lease lease = new LeaseBuilder().withNewMetadata() + .withName("spring-k8s-leader-election-lock") + .endMetadata() + .withSpec(new LeaseSpecBuilder().withHolderIdentity(HOLDER_IDENTITY) + .withLeaseDurationSeconds(1) + .withAcquireTime(ZonedDateTime.now()) + .withRenewTime(ZonedDateTime.now()) + .withLeaseTransitions(1) + .build()) + .build(); + + MixedOperation mixedOperation = Mockito.mock(MixedOperation.class); + Mockito.when(client.resources(Lease.class)).thenReturn(mixedOperation); + + Resource resource = Mockito.mock(Resource.class); + Mockito.when(resource.get()).thenReturn(lease); + + NonNamespaceOperation nonNamespaceOperation = Mockito.mock(NonNamespaceOperation.class); + Mockito.when(mixedOperation.inNamespace("default")).thenReturn(nonNamespaceOperation); + Mockito.when(nonNamespaceOperation.withName("spring-k8s-leader-election-lock")).thenReturn(resource); + + } + + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionSimpleITTest.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionSimpleITTest.java new file mode 100644 index 0000000000..04e2984d84 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionSimpleITTest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import java.time.Duration; +import java.time.ZonedDateTime; + +import io.fabric8.kubernetes.api.model.coordination.v1.Lease; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.k3s.K3sContainer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.cloud.kubernetes.integration.tests.commons.Commons; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +/** + * @author wind57 + */ +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { "spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=true", + "spring.cloud.kubernetes.leader.election.wait-for-pod-ready=false" }) +@ExtendWith(OutputCaptureExtension.class) +class Fabric8LeaderElectionSimpleITTest { + + private static K3sContainer container; + + @Autowired + private KubernetesClient kubernetesClient; + + @BeforeAll + static void beforeAll() { + container = Commons.container(); + container.start(); + } + + @AfterAll + static void afterAll() { + container.stop(); + } + + @Test + void test(CapturedOutput output) { + + // wait for a renewal + Awaitility.await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofMinutes(1)) + .until(() -> output.getOut().contains("Attempting to renew leader lease")); + + // all these logs happen before a renewal + Assertions.assertThat(output.getOut()).contains("will use lease as the lock for leader election"); + Assertions.assertThat(output.getOut()).contains("starting leader initiator"); + Assertions.assertThat(output.getOut()).contains("Leader election started"); + Assertions.assertThat(output.getOut()).contains("Successfully Acquired leader lease"); + + Lease lockLease = kubernetesClient.leases() + .inNamespace("default") + .withName("spring-k8s-leader-election-lock") + .get(); + ZonedDateTime currentAcquiredTime = lockLease.getSpec().getAcquireTime(); + Assertions.assertThat(currentAcquiredTime).isNotNull(); + Assertions.assertThat(lockLease.getSpec().getLeaseDurationSeconds()).isEqualTo(15); + Assertions.assertThat(lockLease.getSpec().getLeaseTransitions()).isEqualTo(0); + + ZonedDateTime currentRenewalTime = lockLease.getSpec().getRenewTime(); + Assertions.assertThat(currentRenewalTime).isNotNull(); + + // renew happened, we renew by default on every two seconds + Awaitility.await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(4)) + .until(() -> !(currentRenewalTime.equals(kubernetesClient.leases() + .inNamespace("default") + .withName("spring-k8s-leader-election-lock") + .get() + .getSpec() + .getRenewTime()))); + } + + @TestConfiguration + static class LocalConfiguration { + + @Bean + @Primary + KubernetesClient client() { + String kubeConfigYaml = container.getKubeConfigYaml(); + Config config = Config.fromKubeconfig(kubeConfigYaml); + return new KubernetesClientBuilder().withConfig(config).build(); + } + + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderOldAndNewImplementationTests.java b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderOldAndNewImplementationTests.java new file mode 100644 index 0000000000..da11f40ba9 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderOldAndNewImplementationTests.java @@ -0,0 +1,244 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.leader.election; + +import io.fabric8.kubernetes.api.model.APIGroupList; +import io.fabric8.kubernetes.api.model.APIGroupListBuilder; +import io.fabric8.kubernetes.api.model.APIResourceBuilder; +import io.fabric8.kubernetes.api.model.APIResourceListBuilder; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.GroupVersionForDiscoveryBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.dsl.Resource; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.kubernetes.commons.KubernetesCommonsAutoConfiguration; +import org.springframework.cloud.kubernetes.fabric8.Fabric8AutoConfiguration; +import org.springframework.cloud.kubernetes.fabric8.leader.Fabric8LeaderAutoConfiguration; +import org.springframework.context.annotation.Bean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that prove that previous and new leader implementation works based on the flags + * we set. + * + * @author wind57 + */ +class Fabric8LeaderOldAndNewImplementationTests { + + private ApplicationContextRunner applicationContextRunner; + + /** + *
+	 *     - 'spring.cloud.kubernetes.leader.enabled'           is not set
+	 *     - 'spring.cloud.kubernetes.leader.election.enabled'  is not set
+	 *
+	 *     As such :
+	 *
+	 *     - 'Fabric8LeaderAutoConfiguration'                   is active
+	 *     - 'Fabric8LeaderElectionAutoConfiguration'           is not active
+	 * 
+ */ + @Test + void noFlagsSet() { + setup("spring.main.cloud-platform=KUBERNETES"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(Fabric8LeaderAutoConfiguration.class); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionAutoConfiguration.class); + }); + } + + /** + *
+	 *     - 'spring.cloud.kubernetes.leader.enabled'          =  true
+	 *     - 'spring.cloud.kubernetes.leader.election.enabled'    is not set
+	 *
+	 *     As such :
+	 *
+	 *     - 'Fabric8LeaderAutoConfiguration'                   is active
+	 *     - 'Fabric8LeaderElectionAutoConfiguration'           is not active
+	 * 
+ */ + @Test + void oldImplementationEnabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.enabled=true"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(Fabric8LeaderAutoConfiguration.class); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionAutoConfiguration.class); + }); + } + + /** + *
+	 *     - 'spring.cloud.kubernetes.leader.enabled'          = false
+	 *     - 'spring.cloud.kubernetes.leader.election.enabled'   is not set
+	 *
+	 *     As such :
+	 *
+	 *     - 'Fabric8LeaderAutoConfiguration'                   is not active
+	 *     - 'Fabric8LeaderElectionAutoConfiguration'           is not active
+	 * 
+ */ + @Test + void oldImplementationDisabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.enabled=false"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(Fabric8LeaderAutoConfiguration.class); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionAutoConfiguration.class); + }); + } + + /** + *
+	 *     - 'spring.cloud.kubernetes.leader.enabled'            is not set
+	 *     - 'spring.cloud.kubernetes.leader.election.enabled' = false
+	 *
+	 *     As such :
+	 *
+	 *     - 'Fabric8LeaderAutoConfiguration'                   is active
+	 *     - 'Fabric8LeaderElectionAutoConfiguration'           is not active
+	 * 
+ */ + @Test + void newImplementationDisabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=false"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(Fabric8LeaderAutoConfiguration.class); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionAutoConfiguration.class); + }); + } + + /** + *
+	 *     - 'spring.cloud.kubernetes.leader.enabled'            is not set
+	 *     - 'spring.cloud.kubernetes.leader.election.enabled' = true
+	 *
+	 *     As such :
+	 *
+	 *     - 'Fabric8LeaderAutoConfiguration'                   is not active
+	 *     - 'Fabric8LeaderElectionAutoConfiguration'           is active
+	 * 
+ */ + @Test + void newImplementationEnabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=true"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(Fabric8LeaderAutoConfiguration.class); + assertThat(context).hasSingleBean(Fabric8LeaderElectionAutoConfiguration.class); + }); + } + + /** + *
+	 *     - 'spring.cloud.kubernetes.leader.enabled'          = false
+	 *     - 'spring.cloud.kubernetes.leader.election.enabled' = false
+	 *
+	 *     As such :
+	 *
+	 *     - 'Fabric8LeaderAutoConfiguration'                   is not active
+	 *     - 'Fabric8LeaderElectionAutoConfiguration'           is not active
+	 * 
+ */ + @Test + void bothDisabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.enabled=false", + "spring.cloud.kubernetes.leader.election.enabled=false"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(Fabric8LeaderAutoConfiguration.class); + assertThat(context).doesNotHaveBean(Fabric8LeaderElectionAutoConfiguration.class); + }); + } + + /** + *
+	 *     - 'spring.cloud.kubernetes.leader.enabled'          = true
+	 *     - 'spring.cloud.kubernetes.leader.election.enabled' = true
+	 *
+	 *     As such :
+	 *
+	 *     - 'Fabric8LeaderAutoConfiguration'                   is not active
+	 *     - 'Fabric8LeaderElectionAutoConfiguration'           is active
+	 *
+	 *     You can't enable both of them, only the new one will work.
+	 * 
+ */ + @Test + void bothEnabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.enabled=true", + "spring.cloud.kubernetes.leader.election.enabled=true"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(Fabric8LeaderAutoConfiguration.class); + assertThat(context).hasSingleBean(Fabric8LeaderElectionAutoConfiguration.class); + }); + } + + private void setup(String... properties) { + applicationContextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(Fabric8LeaderElectionCallbacksAutoConfiguration.class, + Fabric8AutoConfiguration.class, KubernetesCommonsAutoConfiguration.class, + Fabric8LeaderElectionAutoConfiguration.class, Fabric8LeaderAutoConfiguration.class)) + .withUserConfiguration(Fabric8LeaderOldAndNewImplementationTests.Configuration.class) + .withPropertyValues(properties); + } + + @TestConfiguration + static class Configuration { + + @Bean + @SuppressWarnings({ "rawtypes", "unchecked" }) + KubernetesClient mockKubernetesClient() { + KubernetesClient client = Mockito.mock(KubernetesClient.class); + + Mockito.when(client.getNamespace()).thenReturn("namespace"); + + MixedOperation mixedOperation = Mockito.mock(MixedOperation.class); + NonNamespaceOperation nonNamespaceOperation = Mockito.mock(NonNamespaceOperation.class); + Mockito.when(client.configMaps()).thenReturn(mixedOperation); + + Mockito.when(mixedOperation.inNamespace(Mockito.anyString())).thenReturn(nonNamespaceOperation); + Resource configMapResource = Mockito.mock(Resource.class); + Mockito.when(nonNamespaceOperation.withName(Mockito.anyString())).thenReturn(configMapResource); + + Mockito.when(client.pods()).thenReturn(mixedOperation); + PodResource podResource = Mockito.mock(PodResource.class); + Mockito.when(mixedOperation.withName(Mockito.anyString())).thenReturn(podResource); + + Mockito.when(client.getApiResources("coordination.k8s.io/v1")) + .thenReturn( + new APIResourceListBuilder().withResources(new APIResourceBuilder().withKind("Lease").build()) + .build()); + + APIGroupList apiGroupList = new APIGroupListBuilder().addNewGroup() + .withVersions(new GroupVersionForDiscoveryBuilder().withGroupVersion("coordination.k8s.io/v1").build()) + .endGroup() + .build(); + + Mockito.when(client.getApiGroups()).thenReturn(apiGroupList); + return client; + } + + } + +} diff --git a/spring-cloud-kubernetes-fabric8-leader/src/test/resources/logback-test.xml b/spring-cloud-kubernetes-fabric8-leader/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..55654605fa --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-leader/src/test/resources/logback-test.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/spring-cloud-kubernetes-integration-tests/pom.xml b/spring-cloud-kubernetes-integration-tests/pom.xml index 0569480404..66e3f0cdad 100644 --- a/spring-cloud-kubernetes-integration-tests/pom.xml +++ b/spring-cloud-kubernetes-integration-tests/pom.xml @@ -108,5 +108,8 @@ spring-cloud-kubernetes-k8s-client-rabbitmq-secret-reload - + + spring-cloud-kubernetes-fabric8-client-leader-election + + diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/pom.xml b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/pom.xml new file mode 100644 index 0000000000..ce090e2d12 --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + org.springframework.cloud + spring-cloud-kubernetes-fabric8-client-leader-election + 5.0.0-SNAPSHOT + + + fabric8-leader-app + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.cloud + spring-cloud-kubernetes-fabric8-leader + + + org.springframework.boot + spring-boot-starter-actuator + + + + diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/src/main/java/org/springframework/cloud/kubernetes/fabric8/client/leader/election/App.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/src/main/java/org/springframework/cloud/kubernetes/fabric8/client/leader/election/App.java new file mode 100644 index 0000000000..aa264812f1 --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/src/main/java/org/springframework/cloud/kubernetes/fabric8/client/leader/election/App.java @@ -0,0 +1,32 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.client.leader.election; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author wind57 + */ +@SpringBootApplication +public class App { + + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } + +} diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/src/main/resources/application.yaml b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/src/main/resources/application.yaml new file mode 100644 index 0000000000..45f8b516f6 --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-app/src/main/resources/application.yaml @@ -0,0 +1,30 @@ +logging: + level: + org: + springframework: + cloud: + kubernetes: + fabric8: + leader: DEBUG + +spring: + application: + name: fabric8-leader-app-a + cloud: + kubernetes: + leader: + election: + enabled: true +management: + endpoint: + info: + access: unrestricted + health: + probes: + enabled: true + endpoints: + web: + exposure: + include: health, info +server: + port: 8080 diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/pom.xml b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/pom.xml new file mode 100644 index 0000000000..b9eb557636 --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/pom.xml @@ -0,0 +1,44 @@ + + + 4.0.0 + + org.springframework.cloud + spring-cloud-kubernetes-fabric8-client-leader-election + 5.0.0-SNAPSHOT + + + fabric8-leader-test + + + true + true + + + + + org.springframework.boot + spring-boot-starter-test + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework.cloud + spring-cloud-kubernetes-test-support + + + + + + + + ../../src/main/resources + true + + + + + diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/leader/election/Fabric8LeaderElectionTest.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/leader/election/Fabric8LeaderElectionTest.java new file mode 100644 index 0000000000..e2184d5f6b --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/leader/election/Fabric8LeaderElectionTest.java @@ -0,0 +1,235 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.client.leader.election; + +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.utils.Serialization; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.k3s.K3sContainer; + +import org.springframework.boot.test.json.BasicJsonTester; +import org.springframework.cloud.kubernetes.integration.tests.commons.Commons; +import org.springframework.cloud.kubernetes.integration.tests.commons.Phase; +import org.springframework.cloud.kubernetes.integration.tests.commons.fabric8_client.Util; +import org.springframework.http.HttpMethod; +import org.springframework.web.reactive.function.client.WebClient; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.cloud.kubernetes.integration.tests.commons.Commons.builder; +import static org.springframework.cloud.kubernetes.integration.tests.commons.Commons.retrySpec; + +/** + * @author wind57 + */ +class Fabric8LeaderElectionTest { + + private static final String NAMESPACE = "default"; + + private static final String FABRIC8_LEADER_APP_A = "fabric8-leader-app"; + + private static final BasicJsonTester BASIC_JSON_TESTER = new BasicJsonTester(Fabric8LeaderElectionTest.class); + + private static final K3sContainer K3S = Commons.container(); + + private static Util util; + + @BeforeAll + static void beforeAll() throws Exception { + K3S.start(); + util = new Util(K3S); + + Commons.validateImage(FABRIC8_LEADER_APP_A, K3S); + Commons.loadSpringCloudKubernetesImage(FABRIC8_LEADER_APP_A, K3S); + + util.setUp(NAMESPACE); + } + + @BeforeEach + void beforeEach() { + appA(Phase.CREATE); + appB(Phase.CREATE); + } + + @AfterEach + void afterEach() { + // appA is deleted within the test itself + appB(Phase.DELETE); + } + + @Test + void test() throws Exception { + WebClient clientA = builder().baseUrl("http://localhost:32321/actuator/info").build(); + WebClient clientB = builder().baseUrl("http://localhost:32322/actuator/info").build(); + + String podAInfoResult = clientA.method(HttpMethod.GET) + .retrieve() + .bodyToMono(String.class) + .retryWhen(retrySpec()) + .block(); + + String podBInfoResult = clientB.method(HttpMethod.GET) + .retrieve() + .bodyToMono(String.class) + .retryWhen(retrySpec()) + .block(); + + // we start podA first, so it will be the leader + assertThat(BASIC_JSON_TESTER.from(podAInfoResult)).extractingJsonPathBooleanValue("$.leaderElection.isLeader") + .isEqualTo(true); + + String leaderFromLease = K3S + .execInContainer("sh", "-c", + "kubectl get lease spring-k8s-leader-election-lock -o=jsonpath='{.spec.holderIdentity}'") + .getStdout(); + assertThat(leaderFromLease).contains("fabric8-leader-app-a"); + + // podB is a follower + assertThat(BASIC_JSON_TESTER.from(podBInfoResult)).extractingJsonPathBooleanValue("$.leaderElection.isLeader") + .isEqualTo(false); + + String renewTime = K3S + .execInContainer("sh", "-c", + "kubectl get lease spring-k8s-leader-election-lock -o=jsonpath='{.spec.renewTime}'") + .getStdout(); + + // lease has renew time changed + Awaitility.await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + String localRenewTime = K3S + .execInContainer("sh", "-c", + "kubectl get lease spring-k8s-leader-election-lock -o=jsonpath='{.spec.renewTime}'") + .getStdout(); + assertThat(renewTime).isNotEqualTo(localRenewTime); + }); + + // delete appA and so, appB can become the leader + appA(Phase.DELETE); + + // lease now shows that appB is the leader + Awaitility.await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + String newLeader = K3S + .execInContainer("sh", "-c", + "kubectl get lease spring-k8s-leader-election-lock -o=jsonpath='{.spec.holderIdentity}'") + .getStdout(); + assertThat(newLeader).contains("fabric8-leader-app-b"); + }); + + podBInfoResult = clientB.method(HttpMethod.GET) + .retrieve() + .bodyToMono(String.class) + .retryWhen(retrySpec()) + .block(); + + // info actuator confirms that podB is the leader + assertThat(BASIC_JSON_TESTER.from(podBInfoResult)).extractingJsonPathBooleanValue("$.leaderElection.isLeader") + .isEqualTo(true); + + } + + private void appA(Phase phase) { + InputStream deploymentStream = util.inputStream("app/deployment.yaml"); + Deployment deployment = Serialization.unmarshal(deploymentStream, Deployment.class); + + InputStream serviceStream = util.inputStream("app/service-a.yaml"); + Service service = Serialization.unmarshal(serviceStream, Service.class); + + // 1. change name + deployment.getMetadata().setName("fabric8-leader-app-a"); + + // 2. change spec.selector.matchLabels + deployment.getSpec().getSelector().setMatchLabels(Map.of("app", "app-a")); + + // 3. change spec.template.metadata.labels + deployment.getSpec().getTemplate().getMetadata().setLabels(Map.of("app", "app-a")); + + // 4. change env variables + List env = new ArrayList<>(); + env.add(new EnvVarBuilder().withName("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_CLOUD_KUBERNETES_FABRIC8_LEADER") + .withValue("DEBUG") + .build()); + env.add(new EnvVarBuilder().withName("SPRING_CLOUD_KUBERNETES_LEADER_ELECTION_ENABLED") + .withValue("TRUE") + .build()); + env.add(new EnvVarBuilder().withName("MANAGEMENT_ENDPOINT_HEALTH_PROBES_ENABLED").withValue("TRUE").build()); + + env.add(new EnvVarBuilder().withName("SPRING_APPLICATION_NAME").withValue("fabric8-leader-app-a").build()); + + env.add(new EnvVarBuilder().withName("MANAGEMENT_HEALTH_LEADER_ELECTION_ENABLED").withValue("TRUE").build()); + + deployment.getSpec().getTemplate().getSpec().getContainers().get(0).setEnv(env); + + if (phase.equals(Phase.CREATE)) { + util.createAndWait(NAMESPACE, null, deployment, service, true); + } + else if (phase.equals(Phase.DELETE)) { + util.deleteAndWait(NAMESPACE, deployment, service); + } + } + + private void appB(Phase phase) { + InputStream deploymentStream = util.inputStream("app/deployment.yaml"); + Deployment deployment = Serialization.unmarshal(deploymentStream, Deployment.class); + + InputStream serviceStream = util.inputStream("app/service-b.yaml"); + Service service = Serialization.unmarshal(serviceStream, Service.class); + + // 1. change name + deployment.getMetadata().setName("fabric8-leader-app-b"); + + // 2. change spec.selector.matchLabels + deployment.getSpec().getSelector().setMatchLabels(Map.of("app", "app-b")); + + // 3. change spec.template.metadata.labels + deployment.getSpec().getTemplate().getMetadata().setLabels(Map.of("app", "app-b")); + + // 4. change env variables + List env = new ArrayList<>(); + env.add(new EnvVarBuilder().withName("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_CLOUD_KUBERNETES_FABRIC8_LEADER") + .withValue("DEBUG") + .build()); + env.add(new EnvVarBuilder().withName("SPRING_CLOUD_KUBERNETES_LEADER_ELECTION_ENABLED") + .withValue("TRUE") + .build()); + env.add(new EnvVarBuilder().withName("MANAGEMENT_ENDPOINT_HEALTH_PROBES_ENABLED").withValue("TRUE").build()); + + env.add(new EnvVarBuilder().withName("SPRING_APPLICATION_NAME").withValue("fabric8-leader-app-b").build()); + + env.add(new EnvVarBuilder().withName("MANAGEMENT_HEALTH_LEADER_ELECTION_ENABLED").withValue("TRUE").build()); + + deployment.getSpec().getTemplate().getSpec().getContainers().get(0).setEnv(env); + + if (phase.equals(Phase.CREATE)) { + util.createAndWait(NAMESPACE, null, deployment, service, true); + } + else if (phase.equals(Phase.DELETE)) { + util.deleteAndWait(NAMESPACE, deployment, service); + } + } + +} diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/deployment.yaml b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/deployment.yaml new file mode 100644 index 0000000000..404038003b --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/deployment.yaml @@ -0,0 +1,28 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: app-deployment +spec: + selector: + matchLabels: + app: app + template: + metadata: + labels: + app: app + spec: + serviceAccountName: spring-cloud-kubernetes-serviceaccount + containers: + - name: app + image: docker.io/springcloud/fabric8-leader-app + imagePullPolicy: IfNotPresent + readinessProbe: + httpGet: + port: 8080 + path: /actuator/health/readiness + livenessProbe: + httpGet: + port: 8080 + path: /actuator/health/liveness + ports: + - containerPort: 8080 diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/service-a.yaml b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/service-a.yaml new file mode 100644 index 0000000000..dbeda471e9 --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/service-a.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: app-a + name: app-a +spec: + ports: + - name: http + port: 8080 + targetPort: 8080 + nodePort: 32321 + selector: + app: app-a + type: NodePort diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/service-b.yaml b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/service-b.yaml new file mode 100644 index 0000000000..82d609f203 --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/fabric8-leader-test/src/test/resources/app/service-b.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: app-b + name: app-b +spec: + ports: + - name: http + port: 8080 + targetPort: 8080 + nodePort: 32322 + selector: + app: app-b + type: NodePort diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/pom.xml b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/pom.xml new file mode 100644 index 0000000000..4269e9e554 --- /dev/null +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-leader-election/pom.xml @@ -0,0 +1,19 @@ + + + 4.0.0 + + org.springframework.cloud + spring-cloud-kubernetes-integration-tests + 5.0.0-SNAPSHOT + + + spring-cloud-kubernetes-fabric8-client-leader-election + pom + + fabric8-leader-app + fabric8-leader-test + + + diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherBusKafkaIT.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherBusKafkaIT.java index 36f3577d28..696ae588f3 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherBusKafkaIT.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherBusKafkaIT.java @@ -72,8 +72,6 @@ static void beforeAll() throws Exception { Commons.loadSpringCloudKubernetesImage(CONFIG_WATCHER_APP_IMAGE, K3S); Images.loadKafka(K3S); - - util = new Util(K3S); util.setUp(NAMESPACE); } diff --git a/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/FixedPortsK3sContainer.java b/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/FixedPortsK3sContainer.java index 8f8a7bdfc6..64faf9b128 100644 --- a/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/FixedPortsK3sContainer.java +++ b/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/FixedPortsK3sContainer.java @@ -37,7 +37,7 @@ final class FixedPortsK3sContainer extends K3sContainer { /** * Test containers exposed ports. */ - private static final int[] EXPOSED_PORTS = new int[] { 80, 6443, 8080, 8888, 9092, 32321 }; + private static final int[] EXPOSED_PORTS = new int[] { 80, 6443, 8080, 8888, 9092, 32321, 32322 }; /** * Rancher version to use for test-containers. diff --git a/spring-cloud-kubernetes-test-support/src/main/resources/setup/role.yaml b/spring-cloud-kubernetes-test-support/src/main/resources/setup/role.yaml index 4a397a6920..a159e008fc 100644 --- a/spring-cloud-kubernetes-test-support/src/main/resources/setup/role.yaml +++ b/spring-cloud-kubernetes-test-support/src/main/resources/setup/role.yaml @@ -4,6 +4,6 @@ metadata: namespace: default name: namespace-reader rules: - - apiGroups: ["", "extensions", "apps", "discovery.k8s.io"] - resources: ["configmaps", "pods", "services", "endpoints", "secrets", "endpointslices"] - verbs: ["get", "list", "watch"] + - apiGroups: ["", "extensions", "apps", "discovery.k8s.io", "coordination.k8s.io"] + resources: ["configmaps", "pods", "services", "endpoints", "secrets", "endpointslices", "leases"] + verbs: ["get", "list", "watch", "create", "update", "patch"]