Skip to content

Commit fc3fae4

Browse files
authored
Merge pull request #49 from ContainerSolutions/Reconciliation_Retry
Reconciliation retry
2 parents 62dd8ff + c3f6732 commit fc3fae4

File tree

25 files changed

+897
-92
lines changed

25 files changed

+897
-92
lines changed

operator-framework/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
</build>
3232

3333
<dependencies>
34+
<dependency>
35+
<groupId>com.google.guava</groupId>
36+
<artifactId>guava</artifactId>
37+
</dependency>
3438
<dependency>
3539
<groupId>io.fabric8</groupId>
3640
<artifactId>openshift-client</artifactId>
@@ -51,5 +55,22 @@
5155
<groupId>org.mockito</groupId>
5256
<artifactId>mockito-core</artifactId>
5357
</dependency>
58+
<dependency>
59+
<groupId>org.springframework</groupId>
60+
<artifactId>spring-core</artifactId>
61+
<scope>compile</scope>
62+
</dependency>
63+
<dependency>
64+
<groupId>org.apache.logging.log4j</groupId>
65+
<artifactId>log4j-slf4j-impl</artifactId>
66+
<version>2.11.2</version>
67+
<scope>test</scope>
68+
</dependency>
69+
<dependency>
70+
<groupId>org.assertj</groupId>
71+
<artifactId>assertj-core</artifactId>
72+
<version>3.4.1</version>
73+
<scope>test</scope>
74+
</dependency>
5475
</dependencies>
5576
</project>

operator-framework/src/main/java/com/github/containersolutions/operator/Operator.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.github.containersolutions.operator;
22

33
import com.github.containersolutions.operator.api.ResourceController;
4+
import com.github.containersolutions.operator.processing.EventDispatcher;
5+
import com.github.containersolutions.operator.processing.EventScheduler;
46
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition;
57
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinitionList;
68
import io.fabric8.kubernetes.client.CustomResource;
@@ -24,8 +26,9 @@ public class Operator {
2426

2527
private final KubernetesClient k8sClient;
2628

27-
private Map<ResourceController, EventDispatcher> controllers = new HashMap<>();
29+
private Map<ResourceController, EventScheduler> controllers = new HashMap<>();
2830
private Map<Class<? extends CustomResource>, CustomResourceOperationsImpl> customResourceClients = new HashMap<>();
31+
private EventScheduler eventScheduler;
2932

3033
private final static Logger log = LoggerFactory.getLogger(Operator.class);
3134

@@ -56,29 +59,32 @@ private <R extends CustomResource> void registerController(ResourceController<R>
5659
EventDispatcher<R> eventDispatcher =
5760
new EventDispatcher<>(controller, (CustomResourceOperationsImpl) client, client, k8sClient,
5861
ControllerUtils.getDefaultFinalizer(controller));
59-
registerWatches(controller, client, eventDispatcher, resClass, watchAllNamespaces, targetNamespaces);
62+
63+
eventScheduler = new EventScheduler(eventDispatcher);
64+
65+
registerWatches(controller, client, resClass, watchAllNamespaces, targetNamespaces);
6066
} else {
6167
throw new OperatorException("CRD '" + resClass.getSimpleName() + "' with version '"
6268
+ getVersion(controller) + "' not found");
6369
}
6470
}
6571

6672
private <R extends CustomResource> void registerWatches(ResourceController<R> controller, MixedOperation client,
67-
EventDispatcher<R> eventDispatcher, Class<R> resClass,
73+
Class<R> resClass,
6874
boolean watchAllNamespaces, String[] targetNamespaces) {
6975
CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client;
7076
if (watchAllNamespaces) {
71-
crClient.inAnyNamespace().watch(eventDispatcher);
77+
crClient.inAnyNamespace().watch(eventScheduler);
7278
} else if (targetNamespaces.length == 0) {
73-
client.watch(eventDispatcher);
79+
client.watch(eventScheduler);
7480
} else {
7581
for (String targetNamespace : targetNamespaces) {
76-
crClient.inNamespace(targetNamespace).watch(eventDispatcher);
82+
crClient.inNamespace(targetNamespace).watch(eventScheduler);
7783
log.debug("Registered controller for namespace: {}", targetNamespace);
7884
}
7985
}
8086
customResourceClients.put(resClass, (CustomResourceOperationsImpl) client);
81-
controllers.put(controller, eventDispatcher);
87+
controllers.put(controller, eventScheduler);
8288
log.info("Registered Controller: '{}' for CRD: '{}' for namespaces: {}", controller.getClass().getSimpleName(),
8389
resClass, targetNamespaces.length == 0 ? "[all/client namespace]" : Arrays.toString(targetNamespaces));
8490
}

