Skip to content

Commit 37e7821

Browse files
authored
Merge pull request #533 from oracle/fix-take6
Suppress outdated watch events
2 parents e6995e5 + 5d4a65c commit 37e7821

File tree

2 files changed

+34
-23
lines changed

2 files changed

+34
-23
lines changed

operator/src/main/java/oracle/kubernetes/operator/Watcher.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
abstract class Watcher<T> {
3030
static final String HAS_NEXT_EXCEPTION_MESSAGE = "IO Exception during hasNext method.";
3131
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
32-
private static final String IGNORED_RESOURCE_VERSION = "0";
32+
private static final long IGNORED_RESOURCE_VERSION = 0;
3333

3434
private final AtomicBoolean isDraining = new AtomicBoolean(false);
35-
private String resourceVersion;
35+
private Long resourceVersion;
3636
private AtomicBoolean stopping;
3737
private WatchListener<T> listener;
3838
private Thread thread = null;
@@ -45,7 +45,8 @@ abstract class Watcher<T> {
4545
* @param stopping an atomic boolean to watch to determine when to stop the watcher
4646
*/
4747
Watcher(String resourceVersion, AtomicBoolean stopping) {
48-
this.resourceVersion = resourceVersion;
48+
this.resourceVersion =
49+
!isNullOrEmptyString(resourceVersion) ? Long.parseLong(resourceVersion) : 0;
4950
this.stopping = stopping;
5051
}
5152

@@ -110,7 +111,8 @@ protected boolean isStopping() {
110111
}
111112

112113
private void watchForEvents() {
113-
try (WatchI<T> watch = initiateWatch(new WatchBuilder().withResourceVersion(resourceVersion))) {
114+
try (WatchI<T> watch =
115+
initiateWatch(new WatchBuilder().withResourceVersion(resourceVersion.toString()))) {
114116
while (watch.hasNext()) {
115117
Watch.Response<T> item = watch.next();
116118

@@ -139,9 +141,11 @@ private boolean isError(Watch.Response<T> item) {
139141
}
140142

141143
private void handleRegularUpdate(Watch.Response<T> item) {
142-
LOGGER.fine(MessageKeys.WATCH_EVENT, item.type, item.object);
143-
trackResourceVersion(item.type, item.object);
144-
if (listener != null) listener.receivedResponse(item);
144+
if (isFresh(item.type, item.object)) {
145+
LOGGER.fine(MessageKeys.WATCH_EVENT, item.type, item.object);
146+
trackResourceVersion(item.type, item.object);
147+
if (listener != null) listener.receivedResponse(item);
148+
}
145149
}
146150

147151
private void handleErrorResponse(Watch.Response<T> item) {
@@ -152,7 +156,8 @@ private void handleErrorResponse(Watch.Response<T> item) {
152156
if (index1 > 0) {
153157
int index2 = message.indexOf(')', index1 + 1);
154158
if (index2 > 0) {
155-
resourceVersion = message.substring(index1 + 1, index2);
159+
String val = message.substring(index1 + 1, index2);
160+
resourceVersion = !isNullOrEmptyString(val) ? Long.parseLong(val) : 0;
156161
}
157162
}
158163
}
@@ -170,27 +175,34 @@ private void trackResourceVersion(String type, Object object) {
170175
updateResourceVersion(getNewResourceVersion(type, object));
171176
}
172177

173-
private String getNewResourceVersion(String type, Object object) {
174-
if (type.equalsIgnoreCase("DELETED"))
175-
return Integer.toString(1 + Integer.parseInt(resourceVersion));
178+
private long getNewResourceVersion(String type, Object object) {
179+
if (type.equalsIgnoreCase("DELETED")) return 1 + resourceVersion;
176180
else return getResourceVersionFromMetadata(object);
177181
}
178182

179-
private String getResourceVersionFromMetadata(Object object) {
183+
private long getResourceVersionFromMetadata(Object object) {
180184
try {
181185
Method getMetadata = object.getClass().getDeclaredMethod("getMetadata");
182186
V1ObjectMeta metadata = (V1ObjectMeta) getMetadata.invoke(object);
183-
return metadata.getResourceVersion();
187+
String val = metadata.getResourceVersion();
188+
return !isNullOrEmptyString(val) ? Long.parseLong(val) : 0;
184189
} catch (Exception e) {
185190
LOGGER.warning(MessageKeys.EXCEPTION, e);
186191
return IGNORED_RESOURCE_VERSION;
187192
}
188193
}
189194

190-
private void updateResourceVersion(String newResourceVersion) {
191-
if (isNullOrEmptyString(resourceVersion)) resourceVersion = newResourceVersion;
192-
else if (newResourceVersion.compareTo(resourceVersion) > 0)
193-
resourceVersion = newResourceVersion;
195+
private boolean isFresh(String type, Object object) {
196+
if (resourceVersion == 0) return true;
197+
long newResourceVersion = getResourceVersionFromMetadata(object);
198+
return type.equalsIgnoreCase("DELETED")
199+
? newResourceVersion >= resourceVersion
200+
: newResourceVersion > resourceVersion;
201+
}
202+
203+
private void updateResourceVersion(long newResourceVersion) {
204+
if (resourceVersion == 0) resourceVersion = newResourceVersion;
205+
else if (newResourceVersion > resourceVersion) resourceVersion = newResourceVersion;
194206
}
195207

196208
private static boolean isNullOrEmptyString(String s) {

operator/src/test/java/oracle/kubernetes/operator/WatcherTestBase.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package oracle.kubernetes.operator;
66

77
import static java.net.HttpURLConnection.HTTP_GONE;
8-
import static oracle.kubernetes.operator.builders.EventMatcher.addEvent;
98
import static oracle.kubernetes.operator.builders.EventMatcher.modifyEvent;
109
import static org.hamcrest.MatcherAssert.assertThat;
1110
import static org.hamcrest.Matchers.contains;
@@ -107,12 +106,13 @@ private Watch.Response createHttpGoneErrorResponse(int nextResourceVersion) {
107106
@SuppressWarnings({"unchecked", "rawtypes"})
108107
@Test
109108
public void receivedEvents_areSentToListeners() {
110-
Object object = createObjectWithMetaData();
111-
StubWatchFactory.addCallResponses(createAddResponse(object), createModifyResponse(object));
109+
Object object1 = createObjectWithMetaData();
110+
Object object2 = createObjectWithMetaData();
111+
StubWatchFactory.addCallResponses(createAddResponse(object1), createModifyResponse(object2));
112112

113113
createAndRunWatcher(NAMESPACE, stopping, INITIAL_RESOURCE_VERSION);
114114

115-
assertThat(callBacks, contains(addEvent(object), modifyEvent(object)));
115+
assertThat(callBacks, contains(modifyEvent(object2)));
116116
}
117117

118118
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -121,15 +121,14 @@ public void afterFirstSetOfEvents_nextRequestSendsLastResourceVersion() {
121121
Object object1 = createObjectWithMetaData();
122122
Object object2 = createObjectWithMetaData();
123123
Watch.Response[] firstSet = {createAddResponse(object1), createModifyResponse(object2)};
124-
int resourceAfterFirstSet = resourceVersion - 1;
125124
StubWatchFactory.addCallResponses(firstSet);
126125
scheduleAddResponse(createObjectWithMetaData());
127126

128127
createAndRunWatcher(NAMESPACE, stopping, INITIAL_RESOURCE_VERSION);
129128

130129
assertThat(
131130
StubWatchFactory.getRequestParameters().get(1),
132-
hasEntry("resourceVersion", Integer.toString(resourceAfterFirstSet)));
131+
hasEntry("resourceVersion", String.valueOf(resourceVersion - 2)));
133132
}
134133

135134
@SuppressWarnings({"unchecked", "rawtypes"})

0 commit comments

Comments
 (0)