Skip to content

Commit c565cc1

Browse files
committed
Resiliency for namespace listing and additional liveness checks
1 parent e017674 commit c565cc1

File tree

15 files changed

+88
-38
lines changed

15 files changed

+88
-38
lines changed

operator/src/main/java/oracle/kubernetes/operator/BaseMain.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2022, 2024, Oracle and/or its affiliates.
1+
// Copyright (c) 2022, 2025, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
33

44
package oracle.kubernetes.operator;
@@ -17,6 +17,7 @@
1717
import java.security.cert.CertificateException;
1818
import java.security.spec.InvalidKeySpecException;
1919
import java.time.OffsetDateTime;
20+
import java.util.Collection;
2021
import java.util.Optional;
2122
import java.util.Properties;
2223
import java.util.concurrent.ScheduledExecutorService;
@@ -35,6 +36,7 @@
3536
import oracle.kubernetes.operator.logging.LoggingFactory;
3637
import oracle.kubernetes.operator.tuning.TuningParameters;
3738
import oracle.kubernetes.operator.utils.PathSupport;
39+
import oracle.kubernetes.operator.work.Cancellable;
3840
import oracle.kubernetes.operator.work.Fiber.CompletionCallback;
3941
import oracle.kubernetes.operator.work.Packet;
4042
import oracle.kubernetes.operator.work.Step;
@@ -149,13 +151,13 @@ public void run() {
149151
}
150152
}
151153

