Skip to content

Commit 78ec026

Browse files
committed
Support for graceful shutdown based on configuration
Signed-off-by: 10000-ki <[email protected]>
1 parent e9bafc7 commit 78ec026

File tree

7 files changed

+136
-13
lines changed

7 files changed

+136
-13
lines changed

docs/content/en/docs/patterns-and-best-practices/_index.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,25 @@ might be a permission issue for some resources in another namespace.
120120
The `stopOnInformerErrorDuringStartup` setting has implications for the [cache sync timeout](https://github.com/java-operator-sdk/java-operator-sdk/blob/114c4312c32b34688811df8dd7cea275878c9e73/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L177-L179)
121121
behavior. If `true`, the operator will stop on cache sync timeout. If `false`, after the timeout the controller will start
122122
reconciling resources even if one or more event source caches did not sync yet.
123+
124+
## Graceful Shutdown
125+
126+
You can give the reconcilers enough time to finish processing the events that are currently in progress before the operator shuts down.
127+
There are two ways to configure the graceful shutdown timeout.
128+
129+
The first is to pass a timeout value directly to the `stop` method:
130+
131+
```java
132+
final var operator = new Operator();
133+
134+
operator.stop(Duration.ofSeconds(5));
135+
```
136+
137+
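The second is to set the timeout by overriding the `ConfigurationService`. The no-argument `stop()` method then uses this value; if it is not overridden, the default is `Duration.ZERO`, i.e. immediate shutdown.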
138+
139+
```java
140+
final var overridden = new ConfigurationServiceOverrider(config)
141+
.withReconcileTerminationTimeout(Duration.ofSeconds(5));
142+
143+
final var operator = new Operator(overridden);
144+
```

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
164164

165165
@Override
166166
public void stop() throws OperatorException {
167-
stop(Duration.ZERO);
167+
Duration reconcileTerminationTimeout = configurationService.reconcileTerminationTimeout();
168+
stop(reconcileTerminationTimeout);
168169
}
169170

170171
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ default int concurrentWorkflowExecutorThreads() {
211211

212212
/**
213213
* Override to provide a custom {@link Metrics} implementation
214-
*
214+
*
215215
* @return the {@link Metrics} implementation
216216
*/
217217
default Metrics getMetrics() {
@@ -221,7 +221,7 @@ default Metrics getMetrics() {
221221
/**
222222
* Override to provide a custom {@link ExecutorService} implementation to change how threads
223223
* handle concurrent reconciliations
224-
*
224+
*
225225
* @return the {@link ExecutorService} implementation to use for concurrent reconciliation
226226
* processing
227227
*/
@@ -232,7 +232,7 @@ default ExecutorService getExecutorService() {
232232
/**
233233
* Override to provide a custom {@link ExecutorService} implementation to change how dependent
234234
* workflows are processed in parallel
235-
*
235+
*
236236
* @return the {@link ExecutorService} implementation to use for dependent workflow processing
237237
*/
238238
default ExecutorService getWorkflowExecutorService() {
@@ -242,7 +242,7 @@ default ExecutorService getWorkflowExecutorService() {
242242
/**
243243
* Determines whether the associated Kubernetes client should be closed when the associated
244244
* {@link io.javaoperatorsdk.operator.Operator} is stopped.
245-
*
245+
*
246246
* @return {@code true} if the Kubernetes should be closed on stop, {@code false} otherwise
247247
*/
248248
default boolean closeClientOnStop() {
@@ -252,7 +252,7 @@ default boolean closeClientOnStop() {
252252
/**
253253
* Override to provide a custom {@link DependentResourceFactory} implementation to change how
254254
* {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} are instantiated
255-
*
255+
*
256256
* @return the custom {@link DependentResourceFactory} implementation
257257
*/
258258
@SuppressWarnings("rawtypes")
@@ -264,7 +264,7 @@ default DependentResourceFactory dependentResourceFactory() {
264264
* Retrieves the optional {@link LeaderElectionConfiguration} to specify how the associated
265265
* {@link io.javaoperatorsdk.operator.Operator} handles leader election to ensure only one
266266
* instance of the operator runs on the cluster at any given time
267-
*
267+
*
268268
* @return the {@link LeaderElectionConfiguration}
269269
*/
270270
default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
@@ -299,6 +299,16 @@ default Duration cacheSyncTimeout() {
299299
return Duration.ofMinutes(2);
300300
}
301301

302+
/**
303+
* Timeout that gives the reconciliation threads time to shut down gracefully when the operator is stopped.
304+
* If no value is set, the default is immediate shutdown.
305+
*
306+
* @return the duration to wait before terminating the reconciliation threads
307+
*/
308+
default Duration reconcileTerminationTimeout() {
309+
return Duration.ZERO;
310+
}
311+
302312
/**
303313
* Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received
304314
* a resource that cannot be deserialized.
@@ -326,7 +336,7 @@ default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
326336
* Override to provide a custom {@link ManagedWorkflowFactory} implementation to change how
327337
* {@link io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow} are
328338
* instantiated
329-
*
339+
*
330340
* @return the custom {@link ManagedWorkflowFactory} implementation
331341
*/
332342
@SuppressWarnings("rawtypes")
@@ -336,7 +346,7 @@ default ManagedWorkflowFactory getWorkflowFactory() {
336346

337347
/**
338348
* Override to provide a custom {@link ExecutorServiceManager} implementation
339-
*
349+
*
340350
* @return the custom {@link ExecutorServiceManager} implementation
341351
*/
342352
default ExecutorServiceManager getExecutorServiceManager() {
@@ -427,18 +437,18 @@ default boolean useSSAToPatchPrimaryResource() {
427437
* Determines whether resources retrieved from caches such as via calls to
428438
* {@link Context#getSecondaryResource(Class)} should be defensively cloned first.
429439
* </p>
430-
*
440+
*
431441
* <p>
432442
* Defensive cloning to prevent problematic cache modifications (modifying the resource would
433443
* otherwise modify the stored copy in the cache) was transparently done in previous JOSDK
434444
* versions. This might have performance consequences and, with the more prevalent use of
435445
* Server-Side Apply, where you should create a new copy of your resource with only modified
436446
* fields, such modifications of these resources are less likely to occur.
437447
* </p>
438-
*
448+
*
439449
* @return {@code true} if resources should be defensively cloned before returning them from
440450
* caches, {@code false} otherwise
441-
*
451+
*
442452
* @since 5.0.0
443453
*/
444454
default boolean cloneSecondaryResourcesWhenGettingFromCache() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class ConfigurationServiceOverrider {
3232
private InformerStoppedHandler informerStoppedHandler;
3333
private Boolean stopOnInformerErrorDuringStartup;
3434
private Duration cacheSyncTimeout;
35+
private Duration reconcileTerminationTimeout;
3536
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
3637
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
3738
private Boolean previousAnnotationForDependentResources;
@@ -127,6 +128,12 @@ public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTime
127128
return this;
128129
}
129130

131+
public ConfigurationServiceOverrider withReconcileTerminationTimeout(
132+
Duration reconcileTerminationTimeout) {
133+
this.reconcileTerminationTimeout = reconcileTerminationTimeout;
134+
return this;
135+
}
136+
130137
public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentResources(
131138
boolean value) {
132139
this.ssaBasedCreateUpdateMatchForDependentResources = value;
@@ -251,6 +258,12 @@ public Duration cacheSyncTimeout() {
251258
return overriddenValueOrDefault(cacheSyncTimeout, ConfigurationService::cacheSyncTimeout);
252259
}
253260

261+
@Override
262+
public Duration reconcileTerminationTimeout() {
263+
return overriddenValueOrDefault(reconcileTerminationTimeout,
264+
ConfigurationService::reconcileTerminationTimeout);
265+
}
266+
254267
@Override
255268
public boolean ssaBasedCreateUpdateMatchForDependentResources() {
256269
return overriddenValueOrDefault(ssaBasedCreateUpdateMatchForDependentResources,

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
34
import java.util.Optional;
45
import java.util.concurrent.Executors;
56

@@ -63,6 +64,7 @@ public <R extends HasMetadata> R clone(R object) {
6364
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
6465
.withInformerStoppedHandler((informer, ex) -> {
6566
})
67+
.withReconcileTerminationTimeout(Duration.ofSeconds(30))
6668
.build();
6769

6870
assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -77,6 +79,7 @@ public <R extends HasMetadata> R clone(R object) {
7779
overridden.getLeaderElectionConfiguration());
7880
assertNotEquals(config.getInformerStoppedHandler(),
7981
overridden.getLeaderElectionConfiguration());
82+
assertNotEquals(config.reconcileTerminationTimeout(), overridden.reconcileTerminationTimeout());
8083
}
8184

8285
}

operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class GracefulStopIT {
2828
.build();
2929

3030
@Test
31-
void stopsGracefullyWIthTimeout() {
31+
void stopsGracefullyWithTimeout() {
3232
testGracefulStop(TEST_1, RECONCILER_SLEEP, 2);
3333
}
3434

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.RegisterExtension;
7+
8+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
9+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
10+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResource;
11+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResourceSpec;
12+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler;
13+
14+
import static io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler.*;
15+
import static org.assertj.core.api.Assertions.*;
16+
import static org.awaitility.Awaitility.*;
17+
18+
public class GracefulStopWithConfigurationIT {
19+
20+
public static final String TEST_1 = "test1";
21+
22+
@RegisterExtension
23+
LocallyRunOperatorExtension operator =
24+
LocallyRunOperatorExtension.builder()
25+
.withConfigurationService(o -> o.withCloseClientOnStop(false)
26+
.withReconcileTerminationTimeout(Duration.ofSeconds(RECONCILER_SLEEP)))
27+
.withReconciler(new GracefulStopTestReconciler())
28+
.build();
29+
30+
@Test
31+
void stopsGracefullyWithTimeoutConfiguration() {
32+
testGracefulStop(TEST_1, 2);
33+
}
34+
35+
private void testGracefulStop(String resourceName, int expectedFinalGeneration) {
36+
var testRes = operator.create(testResource(resourceName));
37+
await().untilAsserted(() -> {
38+
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
39+
assertThat(r.getStatus()).isNotNull();
40+
assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1);
41+
assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
42+
.getNumberOfExecutions()).isEqualTo(1);
43+
});
44+
45+
testRes.getSpec().setValue(2);
46+
operator.replace(testRes);
47+
48+
await().pollDelay(Duration.ofMillis(50)).untilAsserted(
49+
() -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
50+
.getNumberOfExecutions()).isEqualTo(2));
51+
52+
operator.getOperator().stop();
53+
54+
await().untilAsserted(() -> {
55+
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
56+
assertThat(r.getStatus()).isNotNull();
57+
assertThat(r.getStatus().getObservedGeneration()).isEqualTo(expectedFinalGeneration);
58+
});
59+
}
60+
61+
public GracefulStopTestCustomResource testResource(String name) {
62+
GracefulStopTestCustomResource resource =
63+
new GracefulStopTestCustomResource();
64+
resource.setMetadata(
65+
new ObjectMetaBuilder()
66+
.withName(name)
67+
.withNamespace(operator.getNamespace())
68+
.build());
69+
resource.setSpec(new GracefulStopTestCustomResourceSpec());
70+
resource.getSpec().setValue(1);
71+
return resource;
72+
}
73+
74+
}

0 commit comments

Comments
 (0)