Skip to content

Commit e575e4e

Browse files
committed
JobWatcher should not react to responses that are not for the current job being watched
1 parent f658fe9 commit e575e4e

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

operator/src/main/java/oracle/kubernetes/operator/JobWatcher.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,11 @@ public void receivedResponse(Watch.Response<V1Job> item) {
8383
case "ADDED":
8484
case "MODIFIED":
8585
V1Job job = item.object;
86-
Boolean isComplete = isComplete(job) || isFailed(job);
86+
Boolean isComplete = isComplete(job);
87+
Boolean isFailed = isFailed(job);
8788
String jobName = job.getMetadata().getName();
88-
if (isComplete) {
89-
Complete complete = completeCallbackRegistrations.remove(jobName);
89+
if (isComplete || isFailed) {
90+
Complete complete = completeCallbackRegistrations.get(jobName);
9091
if (complete != null) {
9192
complete.isComplete(job);
9293
}
@@ -150,6 +151,11 @@ private WaitForJobReadyStep(V1Job job, Step next) {
150151
this.job = job;
151152
}
152153

154+
boolean shouldProcessJob(V1Job job) {
155+
return (this.job.getMetadata().getCreationTimestamp().getMillis()
156+
== job.getMetadata().getCreationTimestamp().getMillis());
157+
}
158+
153159
@Override
154160
public NextAction apply(Packet packet) {
155161
if (isComplete(job)) {
@@ -165,6 +171,10 @@ public NextAction apply(Packet packet) {
165171
(fiber) -> {
166172
Complete complete =
167173
(V1Job job) -> {
174+
if (!shouldProcessJob(job)) {
175+
return;
176+
}
177+
completeCallbackRegistrations.remove(job.getMetadata().getName());
168178
if (didResume.compareAndSet(false, true)) {
169179
LOGGER.fine("Job status: " + job.getStatus());
170180
packet.put(ProcessingConstants.DOMAIN_INTROSPECTOR_JOB, job);

0 commit comments

Comments
 (0)