Skip to content

Commit b288737

Browse files
committed
feat: support for graceful shutdown based on configuration
Support for graceful shutdown based on configuration Signed-off-by: 10000-ki <[email protected]> Fix naming Signed-off-by: 10000-ki <[email protected]> Fix lint error Signed-off-by: 10000-ki <[email protected]> Fix lint error Signed-off-by: 10000-ki <[email protected]> Fix lint error Signed-off-by: 10000-ki <[email protected]> Fix test duration Signed-off-by: 10000-ki <[email protected]>
1 parent e9bafc7 commit b288737

File tree

7 files changed

+143
-22
lines changed

7 files changed

+143
-22
lines changed

docs/content/en/docs/patterns-and-best-practices/_index.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,25 @@ might be a permission issue for some resources in another namespace.
120120
The `stopOnInformerErrorDuringStartup` setting also has implications for the [cache sync timeout](https://github.com/java-operator-sdk/java-operator-sdk/blob/114c4312c32b34688811df8dd7cea275878c9e73/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L177-L179)
121121
behavior. If `true`, the operator will stop on cache sync timeout. If `false`, after the timeout the controller will start
122122
reconciling resources even if one or more event source caches have not synced yet.
123+
124+
## Graceful Shutdown
125+
126+
You can give the reconciler enough time to finish processing the events it is currently handling before the operator shuts down.
127+
There are two ways to enable graceful shutdown.
128+
129+
The first method is to directly provide a timeout value to the `stop` method.
130+
131+
```java
132+
final var operator = new Operator();
133+
134+
operator.stop(Duration.ofSeconds(5));
135+
```
136+
137+
The second method is to specify the timeout value by overriding the `ConfigurationService`; the no-argument `stop()` method then uses the configured value.
138+
139+
```java
140+
final var overridden = new ConfigurationServiceOverrider(config)
141+
.withReconciliationTerminationTimeout(Duration.ofSeconds(5)).build();
142+
143+
final var operator = new Operator(overridden);
144+
```

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
164164

165165
@Override
166166
public void stop() throws OperatorException {
167-
stop(Duration.ZERO);
167+
Duration reconciliationTerminationTimeout =
168+
configurationService.reconciliationTerminationTimeout();
169+
stop(reconciliationTerminationTimeout);
168170
}
169171

170172
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ static ConfigurationService newOverriddenConfigurationService(
101101
*
102102
* @param reconciler the reconciler we want the configuration of
103103
* @param <R> the {@code CustomResource} type associated with the specified reconciler
104-
* @return the {@link ControllerConfiguration} associated with the specified reconciler or {@code
105-
* null} if no configuration exists for the reconciler
104+
* @return the {@link ControllerConfiguration} associated with the specified reconciler or
105+
* {@code null} if no configuration exists for the reconciler
106106
*/
107107
<R extends HasMetadata> ControllerConfiguration<R> getConfigurationFor(Reconciler<R> reconciler);
108108

@@ -211,7 +211,7 @@ default int concurrentWorkflowExecutorThreads() {
211211

212212
/**
213213
* Override to provide a custom {@link Metrics} implementation
214-
*
214+
*
215215
* @return the {@link Metrics} implementation
216216
*/
217217
default Metrics getMetrics() {
@@ -221,7 +221,7 @@ default Metrics getMetrics() {
221221
/**
222222
* Override to provide a custom {@link ExecutorService} implementation to change how threads
223223
* handle concurrent reconciliations
224-
*
224+
*
225225
* @return the {@link ExecutorService} implementation to use for concurrent reconciliation
226226
* processing
227227
*/
@@ -232,7 +232,7 @@ default ExecutorService getExecutorService() {
232232
/**
233233
* Override to provide a custom {@link ExecutorService} implementation to change how dependent
234234
* workflows are processed in parallel
235-
*
235+
*
236236
* @return the {@link ExecutorService} implementation to use for dependent workflow processing
237237
*/
238238
default ExecutorService getWorkflowExecutorService() {
@@ -242,7 +242,7 @@ default ExecutorService getWorkflowExecutorService() {
242242
/**
243243
* Determines whether the associated Kubernetes client should be closed when the associated
244244
* {@link io.javaoperatorsdk.operator.Operator} is stopped.
245-
*
245+
*
246246
* @return {@code true} if the Kubernetes client should be closed on stop, {@code false} otherwise
247247
*/
248248
default boolean closeClientOnStop() {
@@ -252,7 +252,7 @@ default boolean closeClientOnStop() {
252252
/**
253253
* Override to provide a custom {@link DependentResourceFactory} implementation to change how
254254
* {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} are instantiated
255-
*
255+
*
256256
* @return the custom {@link DependentResourceFactory} implementation
257257
*/
258258
@SuppressWarnings("rawtypes")
@@ -264,7 +264,7 @@ default DependentResourceFactory dependentResourceFactory() {
264264
* Retrieves the optional {@link LeaderElectionConfiguration} to specify how the associated
265265
* {@link io.javaoperatorsdk.operator.Operator} handles leader election to ensure only one
266266
* instance of the operator runs on the cluster at any given time
267-
*
267+
*
268268
* @return the {@link LeaderElectionConfiguration}
269269
*/
270270
default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
@@ -299,6 +299,16 @@ default Duration cacheSyncTimeout() {
299299
return Duration.ofMinutes(2);
300300
}
301301

302+
/**
303+
* This is the timeout value that allows the reconciliation threads to gracefully shut down. If no
304+
* value is set, the default is immediate shutdown.
305+
*
306+
* @return The duration of time to wait before terminating the reconciliation threads
307+
*/
308+
default Duration reconciliationTerminationTimeout() {
309+
return Duration.ZERO;
310+
}
311+
302312
/**
303313
* Handler for an informer stop. An informer stops if there is a non-recoverable error, such as receiving
304314
* a resource that cannot be deserialized.
@@ -326,7 +336,7 @@ default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
326336
* Override to provide a custom {@link ManagedWorkflowFactory} implementation to change how
327337
* {@link io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow} are
328338
* instantiated
329-
*
339+
*
330340
* @return the custom {@link ManagedWorkflowFactory} implementation
331341
*/
332342
@SuppressWarnings("rawtypes")
@@ -336,7 +346,7 @@ default ManagedWorkflowFactory getWorkflowFactory() {
336346

337347
/**
338348
* Override to provide a custom {@link ExecutorServiceManager} implementation
339-
*
349+
*
340350
* @return the custom {@link ExecutorServiceManager} implementation
341351
*/
342352
default ExecutorServiceManager getExecutorServiceManager() {
@@ -353,9 +363,8 @@ default ExecutorServiceManager getExecutorServiceManager() {
353363
* SSA based create/update can be still used with the legacy matching, just overriding the match
354364
* method of Kubernetes Dependent Resource.
355365
*
356-
* @since 4.4.0
357-
*
358366
* @return if SSA should be used for dependent resources
367+
* @since 4.4.0
359368
*/
360369
default boolean ssaBasedCreateUpdateMatchForDependentResources() {
361370
return true;
@@ -383,9 +392,8 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
383392
* <p>
384393
* Disable this if you want to react to your own dependent resource updates
385394
*
386-
* @since 4.5.0
387-
*
388395
* @return if special annotation should be used for dependent resource to filter events
396+
* @since 4.5.0
389397
*/
390398
default boolean previousAnnotationForDependentResourcesEventFiltering() {
391399
return true;
@@ -400,9 +408,8 @@ default boolean previousAnnotationForDependentResourcesEventFiltering() {
400408
* logic, and you want to further minimize the amount of work done / updates issued by the
401409
* operator.
402410
*
403-
* @since 4.5.0
404-
*
405411
* @return if resource version should be parsed (as integer)
412+
* @since 4.5.0
406413
*/
407414
default boolean parseResourceVersionsForEventFilteringAndCaching() {
408415
return false;
@@ -415,8 +422,8 @@ default boolean parseResourceVersionsForEventFilteringAndCaching() {
415422
*
416423
* @return {@code true} if Server-Side Apply (SSA) should be used when patching the primary
417424
* resources, {@code false} otherwise
418-
* @since 5.0.0
419425
* @see ConfigurationServiceOverrider#withUseSSAToPatchPrimaryResource(boolean)
426+
* @since 5.0.0
420427
*/
421428
default boolean useSSAToPatchPrimaryResource() {
422429
return true;
@@ -427,18 +434,17 @@ default boolean useSSAToPatchPrimaryResource() {
427434
* Determines whether resources retrieved from caches such as via calls to
428435
* {@link Context#getSecondaryResource(Class)} should be defensively cloned first.
429436
* </p>
430-
*
437+
*
431438
* <p>
432439
* Defensive cloning to prevent problematic cache modifications (modifying the resource would
433440
* otherwise modify the stored copy in the cache) was transparently done in previous JOSDK
434441
* versions. This might have performance consequences and, with the more prevalent use of
435442
* Server-Side Apply, where you should create a new copy of your resource with only modified
436443
* fields, such modifications of these resources are less likely to occur.
437444
* </p>
438-
*
445+
*
439446
* @return {@code true} if resources should be defensively cloned before returning them from
440447
* caches, {@code false} otherwise
441-
*
442448
* @since 5.0.0
443449
*/
444450
default boolean cloneSecondaryResourcesWhenGettingFromCache() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class ConfigurationServiceOverrider {
3232
private InformerStoppedHandler informerStoppedHandler;
3333
private Boolean stopOnInformerErrorDuringStartup;
3434
private Duration cacheSyncTimeout;
35+
private Duration reconciliationTerminationTimeout;
3536
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
3637
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
3738
private Boolean previousAnnotationForDependentResources;
@@ -127,6 +128,12 @@ public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTime
127128
return this;
128129
}
129130

131+
public ConfigurationServiceOverrider withReconciliationTerminationTimeout(
132+
Duration reconciliationTerminationTimeout) {
133+
this.reconciliationTerminationTimeout = reconciliationTerminationTimeout;
134+
return this;
135+
}
136+
130137
public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentResources(
131138
boolean value) {
132139
this.ssaBasedCreateUpdateMatchForDependentResources = value;
@@ -251,6 +258,12 @@ public Duration cacheSyncTimeout() {
251258
return overriddenValueOrDefault(cacheSyncTimeout, ConfigurationService::cacheSyncTimeout);
252259
}
253260

261+
@Override
262+
public Duration reconciliationTerminationTimeout() {
263+
return overriddenValueOrDefault(reconciliationTerminationTimeout,
264+
ConfigurationService::reconciliationTerminationTimeout);
265+
}
266+
254267
@Override
255268
public boolean ssaBasedCreateUpdateMatchForDependentResources() {
256269
return overriddenValueOrDefault(ssaBasedCreateUpdateMatchForDependentResources,

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
34
import java.util.Optional;
45
import java.util.concurrent.Executors;
56

@@ -63,6 +64,7 @@ public <R extends HasMetadata> R clone(R object) {
6364
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
6465
.withInformerStoppedHandler((informer, ex) -> {
6566
})
67+
.withReconciliationTerminationTimeout(Duration.ofSeconds(30))
6668
.build();
6769

6870
assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -77,6 +79,8 @@ public <R extends HasMetadata> R clone(R object) {
7779
overridden.getLeaderElectionConfiguration());
7880
assertNotEquals(config.getInformerStoppedHandler(),
7981
overridden.getInformerStoppedHandler());
82+
assertNotEquals(config.reconciliationTerminationTimeout(),
83+
overridden.reconciliationTerminationTimeout());
8084
}
8185

8286
}

operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class GracefulStopIT {
2828
.build();
2929

3030
@Test
31-
void stopsGracefullyWIthTimeout() {
31+
void stopsGracefullyWithTimeout() {
3232
testGracefulStop(TEST_1, RECONCILER_SLEEP, 2);
3333
}
3434

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.RegisterExtension;
7+
8+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
9+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
10+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResource;
11+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResourceSpec;
12+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler;
13+
14+
import static io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler.*;
15+
import static org.assertj.core.api.Assertions.*;
16+
import static org.awaitility.Awaitility.*;
17+
18+
public class GracefulStopWithConfigurationIT {
19+
20+
public static final String TEST_1 = "test1";
21+
22+
@RegisterExtension
23+
LocallyRunOperatorExtension operator =
24+
LocallyRunOperatorExtension.builder()
25+
.withConfigurationService(o -> o.withCloseClientOnStop(false)
26+
.withReconciliationTerminationTimeout(Duration.ofMillis(RECONCILER_SLEEP)))
27+
.withReconciler(new GracefulStopTestReconciler())
28+
.build();
29+
30+
@Test
31+
void stopsGracefullyWithTimeoutConfiguration() {
32+
testGracefulStop(TEST_1, 2);
33+
}
34+
35+
private void testGracefulStop(String resourceName, int expectedFinalGeneration) {
36+
var testRes = operator.create(testResource(resourceName));
37+
await().untilAsserted(() -> {
38+
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
39+
assertThat(r.getStatus()).isNotNull();
40+
assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1);
41+
assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
42+
.getNumberOfExecutions()).isEqualTo(1);
43+
});
44+
45+
testRes.getSpec().setValue(2);
46+
operator.replace(testRes);
47+
48+
await().pollDelay(Duration.ofMillis(50)).untilAsserted(
49+
() -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
50+
.getNumberOfExecutions()).isEqualTo(2));
51+
52+
operator.getOperator().stop();
53+
54+
await().untilAsserted(() -> {
55+
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
56+
assertThat(r.getStatus()).isNotNull();
57+
assertThat(r.getStatus().getObservedGeneration()).isEqualTo(expectedFinalGeneration);
58+
});
59+
}
60+
61+
public GracefulStopTestCustomResource testResource(String name) {
62+
GracefulStopTestCustomResource resource =
63+
new GracefulStopTestCustomResource();
64+
resource.setMetadata(
65+
new ObjectMetaBuilder()
66+
.withName(name)
67+
.withNamespace(operator.getNamespace())
68+
.build());
69+
resource.setSpec(new GracefulStopTestCustomResourceSpec());
70+
resource.getSpec().setValue(1);
71+
return resource;
72+
}
73+
74+
}

0 commit comments

Comments
 (0)