152-
void markReadyAndStartLivenessThread() {
154+
void markReadyAndStartLivenessThread(Collection<Cancellable> futures) {
153155
try {
154156
new DeploymentReady(delegate).create();
155157

156158
logStartingLivenessMessage();
157159
// every five seconds we need to update the last modified time on the liveness file
158-
delegate.scheduleWithFixedDelay(new DeploymentLiveness(delegate), 5, 5, TimeUnit.SECONDS);
160+
delegate.scheduleWithFixedDelay(new DeploymentLiveness(futures, delegate), 5, 5, TimeUnit.SECONDS);
159161
} catch (IOException io) {
160162
LOGGER.severe(MessageKeys.EXCEPTION, io);
161163
}

operator/src/main/java/oracle/kubernetes/operator/CoreDelegateImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static oracle.kubernetes.operator.BaseMain.GIT_COMMIT_KEY;
2727
import static oracle.kubernetes.operator.BaseMain.deploymentHome;
2828
import static oracle.kubernetes.operator.BaseMain.probesHome;
29+
import static oracle.kubernetes.operator.work.Cancellable.createCancellable;
2930

3031
public class CoreDelegateImpl implements CoreDelegate {
3132

@@ -125,12 +126,12 @@ private static BaseMain.NullCompletionCallback andThenDo(Runnable completionActi
125126
@Override
126127
public Cancellable schedule(Runnable command, long delay, TimeUnit unit) {
127128
ScheduledFuture<?> future = scheduledExecutorService.schedule(command, delay, unit);
128-
return () -> future.cancel(true);
129+
return createCancellable(future);
129130
}
130131

131132
@Override
132133
public Cancellable scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
133134
ScheduledFuture<?> future = scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
134-
return () -> future.cancel(true);
135+
return createCancellable(future);
135136
}
136137
}

operator/src/main/java/oracle/kubernetes/operator/DeploymentLiveness.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1-
// Copyright (c) 2017, 2022, Oracle and/or its affiliates.
1+
// Copyright (c) 2017, 2025, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
33

44
package oracle.kubernetes.operator;
55

66
import java.io.File;
77
import java.io.IOException;
8+
import java.util.Collection;
89
import java.util.Date;
910

1011
import oracle.kubernetes.common.logging.MessageKeys;
1112
import oracle.kubernetes.operator.logging.LoggingFacade;
1213
import oracle.kubernetes.operator.logging.LoggingFactory;
14+
import oracle.kubernetes.operator.work.Cancellable;
1315

1416
/**
1517
* This task maintains the "liveness" indicator so that Kubernetes knows the Operator is still
@@ -18,9 +20,11 @@
1820
public class DeploymentLiveness implements Runnable {
1921

2022
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
23+
private final Collection<Cancellable> futures;
2124
private final File livenessFile;
2225

23-
public DeploymentLiveness(CoreDelegate delegate) {
26+
public DeploymentLiveness(Collection<Cancellable> futures, CoreDelegate delegate) {
27+
this.futures = futures;
2428
livenessFile = new File(delegate.getProbesHome(), ".alive");
2529
}
2630

@@ -33,7 +37,8 @@ public void run() {
3337
} catch (IOException ioe) {
3438
LOGGER.warning(MessageKeys.COULD_NOT_CREATE_LIVENESS_FILE);
3539
}
36-
if (livenessFile.setLastModified(new Date().getTime())) {
40+
if (futures.stream().filter(Cancellable::isDoneOrCancelled).findAny().isEmpty()
41+
&& livenessFile.setLastModified(new Date().getTime())) {
3742
LOGGER.fine("Liveness file last modified time set");
3843
}
3944
}

operator/src/main/java/oracle/kubernetes/operator/DomainRecheck.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.kubernetes.client.openapi.models.V1ObjectMeta;
2222
import io.kubernetes.client.openapi.models.V1SubjectRulesReviewStatus;
2323
import io.kubernetes.client.util.generic.KubernetesApiResponse;
24+
import jakarta.validation.constraints.NotNull;
2425
import oracle.kubernetes.common.logging.MessageKeys;
2526
import oracle.kubernetes.operator.calls.RequestBuilder;
2627
import oracle.kubernetes.operator.helpers.EventHelper;
@@ -35,6 +36,7 @@
3536
import oracle.kubernetes.operator.work.Packet;
3637
import oracle.kubernetes.operator.work.Step;
3738

39+
import static oracle.kubernetes.operator.KubernetesConstants.HTTP_GONE;
3840
import static oracle.kubernetes.operator.helpers.EventHelper.EventItem.NAMESPACE_WATCHING_STARTED;
3941
import static oracle.kubernetes.operator.helpers.NamespaceHelper.getOperatorNamespace;
4042
import static oracle.kubernetes.operator.logging.ThreadLoggingContext.setThreadContext;
@@ -158,7 +160,15 @@ private NamespaceListResponseStep() {
158160
protected Result onFailureNoRetry(Packet packet, KubernetesApiResponse<V1NamespaceList> callResponse) {
159161
return useBackupStrategy(callResponse)
160162
? doNext(createStartNamespacesStep(Namespaces.getConfiguredDomainNamespaces()), packet)
161-
: super.onFailureNoRetry(packet, callResponse);
163+
: onFailureNoRetryCheckForGone(packet, callResponse);
164+
}
165+
166+
protected Result onFailureNoRetryCheckForGone(Packet packet,
167+
@NotNull KubernetesApiResponse<V1NamespaceList> callResponse) {
168+
if (callResponse.getHttpStatusCode() == HTTP_GONE) {
169+
return doEnd();
170+
}
171+
return super.onFailureNoRetry(packet, callResponse);
162172
}
163173

164174
// Returns true if the failure wasn't due to authorization, and we have a list of namespaces to manage.
@@ -169,12 +179,13 @@ private boolean useBackupStrategy(KubernetesApiResponse<V1NamespaceList> callRes
169179
@Override
170180
public Result onSuccess(Packet packet, KubernetesApiResponse<V1NamespaceList> callResponse) {
171181
final Set<String> namespacesToStart = getNamespacesToStart(callResponse.getObject());
172-
Namespaces.getFoundDomainNamespaces(packet).addAll(namespacesToStart);
182+
Collection<String> foundDomainNamespaces = Namespaces.getFoundDomainNamespaces(packet);
183+
foundDomainNamespaces.addAll(namespacesToStart);
173184

174-
return doContinueListOrNext(callResponse, packet, createNextSteps(namespacesToStart));
185+
return doContinueListOrNext(callResponse, packet, () -> createNextSteps(foundDomainNamespaces));
175186
}
176187

177-
private Step createNextSteps(Set<String> namespacesToStartNow) {
188+
private Step createNextSteps(Collection<String> namespacesToStartNow) {
178189
if (!namespacesToStartNow.isEmpty()) {
179190
List<Step> nextSteps = new ArrayList<>();
180191
nextSteps.add(createStartNamespacesStep(namespacesToStartNow));
@@ -187,7 +198,7 @@ private Step createNextSteps(Set<String> namespacesToStartNow) {
187198
return current;
188199
}
189200

190-
private Step createNamespaceReviewStep(Set<String> namespacesToStartNow) {
201+
private Step createNamespaceReviewStep(Collection<String> namespacesToStartNow) {
191202
return RunInParallel.perNamespace(namespacesToStartNow, DomainRecheck.this::createNamespaceReview);
192203
}
193204

operator/src/main/java/oracle/kubernetes/operator/OperatorMain.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2017, 2024, Oracle and/or its affiliates.
1+
// Copyright (c) 2017, 2025, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
33

44
package oracle.kubernetes.operator;
@@ -13,6 +13,7 @@
1313
import java.time.OffsetDateTime;
1414
import java.util.Collection;
1515
import java.util.Collections;
16+
import java.util.List;
1617
import java.util.Optional;
1718
import java.util.Properties;
1819
import java.util.concurrent.ScheduledExecutorService;
@@ -49,6 +50,7 @@
4950
import oracle.kubernetes.operator.utils.Certificates;
5051
import oracle.kubernetes.operator.watcher.NamespaceWatcher;
5152
import oracle.kubernetes.operator.watcher.OperatorEventWatcher;
53+
import oracle.kubernetes.operator.work.Cancellable;
5254
import oracle.kubernetes.operator.work.FiberGate;
5355
import oracle.kubernetes.operator.work.Packet;
5456
import oracle.kubernetes.operator.work.Step;
@@ -273,10 +275,11 @@ void completeBegin() {
273275
// start periodic retry and recheck
274276
int recheckInterval = TuningParameters.getInstance().getDomainNamespaceRecheckIntervalSeconds();
275277
int stuckPodInterval = TuningParameters.getInstance().getStuckPodRecheckSeconds();
276-
mainDelegate.scheduleWithFixedDelay(recheckDomains(), recheckInterval, recheckInterval, TimeUnit.SECONDS);
277-
mainDelegate.scheduleWithFixedDelay(checkStuckPods(), stuckPodInterval, stuckPodInterval, TimeUnit.SECONDS);
278+
Collection<Cancellable> futures = List.of(
279+
mainDelegate.scheduleWithFixedDelay(recheckDomains(), recheckInterval, recheckInterval, TimeUnit.SECONDS),
280+
mainDelegate.scheduleWithFixedDelay(checkStuckPods(), stuckPodInterval, stuckPodInterval, TimeUnit.SECONDS));
278281

279-
markReadyAndStartLivenessThread();
282+
markReadyAndStartLivenessThread(futures);
280283

281284
} catch (Throwable e) {
282285
LOGGER.warning(MessageKeys.EXCEPTION, e);

operator/src/main/java/oracle/kubernetes/operator/WebhookMain.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
// Copyright (c) 2022, 2024, Oracle and/or its affiliates.
1+
// Copyright (c) 2022, 2025, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
33

44
package oracle.kubernetes.operator;
55

6+
import java.util.List;
67
import java.util.Optional;
78
import java.util.Properties;
89
import java.util.concurrent.ScheduledExecutorService;
@@ -30,6 +31,7 @@
3031
import oracle.kubernetes.operator.tuning.TuningParameters;
3132
import oracle.kubernetes.operator.utils.Certificates;
3233
import oracle.kubernetes.operator.webhooks.WebhookRestServer;
34+
import oracle.kubernetes.operator.work.Cancellable;
3335
import oracle.kubernetes.operator.work.Packet;
3436
import oracle.kubernetes.operator.work.Step;
3537

@@ -138,9 +140,10 @@ void completeBegin() {
138140

139141
// start periodic recheck of CRD
140142
int recheckInterval = TuningParameters.getInstance().getDomainNamespaceRecheckIntervalSeconds();
141-
delegate.scheduleWithFixedDelay(recheckCrd(), recheckInterval, recheckInterval, TimeUnit.SECONDS);
143+
Cancellable future
144+
= delegate.scheduleWithFixedDelay(recheckCrd(), recheckInterval, recheckInterval, TimeUnit.SECONDS);
142145

143-
markReadyAndStartLivenessThread();
146+
markReadyAndStartLivenessThread(List.of(future));
144147

145148
} catch (Exception e) {
146149
LOGGER.warning(MessageKeys.EXCEPTION, e);

operator/src/main/java/oracle/kubernetes/operator/calls/ResponseStep.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Optional;
77
import java.util.Random;
88
import java.util.Set;
9+
import java.util.function.Supplier;
910
import java.util.concurrent.TimeUnit;
1011
import javax.annotation.Nonnull;
1112

@@ -161,7 +162,7 @@ public final void setPrevious(@SuppressWarnings("rawtypes") RequestStep previous
161162
* @return Next action for list continue
162163
*/
163164
protected final Result doContinueListOrNext(KubernetesApiResponse<T> callResponse, Packet packet) {
164-
return doContinueListOrNext(callResponse, packet, getNext());
165+
return doContinueListOrNext(callResponse, packet, this::getNext);
165166
}
166167

167168
/**
@@ -170,18 +171,19 @@ protected final Result doContinueListOrNext(KubernetesApiResponse<T> callRespons
170171
*
171172
* @param callResponse Call response
172173
* @param packet Packet
173-
* @param next Next step, if no continuation
174+
* @param next Supplier of next step, if no continuation
174175
* @return Next action for list continue
175176
*/
176-
protected final Result doContinueListOrNext(KubernetesApiResponse<T> callResponse, Packet packet, Step next) {
177+
protected final Result doContinueListOrNext(
178+
KubernetesApiResponse<T> callResponse, Packet packet, Supplier<Step> next) {
177179
String cont = accessContinue(callResponse.getObject());
178180
if (cont != null) {
179181
packet.put(CONTINUE, cont);
180182
// Since the continue value is present, invoking the original request will return
181183
// the next window of data.
182184
return resetRetryStrategyAndReinvokeRequest(packet);
183185
}
184-
return doNext(next, packet);
186+
return doNext(next.get(), packet);
185187
}
186188

187189
private void addDomainFailureStatus(Packet packet, V1Status status) {

operator/src/main/java/oracle/kubernetes/operator/helpers/JobHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,7 @@ public Result onSuccess(Packet packet, KubernetesApiResponse<V1PodList> callResp
852852
.orElse(null);
853853

854854
if (jobPod == null) {
855-
return doContinueListOrNext(callResponse, packet, processIntrospectorPodLog(getNext()));
855+
return doContinueListOrNext(callResponse, packet, () -> processIntrospectorPodLog(getNext()));
856856
} else if (hasImagePullError(jobPod) || initContainersHaveImagePullError(jobPod)) {
857857

858858
String reason = getImagePullError(jobPod);
Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,25 @@
1-
// Copyright (c) 2024, Oracle and/or its affiliates.
1+
// Copyright (c) 2024, 2025, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
33

44
package oracle.kubernetes.operator.work;
55

6-
@FunctionalInterface
6+
import java.util.concurrent.ScheduledFuture;
7+
78
public interface Cancellable {
89
boolean cancel();
10+
boolean isDoneOrCancelled();
11+
12+
static Cancellable createCancellable(ScheduledFuture<?> future) {
13+
return new Cancellable() {
14+
@Override
15+
public boolean cancel() {
16+
return future.cancel(true);
17+
}
18+
19+
@Override
20+
public boolean isDoneOrCancelled() {
21+
return future.isDone();
22+
}
23+
};
24+
}
925
}

operator/src/main/java/oracle/kubernetes/operator/work/Fiber.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import oracle.kubernetes.operator.logging.LoggingFactory;
2020
import org.jetbrains.annotations.NotNull;
2121

22+
import static oracle.kubernetes.operator.work.Cancellable.createCancellable;
2223
import static oracle.kubernetes.operator.work.Step.THROWABLE;
2324
import static oracle.kubernetes.operator.work.Step.adapt;
2425

@@ -259,7 +260,7 @@ private static FiberExecutor fromScheduled(ScheduledExecutorService scheduledExe
259260
public Cancellable schedule(Fiber fiber, Duration duration) {
260261
ScheduledFuture<?> future = scheduledExecutorService.schedule(fiber,
261262
TimeUnit.MILLISECONDS.convert(duration), TimeUnit.MILLISECONDS);
262-
return () -> future.cancel(true);
263+
return createCancellable(future);
263264
}
264265

265266
@Override

0 commit comments

Comments
 (0)