Skip to content

Commit 191ab70

Browse files
authored
Merge pull request #59 from ContainerSolutions/configurable-retry
Retry back off, our custom implementation
2 parents fb42b85 + 82b7ec3 commit 191ab70

File tree

12 files changed

+274
-47
lines changed

12 files changed

+274
-47
lines changed

operator-framework/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,6 @@
5555
<groupId>org.mockito</groupId>
5656
<artifactId>mockito-core</artifactId>
5757
</dependency>
58-
<dependency>
59-
<groupId>org.springframework</groupId>
60-
<artifactId>spring-core</artifactId>
61-
<scope>compile</scope>
62-
</dependency>
6358
<dependency>
6459
<groupId>org.apache.logging.log4j</groupId>
6560
<artifactId>log4j-slf4j-impl</artifactId>

operator-framework/src/main/java/com/github/containersolutions/operator/Operator.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.github.containersolutions.operator.api.ResourceController;
44
import com.github.containersolutions.operator.processing.EventDispatcher;
55
import com.github.containersolutions.operator.processing.EventScheduler;
6+
import com.github.containersolutions.operator.processing.retry.GenericRetry;
7+
import com.github.containersolutions.operator.processing.retry.Retry;
68
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition;
79
import io.fabric8.kubernetes.client.CustomResource;
810
import io.fabric8.kubernetes.client.CustomResourceDoneable;
@@ -25,24 +27,34 @@ public class Operator {
2527

2628
private final static Logger log = LoggerFactory.getLogger(Operator.class);
2729

30+
private final Retry defaultRetry = GenericRetry.defaultLimitedExponentialRetry();
2831
private final KubernetesClient k8sClient;
2932
private Map<Class<? extends CustomResource>, CustomResourceOperationsImpl> customResourceClients = new HashMap<>();
3033

3134
public Operator(KubernetesClient k8sClient) {
3235
this.k8sClient = k8sClient;
3336
}
3437

38+
3539
public <R extends CustomResource> void registerControllerForAllNamespaces(ResourceController<R> controller) throws OperatorException {
36-
registerController(controller, true);
40+
registerController(controller, true, defaultRetry);
41+
}
42+
43+
public <R extends CustomResource> void registerControllerForAllNamespaces(ResourceController<R> controller, Retry retry) throws OperatorException {
44+
registerController(controller, true, retry);
3745
}
3846

3947
public <R extends CustomResource> void registerController(ResourceController<R> controller, String... targetNamespaces) throws OperatorException {
40-
registerController(controller, false, targetNamespaces);
48+
registerController(controller, false, defaultRetry, targetNamespaces);
49+
}
50+
51+
public <R extends CustomResource> void registerController(ResourceController<R> controller, Retry retry, String... targetNamespaces) throws OperatorException {
52+
registerController(controller, false, retry, targetNamespaces);
4153
}
4254

4355
@SuppressWarnings("rawtypes")
4456
private <R extends CustomResource> void registerController(ResourceController<R> controller,
45-
boolean watchAllNamespaces, String... targetNamespaces) throws OperatorException {
57+
boolean watchAllNamespaces, Retry retry, String... targetNamespaces) throws OperatorException {
4658
Class<R> resClass = getCustomResourceClass(controller);
4759
CustomResourceDefinition crd = getCustomResourceDefinitionForController(controller);
4860
KubernetesDeserializer.registerCustomKind(getApiVersion(crd), getKind(crd), resClass);
@@ -53,7 +65,7 @@ private <R extends CustomResource> void registerController(ResourceController<R>
5365

5466
EventDispatcher eventDispatcher = new EventDispatcher(controller, (CustomResourceOperationsImpl) client,
5567
getDefaultFinalizer(controller));
56-
EventScheduler eventScheduler = new EventScheduler(eventDispatcher);
68+
EventScheduler eventScheduler = new EventScheduler(eventDispatcher, retry);
5769
registerWatches(controller, client, resClass, watchAllNamespaces, targetNamespaces, eventScheduler);
5870
}
5971

Original file line numberDiff line numberDiff line change
@@ -1,30 +1,23 @@
11
package com.github.containersolutions.operator.processing;
22

3+
import com.github.containersolutions.operator.processing.retry.Retry;
4+
import com.github.containersolutions.operator.processing.retry.RetryExecution;
35
import io.fabric8.kubernetes.client.CustomResource;
46
import io.fabric8.kubernetes.client.Watcher;
5-
import org.slf4j.Logger;
6-
import org.slf4j.LoggerFactory;
7-
import org.springframework.util.backoff.BackOffExecution;
8-
import org.springframework.util.backoff.ExponentialBackOff;
97

108
import java.util.Optional;
119

1210
public class CustomResourceEvent {
1311

14-
public static final long INITIAL_BACK_OFF_INTERVAL = 2000L;
15-
16-
public static final int MAX_RETRY_COUNT = 5;
17-
public static final double BACK_OFF_MULTIPLIER = 1.5;
18-
private final static Logger log = LoggerFactory.getLogger(CustomResourceEvent.class);
19-
private final static BackOffExecution backOff = new ExponentialBackOff(INITIAL_BACK_OFF_INTERVAL, BACK_OFF_MULTIPLIER).start();
20-
12+
private final RetryExecution retryExecution;
2113
private final Watcher.Action action;
2214
private final CustomResource resource;
23-
private Integer retryIndex = -1;
15+
private int retryCount = -1;
2416

25-
CustomResourceEvent(Watcher.Action action, CustomResource resource) {
17+
CustomResourceEvent(Watcher.Action action, CustomResource resource, Retry retry) {
2618
this.action = action;
2719
this.resource = resource;
20+
this.retryExecution = retry.initExecution();
2821
}
2922

3023
Watcher.Action getAction() {
@@ -51,17 +44,8 @@ public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {
5144
}
5245

5346
public Optional<Long> nextBackOff() {
54-
if (retryIndex == -1) {
55-
retryIndex = 0;
56-
return Optional.of(0l);
57-
} else {
58-
if (retryIndex >= MAX_RETRY_COUNT - 1) {
59-
return Optional.empty();
60-
} else {
61-
retryIndex++;
62-
return Optional.of(backOff.nextBackOff());
63-
}
64-
}
47+
retryCount++;
48+
return retryExecution.nextDelay();
6549
}
6650

6751
@Override
@@ -72,7 +56,7 @@ public String toString() {
7256
", apiVersion=" + resource.getApiVersion() + " ,resourceVersion=" + resource.getMetadata().getResourceVersion() +
7357
", markedForDeletion: " + (resource.getMetadata().getDeletionTimestamp() != null
7458
&& !resource.getMetadata().getDeletionTimestamp().isEmpty()) +
75-
" ], retriesIndex=" + retryIndex +
59+
" ], retriesIndex=" + retryCount +
7660
'}';
7761
}
7862
}

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.github.containersolutions.operator.processing;
22

33

4+
import com.github.containersolutions.operator.processing.retry.Retry;
45
import com.google.common.util.concurrent.ThreadFactoryBuilder;
56
import io.fabric8.kubernetes.client.CustomResource;
67
import io.fabric8.kubernetes.client.KubernetesClientException;
@@ -14,9 +15,6 @@
1415
import java.util.concurrent.TimeUnit;
1516
import java.util.concurrent.locks.ReentrantLock;
1617

17-
import static com.github.containersolutions.operator.processing.CustomResourceEvent.MAX_RETRY_COUNT;
18-
19-
2018
/**
2119
* Requirements:
2220
* <ul>
@@ -49,11 +47,13 @@ public class EventScheduler implements Watcher<CustomResource> {
4947
private final EventDispatcher eventDispatcher;
5048
private final ScheduledThreadPoolExecutor executor;
5149
private final EventStore eventStore = new EventStore();
50+
private final Retry retry;
5251

5352
private ReentrantLock lock = new ReentrantLock();
5453

55-
public EventScheduler(EventDispatcher eventDispatcher) {
54+
public EventScheduler(EventDispatcher eventDispatcher, Retry retry) {
5655
this.eventDispatcher = eventDispatcher;
56+
this.retry = retry;
5757
ThreadFactory threadFactory = new ThreadFactoryBuilder()
5858
.setNameFormat("event-consumer-%d")
5959
.setDaemon(false)
@@ -66,7 +66,7 @@ public EventScheduler(EventDispatcher eventDispatcher) {
6666
public void eventReceived(Watcher.Action action, CustomResource resource) {
6767
log.debug("Event received for action: {}, {}: {}", action.toString().toLowerCase(), resource.getClass().getSimpleName(),
6868
resource.getMetadata().getName());
69-
CustomResourceEvent event = new CustomResourceEvent(action, resource);
69+
CustomResourceEvent event = new CustomResourceEvent(action, resource, retry);
7070
scheduleEvent(event);
7171
}
7272

@@ -95,7 +95,7 @@ void scheduleEvent(CustomResourceEvent event) {
9595

9696
Optional<Long> nextBackOff = event.nextBackOff();
9797
if (!nextBackOff.isPresent()) {
98-
log.warn("Event limited max retry limit ({}), will be discarded. {}", MAX_RETRY_COUNT, event);
98+
log.warn("Event max retry limit reached. Will be discarded. {}", event);
9999
return;
100100
}
101101
log.debug("Creating scheduled task for event: {}", event);

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ public class EventStore {
1111
private final static Logger log = LoggerFactory.getLogger(EventStore.class);
1212

1313
private final Map<String, Long> lastResourceVersion = new HashMap<>();
14-
1514
private final Map<String, CustomResourceEvent> eventsNotScheduledYet = new HashMap<>();
1615
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
1716

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package com.github.containersolutions.operator.processing.retry;
2+
3+
public class GenericRetry implements Retry {
4+
5+
public static final int DEFAULT_MAX_ATTEMPTS = 5;
6+
public static final long DEFAULT_INITIAL_INTERVAL = 2000L;
7+
public static final double DEFAULT_MULTIPLIER = 1.5D;
8+
9+
private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
10+
private long initialInterval = DEFAULT_INITIAL_INTERVAL;
11+
private double intervalMultiplier = DEFAULT_MULTIPLIER;
12+
private long maxInterval = -1;
13+
private long maxElapsedTime = -1;
14+
15+
public static GenericRetry defaultLimitedExponentialRetry() {
16+
return new GenericRetry();
17+
}
18+
19+
@Override
20+
public GenericRetryExecution initExecution() {
21+
return new GenericRetryExecution(this);
22+
}
23+
24+
public int getMaxAttempts() {
25+
return maxAttempts;
26+
}
27+
28+
public GenericRetry setMaxAttempts(int maxRetryAttempts) {
29+
this.maxAttempts = maxRetryAttempts;
30+
return this;
31+
}
32+
33+
public long getInitialInterval() {
34+
return initialInterval;
35+
}
36+
37+
public GenericRetry setInitialInterval(long initialInterval) {
38+
this.initialInterval = initialInterval;
39+
return this;
40+
}
41+
42+
public double getIntervalMultiplier() {
43+
return intervalMultiplier;
44+
}
45+
46+
public GenericRetry setIntervalMultiplier(double intervalMultiplier) {
47+
this.intervalMultiplier = intervalMultiplier;
48+
return this;
49+
}
50+
51+
public long getMaxInterval() {
52+
return maxInterval;
53+
}
54+
55+
public GenericRetry setMaxInterval(long maxInterval) {
56+
this.maxInterval = maxInterval;
57+
return this;
58+
}
59+
60+
public long getMaxElapsedTime() {
61+
return maxElapsedTime;
62+
}
63+
64+
public GenericRetry setMaxElapsedTime(long maxElapsedTime) {
65+
this.maxElapsedTime = maxElapsedTime;
66+
return this;
67+
}
68+
69+
public GenericRetry withoutMaxInterval() {
70+
this.maxInterval = -1;
71+
return this;
72+
}
73+
74+
public GenericRetry withoutMaxElapsedTime() {
75+
this.maxElapsedTime = -1;
76+
return this;
77+
}
78+
79+
public GenericRetry withoutMaxAttempts() {
80+
return this.setMaxAttempts(-1);
81+
}
82+
83+
public GenericRetry withLinearRetry() {
84+
this.intervalMultiplier = 1;
85+
return this;
86+
}
87+
88+
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.github.containersolutions.operator.processing.retry;
2+
3+
import java.util.Optional;
4+
5+
6+
public class GenericRetryExecution implements RetryExecution {
7+
8+
private final GenericRetry genericRetry;
9+
10+
private int lastAttemptIndex = 0;
11+
private long currentInterval;
12+
private long elapsedTime = 0;
13+
14+
public GenericRetryExecution(GenericRetry genericRetry) {
15+
this.genericRetry = genericRetry;
16+
this.currentInterval = genericRetry.getInitialInterval();
17+
}
18+
19+
/**
20+
* Note that first attempt is always 0. Since this implementation is tailored for event scheduling.
21+
*/
22+
public Optional<Long> nextDelay() {
23+
if (lastAttemptIndex == 0) {
24+
lastAttemptIndex++;
25+
return Optional.of(0l);
26+
}
27+
if (genericRetry.getMaxElapsedTime() > 0 && lastAttemptIndex > 0) {
28+
elapsedTime += currentInterval;
29+
if (elapsedTime > genericRetry.getMaxElapsedTime()) {
30+
return Optional.empty();
31+
}
32+
}
33+
if (genericRetry.getMaxAttempts() > -1 && lastAttemptIndex >= genericRetry.getMaxAttempts()) {
34+
return Optional.empty();
35+
}
36+
37+
if (lastAttemptIndex > 1) {
38+
currentInterval = (long) (currentInterval * genericRetry.getIntervalMultiplier());
39+
if (genericRetry.getMaxInterval() > -1 && currentInterval > genericRetry.getMaxInterval()) {
40+
currentInterval = genericRetry.getMaxInterval();
41+
}
42+
}
43+
lastAttemptIndex++;
44+
return Optional.of(currentInterval);
45+
}
46+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.github.containersolutions.operator.processing.retry;
2+
3+
public interface Retry {
4+
5+
RetryExecution initExecution();
6+
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.github.containersolutions.operator.processing.retry;
2+
3+
import java.util.Optional;
4+
5+
public interface RetryExecution {
6+
7+
/**
8+
* Calculates the delay for the next execution. This method should return 0, when called first time;
9+
*
10+
* @return
11+
*/
12+
Optional<Long> nextDelay();
13+
14+
}

operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.github.containersolutions.operator.processing.EventDispatcher;
44
import com.github.containersolutions.operator.processing.EventScheduler;
5+
import com.github.containersolutions.operator.processing.retry.GenericRetry;
56
import com.github.containersolutions.operator.sample.TestCustomResource;
67
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
78
import io.fabric8.kubernetes.client.CustomResource;
@@ -16,7 +17,7 @@
1617
import java.util.List;
1718
import java.util.concurrent.CompletableFuture;
1819

19-
import static com.github.containersolutions.operator.processing.CustomResourceEvent.*;
20+
import static com.github.containersolutions.operator.processing.retry.GenericRetry.*;
2021
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.assertj.core.api.Assertions.atIndex;
2223
import static org.mockito.Mockito.*;
@@ -27,7 +28,7 @@ class EventSchedulerTest {
2728
@SuppressWarnings("unchecked")
2829
private EventDispatcher eventDispatcher = mock(EventDispatcher.class);
2930

30-
private EventScheduler eventScheduler = new EventScheduler(eventDispatcher);
31+
private EventScheduler eventScheduler = new EventScheduler(eventDispatcher, GenericRetry.defaultLimitedExponentialRetry());
3132

3233
private List<EventProcessingDetail> eventProcessingList = Collections.synchronizedList(new ArrayList<>());
3334

@@ -136,8 +137,8 @@ public void numberOfRetriesIsLimited() {
136137

137138
CompletableFuture.runAsync(() -> eventScheduler.eventReceived(Watcher.Action.MODIFIED, sampleResource()));
138139

139-
waitTimeForExecution(1, MAX_RETRY_COUNT + 2);
140-
assertThat(eventProcessingList).hasSize(MAX_RETRY_COUNT);
140+
waitTimeForExecution(1, DEFAULT_MAX_ATTEMPTS + 2);
141+
assertThat(eventProcessingList).hasSize(DEFAULT_MAX_ATTEMPTS);
141142
}
142143

143144
public void normalDispatcherExecution() {
@@ -183,7 +184,7 @@ private void waitTimeForExecution(int numberOfEvents) {
183184
private void waitTimeForExecution(int numberOfEvents, int retries) {
184185
try {
185186
Thread.sleep((long) (200 + ((INVOCATION_DURATION + 30) * numberOfEvents) + (retries * (INVOCATION_DURATION + 100)) +
186-
(Math.pow(BACK_OFF_MULTIPLIER, retries) * (INITIAL_BACK_OFF_INTERVAL + 100))));
187+
(Math.pow(DEFAULT_MULTIPLIER, retries) * (DEFAULT_INITIAL_INTERVAL + 100))));
187188
} catch (InterruptedException e) {
188189
throw new IllegalStateException(e);
189190
}

0 commit comments

Comments
 (0)