2222import java .util .concurrent .Executors ;
2323import java .util .concurrent .ScheduledFuture ;
2424import java .util .concurrent .TimeUnit ;
25- import java .util .concurrent .atomic .AtomicBoolean ;
2625import java .util .concurrent .atomic .AtomicReference ;
2726
2827import io .fabric8 .kubernetes .api .model .Pod ;
@@ -44,7 +43,7 @@ final class Fabric8LeaderElectionInitiator {
4443
4544 private static final LogAccessor LOG = new LogAccessor (Fabric8LeaderElectionInitiator .class );
4645
47- private final CachedSingleThreadScheduler scheduler = new CachedSingleThreadScheduler ();
46+ private final CachedSingleThreadScheduler podReadyScheduler = new CachedSingleThreadScheduler ();
4847
4948 private final String holderIdentity ;
5049
@@ -58,12 +57,12 @@ final class Fabric8LeaderElectionInitiator {
5857
5958 private final AtomicReference <ExecutorService > executorService = new AtomicReference <>();
6059
61- private final AtomicReference <ScheduledFuture <?>> scheduledFuture = new AtomicReference <>();
60+ private final AtomicReference <ScheduledFuture <?>> podReadyTask = new AtomicReference <>();
6261
6362 private final AtomicReference <CompletableFuture <?>> leaderFutureReference = new AtomicReference <>();
6463
6564 // not private for testing
66- final AtomicBoolean destroyCalled = new AtomicBoolean ( false ) ;
65+ volatile boolean destroyCalled = false ;
6766
6867 Fabric8LeaderElectionInitiator (String holderIdentity , String podNamespace , KubernetesClient fabric8KubernetesClient ,
6968 LeaderElectionConfig leaderElectionConfig , LeaderElectionProperties leaderElectionProperties ) {
@@ -93,7 +92,7 @@ void postConstruct() {
9392 // wait until pod is ready
9493 if (leaderElectionProperties .waitForPodReady ()) {
9594 LOG .info (() -> "need to wait until pod is ready : " + holderIdentity );
96- scheduledFuture .set (scheduler .scheduleWithFixedDelay (() -> {
95+ podReadyTask .set (podReadyScheduler .scheduleWithFixedDelay (() -> {
9796
9897 try {
9998 LOG .info (() -> "waiting for pod : " + holderIdentity + " in namespace : " + podNamespace
@@ -127,12 +126,13 @@ void postConstruct() {
127126 if (error != null ) {
128127 LOG .error (() -> "readiness failed for : " + holderIdentity );
129128 LOG .error (() -> "leader election for : " + holderIdentity + " will not start" );
130- scheduledFuture .get ().cancel (true );
131129 }
132130 else {
133131 LOG .info (() -> holderIdentity + " is ready" );
134- scheduledFuture .get ().cancel (true );
135132 }
133+ // we cancel the future that checks readiness of the pod
134+ // and thus also close the pool that was running it.
135+ podReadyTask .get ().cancel (true );
136136 });
137137 try {
138138 ready .get ();
@@ -158,11 +158,11 @@ void postConstruct() {
158158 void preDestroy () {
159159 destroyCalled ();
160160 LOG .info (() -> "preDestroy called in the leader initiator : " + holderIdentity );
161- if (scheduledFuture .get () != null ) {
161+ if (podReadyTask .get () != null ) {
162162 // if the task is not running, this has no effect
163163 // if the task is running, calling this will also make sure
164164 // that the caching executor will shut down too.
165- scheduledFuture .get ().cancel (true );
165+ podReadyTask .get ().cancel (true );
166166 }
167167
168168 if (leaderFutureReference .get () != null ) {
@@ -175,7 +175,7 @@ void preDestroy() {
175175 }
176176
177177 void destroyCalled () {
178- destroyCalled . set ( true ) ;
178+ destroyCalled = true ;
179179 }
180180
181181 void shutDownExecutor () {
@@ -194,7 +194,7 @@ private void startLeaderElection() {
194194 }
195195
196196 if (error instanceof CancellationException ) {
197- if (!destroyCalled . get () ) {
197+ if (!destroyCalled ) {
198198 LOG .warn (() -> "renewal failed for : " + holderIdentity + ", will re-start it after : " +
199199 leaderElectionProperties .waitAfterRenewalFailure ().toSeconds () + " seconds" );
200200 try {
0 commit comments