operator-framework/src/main/java/com/github/containersolutions/operator/Context.java renamed to operator-framework/src/main/java/com/github/containersolutions/operator/api/Context.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.containersolutions.operator;
1+
package com.github.containersolutions.operator.api;
22

33
import io.fabric8.kubernetes.client.CustomResource;
44
import io.fabric8.kubernetes.client.CustomResourceDoneable;

operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceController.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.github.containersolutions.operator.api;
22

3-
import com.github.containersolutions.operator.Context;
43
import io.fabric8.kubernetes.client.CustomResource;
54

65
import java.util.Optional;

operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceControllerAdapter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.github.containersolutions.operator.api;
22

3-
import com.github.containersolutions.operator.Context;
43
import io.fabric8.kubernetes.client.CustomResource;
54

65
import java.util.Optional;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.github.containersolutions.operator.processing;
2+
3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.fabric8.kubernetes.client.Watcher;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.util.backoff.BackOffExecution;
8+
import org.springframework.util.backoff.ExponentialBackOff;
9+
10+
import java.util.Optional;
11+
12+
public class CustomResourceEvent {
13+
14+
public static final long INITIAL_BACK_OFF_INTERVAL = 2000L;
15+
16+
public static final int MAX_RETRY_COUNT = 5;
17+
public static final double BACK_OFF_MULTIPLIER = 1.5;
18+
private final static Logger log = LoggerFactory.getLogger(CustomResourceEvent.class);
19+
private final static BackOffExecution backOff = new ExponentialBackOff(INITIAL_BACK_OFF_INTERVAL, BACK_OFF_MULTIPLIER).start();
20+
21+
private final Watcher.Action action;
22+
private final CustomResource resource;
23+
private Integer retryIndex = -1;
24+
25+
CustomResourceEvent(Watcher.Action action, CustomResource resource) {
26+
this.action = action;
27+
this.resource = resource;
28+
}
29+
30+
Watcher.Action getAction() {
31+
return action;
32+
}
33+
34+
public CustomResource getResource() {
35+
return resource;
36+
}
37+
38+
public String getEventInfo() {
39+
CustomResource resource = this.getResource();
40+
return new StringBuilder().
41+
append("Resource ").append(getAction().toString().toLowerCase()).append(" -> ").
42+
append(Optional.ofNullable(resource.getMetadata().getNamespace()).orElse("cluster")).append("/").
43+
append(resource.getKind()).append(":").
44+
append(resource.getMetadata().getName()).toString();
45+
46+
}
47+
48+
public String resourceUid() {
49+
return resource.getMetadata().getUid();
50+
}
51+
52+
public Boolean sameResourceAs(CustomResourceEvent otherEvent) {
53+
return getResource().getMetadata().getUid().equals(otherEvent.getResource().getMetadata().getUid());
54+
}
55+
56+
public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {
57+
return sameResourceAs(otherEvent) &&
58+
Long.parseLong(getResource().getMetadata().getResourceVersion()) >
59+
Long.parseLong(otherEvent.getResource().getMetadata().getResourceVersion());
60+
61+
}
62+
63+
64+
public Optional<Long> nextBackOff() {
65+
if (retryIndex == -1) {
66+
retryIndex = 0;
67+
return Optional.of(0l);
68+
} else {
69+
if (retryIndex >= MAX_RETRY_COUNT - 1) {
70+
return Optional.empty();
71+
} else {
72+
retryIndex++;
73+
return Optional.of(backOff.nextBackOff());
74+
}
75+
}
76+
}
77+
78+
@Override
79+
public String toString() {
80+
return "CustomResourceEvent{" +
81+
"action=" + action +
82+
", resource=[ name=" + resource.getMetadata().getName() + ", kind=" + resource.getKind() +
83+
", apiVersion=" + resource.getApiVersion() + " ,resourceVersion=" + resource.getMetadata().getResourceVersion() +
84+
" ], retriesIndex=" + retryIndex +
85+
'}';
86+
}
87+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.github.containersolutions.operator.processing;
2+
3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.fabric8.kubernetes.client.Watcher;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
class EventConsumer implements Runnable {
9+
10+
private final static Logger log = LoggerFactory.getLogger(EventConsumer.class);
11+
private final CustomResourceEvent event;
12+
private final EventDispatcher eventDispatcher;
13+
private final EventScheduler eventScheduler;
14+
15+
EventConsumer(CustomResourceEvent event, EventDispatcher eventDispatcher, EventScheduler eventScheduler) {
16+
this.event = event;
17+
this.eventDispatcher = eventDispatcher;
18+
this.eventScheduler = eventScheduler;
19+
}
20+
21+
@Override
22+
public void run() {
23+
boolean stillScheduledForProcessing = eventScheduler.eventProcessingStarted(event);
24+
if (!stillScheduledForProcessing) {
25+
return;
26+
}
27+
if (processEvent()) {
28+
eventScheduler.eventProcessingFinishedSuccessfully(event);
29+
} else {
30+
this.eventScheduler.eventProcessingFailed(event);
31+
}
32+
}
33+
34+
@SuppressWarnings("unchecked")
35+
private boolean processEvent() {
36+
37+
Watcher.Action action = event.getAction();
38+
CustomResource resource = event.getResource();
39+
log.info("Processing event {}", event.getEventInfo());
40+
try {
41+
eventDispatcher.handleEvent(action, resource);
42+
} catch (RuntimeException e) {
43+
log.error("Processing event {} failed.", event.getEventInfo(), e);
44+
return false;
45+
}
46+
47+
return true;
48+
}
49+
}

operator-framework/src/main/java/com/github/containersolutions/operator/EventDispatcher.java renamed to operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package com.github.containersolutions.operator;
1+
package com.github.containersolutions.operator.processing;
22

3+
import com.github.containersolutions.operator.api.Context;
34
import com.github.containersolutions.operator.api.ResourceController;
45
import io.fabric8.kubernetes.client.*;
56
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
@@ -11,7 +12,10 @@
1112
import java.util.ArrayList;
1213
import java.util.Optional;
1314

14-
public class EventDispatcher<R extends CustomResource> implements Watcher<R> {
15+
/**
16+
* Dispatches events to the Controller and handles Finalizers for a single type of Custom Resource.
17+
*/
18+
public class EventDispatcher<R extends CustomResource> {
1519

1620
private final static Logger log = LoggerFactory.getLogger(EventDispatcher.class);
1721

@@ -25,31 +29,19 @@ public class EventDispatcher<R extends CustomResource> implements Watcher<R> {
2529
public EventDispatcher(ResourceController<R> controller,
2630
CustomResourceOperationsImpl<R, CustomResourceList<R>, CustomResourceDoneable<R>> resourceOperation,
2731
NonNamespaceOperation<R, CustomResourceList<R>, CustomResourceDoneable<R>,
28-
Resource<R, CustomResourceDoneable<R>>> resourceClient, KubernetesClient k8sClient,
32+
Resource<R, CustomResourceDoneable<R>>> resourceClient, KubernetesClient k8sClient,
2933
String defaultFinalizer
3034

3135
) {
32-
3336
this.controller = controller;
3437
this.resourceOperation = resourceOperation;
3538
this.resourceClient = resourceClient;
3639
this.resourceDefaultFinalizer = defaultFinalizer;
3740
this.k8sClient = k8sClient;
3841
}
3942

40-
public void eventReceived(Action action, R resource) {
41-
try {
42-
log.debug("Action: {}, {}: {}, Resource: {}", action, resource.getClass().getSimpleName(),
43-
resource.getMetadata().getName(), resource);
44-
handleEvent(action, resource);
45-
log.trace("Even handling finished for action: {} resource: {}", action, resource);
46-
} catch (RuntimeException e) {
47-
log.error("Error on resource: {}", resource.getMetadata().getName(), e);
48-
}
49-
}
50-
51-
private void handleEvent(Action action, R resource) {
52-
if (action == Action.MODIFIED || action == Action.ADDED) {
43+
public void handleEvent(Watcher.Action action, R resource) {
44+
if (action == Watcher.Action.MODIFIED || action == Watcher.Action.ADDED) {
5345
// we don't want to call delete resource if it not contains our finalizer,
5446
// since the resource still can be updates when marked for deletion and contains other finalizers
5547
if (markedForDeletion(resource) && hasDefaultFinalizer(resource)) {
@@ -70,11 +62,13 @@ private void handleEvent(Action action, R resource) {
7062
}
7163
}
7264
}
73-
if (Action.ERROR == action) {
65+
if (Watcher.Action.ERROR == action) {
7466
log.error("Received error for resource: {}", resource.getMetadata().getName());
67+
return;
7568
}
76-
if (Action.DELETED == action) {
69+
if (Watcher.Action.DELETED == action) {
7770
log.debug("Resource deleted: {}", resource.getMetadata().getName());
71+
return;
7872
}
7973
}
8074

@@ -106,14 +100,4 @@ private void addFinalizerIfNotPresent(R resource) {
106100
private boolean markedForDeletion(R resource) {
107101
return resource.getMetadata().getDeletionTimestamp() != null && !resource.getMetadata().getDeletionTimestamp().isEmpty();
108102
}
109-
110-
@Override
111-
public void onClose(KubernetesClientException e) {
112-
if (e != null) {
113-
log.error("Error: ", e);
114-
// we will exit the application if there was a watching exception, because of the bug in fabric8 client
115-
// see https://github.com/fabric8io/kubernetes-client/issues/1318
116-
System.exit(1);
117-
}
118-
}
119103
}

0 commit comments

Comments
 (0)