Skip to content

Commit 014fff3

Browse files
committed
scheduler and dispatcher fixed, cocurrency test passes
1 parent 463c2ff commit 014fff3

File tree

11 files changed

+69
-43
lines changed

11 files changed

+69
-43
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,6 @@ public CustomResource getResource() {
3535
return resource;
3636
}
3737

38-
public String getEventInfo() {
39-
CustomResource resource = this.getResource();
40-
return new StringBuilder().
41-
append("Resource ").append(getAction().toString().toLowerCase()).append(" -> ").
42-
append(Optional.ofNullable(resource.getMetadata().getNamespace()).orElse("cluster")).append("/").
43-
append(resource.getKind()).append(":").
44-
append(resource.getMetadata().getName()).toString();
45-
46-
}
47-
4838
public String resourceUid() {
4939
return resource.getMetadata().getUid();
5040
}
@@ -60,7 +50,6 @@ public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {
6050

6151
}
6252

63-
6453
public Optional<Long> nextBackOff() {
6554
if (retryIndex == -1) {
6655
retryIndex = 0;
@@ -81,7 +70,7 @@ public String toString() {
8170
"action=" + action +
8271
", resource=[ name=" + resource.getMetadata().getName() + ", kind=" + resource.getKind() +
8372
", apiVersion=" + resource.getApiVersion() + " ,resourceVersion=" + resource.getMetadata().getResourceVersion() +
84-
" ], retriesIndex=" + retryIndex +
73+
", markerForDeletion: " + (resource.getMetadata().getDeletionTimestamp() != null && !resource.getMetadata().getDeletionTimestamp().isEmpty()) + " ], retriesIndex=" + retryIndex +
8574
'}';
8675
}
8776
}

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ private boolean processEvent() {
3636

3737
Watcher.Action action = event.getAction();
3838
CustomResource resource = event.getResource();
39-
log.info("Processing event {}", event.getEventInfo());
39+
log.info("Processing event {}", event);
4040
try {
4141
eventDispatcher.handleEvent(action, resource);
4242
} catch (RuntimeException e) {
43-
log.error("Processing event {} failed.", event.getEventInfo(), e);
43+
log.error("Processing event {} failed.", event, e);
44+
log.debug("Failed object: {}", resource);
4445
return false;
4546
}
4647

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public void handleEvent(Watcher.Action action, R resource) {
4545
if (action == Watcher.Action.MODIFIED || action == Watcher.Action.ADDED) {
4646
// we don't want to call delete resource if it not contains our finalizer,
4747
// since the resource still can be updates when marked for deletion and contains other finalizers
48-
if (markedForDeletion(resource) && hasDefaultFinalizer(resource)) {
48+
if (markedForDeletion(resource)) {
4949
boolean removeFinalizer = controller.deleteResource(resource, new Context(k8sClient, resourceClient));
50-
if (removeFinalizer) {
50+
if (removeFinalizer && hasDefaultFinalizer(resource)) {
5151
log.debug("Removing finalizer on {}: {}", resource.getMetadata().getName(), resource.getMetadata());
5252
removeDefaultFinalizer(resource);
5353
}

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,20 @@ public void eventReceived(Watcher.Action action, R resource) {
7676

7777
void scheduleEvent(CustomResourceEvent event) {
7878
log.debug("Current queue size {}", executor.getQueue().size());
79-
log.info("Scheduling event: {}", event.getEventInfo());
79+
log.info("Scheduling event: {}", event);
8080
try {
8181
lock.lock();
82-
if (eventStore.processedNewerVersionBefore(event)) {
83-
log.debug("Skipping event processing since was processed event with newer version before. {}", event);
82+
if (event.getAction() == Action.DELETED) {
83+
log.debug("Received delete action for event: {}", event);
84+
//
85+
// should we still check if its a retry for a "marked for deletion" modification event?
8486
return;
8587
}
86-
if (event.getAction() == Action.DELETED) {
88+
if (eventStore.processedNewerVersionBefore(event)) {
89+
log.debug("Skipping event processing since was processed event with newer version before. {}", event);
8790
return;
8891
}
92+
eventStore.updateLatestResourceVersionProcessed(event);
8993
if (eventStore.containsOlderVersionOfNotScheduledEvent(event)) {
9094
log.debug("Replacing event which is not scheduled yet, since incoming event is more recent. new Event:{}", event);
9195
eventStore.addOrReplaceEventAsNotScheduledYet(event);
@@ -101,16 +105,11 @@ void scheduleEvent(CustomResourceEvent event) {
101105
EventStore.ResourceScheduleHolder scheduleHolder = eventStore.getEventScheduledForProcessing(event.resourceUid());
102106
CustomResourceEvent scheduledEvent = scheduleHolder.getCustomResourceEvent();
103107
ScheduledFuture<?> scheduledFuture = scheduleHolder.getScheduledFuture();
104-
// If newEvent is older than existing in queue, don't schedule and remove from cache
105-
if (scheduledEvent.isSameResourceAndNewerVersion(event)) {
106-
log.debug("Incoming event discarded because already scheduled event is newer. Discarded: {}, Scheduled: {}", event, scheduledEvent);
107-
return;
108-
}
109108
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
110109
if (event.isSameResourceAndNewerVersion(scheduledEvent)) {
111110
log.debug("Scheduled event canceled because incoming event is newer. Discarded: {}, New: {}", scheduledEvent, event);
112111
scheduledFuture.cancel(false);
113-
eventStore.removeEventScheduledForProcessing(scheduledEvent.resourceUid());
112+
eventStore.removeEventScheduledForProcessingVersionAware(scheduledEvent.resourceUid());
114113
}
115114
}
116115

@@ -119,18 +118,20 @@ void scheduleEvent(CustomResourceEvent event) {
119118
log.warn("Event limited max retry limit ({}), will be discarded. {}", MAX_RETRY_COUNT, event);
120119
return;
121120
}
121+
log.debug("Creating scheduled task for event: {}", event);
122122
ScheduledFuture<?> scheduledTask = executor.schedule(new EventConsumer(event, eventDispatcher, this),
123123
nextBackOff.get(), TimeUnit.MILLISECONDS);
124124
eventStore.addEventScheduledForProcessing(new EventStore.ResourceScheduleHolder(event, scheduledTask));
125125
} finally {
126+
log.info("Scheduling event finished: {}", event);
126127
lock.unlock();
127128
}
128129
}
129130

130131
boolean eventProcessingStarted(CustomResourceEvent event) {
131132
try {
132133
lock.lock();
133-
EventStore.ResourceScheduleHolder res = eventStore.removeEventScheduledForProcessing(event.resourceUid());
134+
EventStore.ResourceScheduleHolder res = eventStore.removeEventScheduledForProcessingVersionAware(event);
134135
if (res == null) {
135136
// Double checking if the event is still scheduled. This is a corner case, but can actually happen.
136137
// In detail: it can happen that we scheduled an event for processing, it took some time that is was picked

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
public class EventStore {
1111

12-
private final static Logger log = LoggerFactory.getLogger(EventScheduler.class);
12+
private final static Logger log = LoggerFactory.getLogger(EventStore.class);
1313

1414
private final Map<String, Long> lastResourceVersion = new HashMap<>();
1515

@@ -28,7 +28,6 @@ public CustomResourceEvent removeEventNotScheduledYet(String uid) {
2828

2929
public void addOrReplaceEventAsNotScheduledYet(CustomResourceEvent event) {
3030
eventsNotScheduledYet.put(event.resourceUid(), event);
31-
updateLatestResourceVersionProcessed(event);
3231
}
3332

3433
public boolean containsOlderVersionOfEventUnderProcessing(CustomResourceEvent newEvent) {
@@ -42,27 +41,34 @@ public boolean containsEventScheduledForProcessing(String uid) {
4241

4342
public void addEventUnderProcessing(CustomResourceEvent event) {
4443
eventsUnderProcessing.put(event.resourceUid(), event);
45-
updateLatestResourceVersionProcessed(event);
4644
}
4745

4846
public ResourceScheduleHolder getEventScheduledForProcessing(String uid) {
4947
return eventsScheduledForProcessing.get(uid);
5048
}
5149

52-
public ResourceScheduleHolder removeEventScheduledForProcessing(String uid) {
50+
public ResourceScheduleHolder removeEventScheduledForProcessingVersionAware(String uid) {
5351
return eventsScheduledForProcessing.remove(uid);
5452
}
5553

54+
public ResourceScheduleHolder removeEventScheduledForProcessingVersionAware(CustomResourceEvent customResourceEvent) {
55+
ResourceScheduleHolder holder = eventsScheduledForProcessing.get(customResourceEvent.resourceUid());
56+
if (holder.getCustomResourceEvent().getResource().getMetadata().getResourceVersion().equals(customResourceEvent.getResource().getMetadata().getResourceVersion())) {
57+
return removeEventScheduledForProcessingVersionAware(customResourceEvent.resourceUid());
58+
} else {
59+
return null;
60+
}
61+
}
62+
5663
public void addEventScheduledForProcessing(ResourceScheduleHolder resourceScheduleHolder) {
5764
eventsScheduledForProcessing.put(resourceScheduleHolder.getCustomResourceEvent().resourceUid(), resourceScheduleHolder);
58-
updateLatestResourceVersionProcessed(resourceScheduleHolder.getCustomResourceEvent());
5965
}
6066

6167
public CustomResourceEvent removeEventUnderProcessing(String uid) {
6268
return eventsUnderProcessing.remove(uid);
6369
}
6470

65-
private void updateLatestResourceVersionProcessed(CustomResourceEvent event) {
71+
public void updateLatestResourceVersionProcessed(CustomResourceEvent event) {
6672
Long current = lastResourceVersion.get(event.resourceUid());
6773
long received = Long.parseLong(event.getResource().getMetadata().getResourceVersion());
6874
if (current == null || received > current) {

operator-framework/src/test/java/com/github/containersolutions/operator/ConcurrencyTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import io.fabric8.kubernetes.api.model.ConfigMap;
55
import org.awaitility.Awaitility;
66
import org.junit.jupiter.api.*;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
79

810
import java.util.List;
911
import java.util.concurrent.TimeUnit;
@@ -15,9 +17,10 @@
1517
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
1618
public class ConcurrencyTest {
1719

18-
public static final int NUMBER_OF_RESOURCES_CREATED = 35;
19-
public static final int NUMBER_OF_RESOURCES_DELETED = 10;
20-
public static final int NUMBER_OF_RESOURCES_UPDATED = 15;
20+
public static final int NUMBER_OF_RESOURCES_CREATED = 50;
21+
public static final int NUMBER_OF_RESOURCES_DELETED = 30;
22+
public static final int NUMBER_OF_RESOURCES_UPDATED = 20;
23+
private static final Logger log = LoggerFactory.getLogger(ConcurrencyTest.class);
2124
public static final String UPDATED_SUFFIX = "_updated";
2225
private IntegrationTestSupport integrationTest = new IntegrationTestSupport();
2326

@@ -36,9 +39,9 @@ public void teardown() {
3639
integrationTest.teardown();
3740
}
3841

39-
4042
@Test
41-
public void manyResourcesGetCreatedUpdatedAndDeleted() {
43+
public void manyResourcesGetCreatedUpdatedAndDeleted() throws InterruptedException {
44+
log.info("Adding new resources.");
4245
for (int i = 0; i < NUMBER_OF_RESOURCES_CREATED; i++) {
4346
TestCustomResource tcr = integrationTest.createTestCustomResource(String.valueOf(i));
4447
integrationTest.getCrOperations().inNamespace(TEST_NAMESPACE).create(tcr);
@@ -49,9 +52,10 @@ public void manyResourcesGetCreatedUpdatedAndDeleted() {
4952
List<ConfigMap> items = integrationTest.getK8sClient().configMaps()
5053
.inNamespace(TEST_NAMESPACE)
5154
.list().getItems();
52-
assertThat(items).hasSize(35);
55+
assertThat(items).hasSize(NUMBER_OF_RESOURCES_CREATED);
5356
});
5457

58+
log.info("Updating resources.");
5559
// update some resources
5660
for (int i = 0; i < NUMBER_OF_RESOURCES_UPDATED; i++) {
5761
TestCustomResource tcr = integrationTest.createTestCustomResource(String.valueOf(i));
@@ -71,6 +75,7 @@ public void manyResourcesGetCreatedUpdatedAndDeleted() {
7175
}
7276
});
7377

78+
log.info("Deleting resources.");
7479
// deleting some resources
7580
for (int i = 0; i < NUMBER_OF_RESOURCES_DELETED; i++) {
7681
TestCustomResource tcr = integrationTest.createTestCustomResource(String.valueOf(i));

operator-framework/src/test/java/com/github/containersolutions/operator/IntegrationTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ public class IntegrationTest {
2323
private final static Logger log = LoggerFactory.getLogger(IntegrationTest.class);
2424
private IntegrationTestSupport integrationTestSupport = new IntegrationTestSupport();
2525

26-
private Operator operator;
27-
2826
@BeforeAll
2927
public void setup() {
3028
integrationTestSupport.initialize();

operator-framework/src/test/java/com/github/containersolutions/operator/sample/TestCustomResource.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,13 @@ public TestCustomResourceStatus getStatus() {
2323
public void setStatus(TestCustomResourceStatus status) {
2424
this.status = status;
2525
}
26-
}
26+
27+
@Override
28+
public String toString() {
29+
return "TestCustomResource{" +
30+
"spec=" + spec +
31+
", status=" + status +
32+
", extendedFrom=" + super.toString() +
33+
'}';
34+
}
35+
}

operator-framework/src/test/java/com/github/containersolutions/operator/sample/TestCustomResourceSpec.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,13 @@ public String getValue() {
3131
public void setValue(String value) {
3232
this.value = value;
3333
}
34+
35+
@Override
36+
public String toString() {
37+
return "TestCustomResourceSpec{" +
38+
"configMapName='" + configMapName + '\'' +
39+
", key='" + key + '\'' +
40+
", value='" + value + '\'' +
41+
'}';
42+
}
3443
}

operator-framework/src/test/java/com/github/containersolutions/operator/sample/TestCustomResourceStatus.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,11 @@ public String getConfigMapStatus() {
1111
public void setConfigMapStatus(String configMapStatus) {
1212
this.configMapStatus = configMapStatus;
1313
}
14+
15+
@Override
16+
public String toString() {
17+
return "TestCustomResourceStatus{" +
18+
"configMapStatus='" + configMapStatus + '\'' +
19+
'}';
20+
}
1421
}

0 commit comments

Comments
 (0)