Skip to content

Commit 65fa811

Browse files
committed
Fix onExit handling
1 parent 61133ba commit 65fa811

File tree

4 files changed

+76
-39
lines changed

4 files changed

+76
-39
lines changed

src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,9 @@ public NextAction onSuccess(Packet packet, V1ServiceList result, int statusCode,
270270
if (info == null) {
271271
info = created;
272272
}
273-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(serverName, new ServerKubernetesObjects());
273+
ServerKubernetesObjects csko = new ServerKubernetesObjects();
274+
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, csko);
275+
ServerKubernetesObjects sko = current != null ? current : csko;
274276
if (channelName != null) {
275277
sko.getChannels().put(channelName, service);
276278
} else {
@@ -305,7 +307,9 @@ public NextAction onSuccess(Packet packet, V1PodList result, int statusCode,
305307
if (info == null) {
306308
info = created;
307309
}
308-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(serverName, new ServerKubernetesObjects());
310+
ServerKubernetesObjects csko = new ServerKubernetesObjects();
311+
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, csko);
312+
ServerKubernetesObjects sko = current != null ? current : csko;
309313
sko.getPod().set(pod);
310314
}
311315
}
@@ -1446,7 +1450,9 @@ private static void dispatchPodWatch(Watch.Response<V1Pod> item) {
14461450
if (domainUID != null) {
14471451
DomainPresenceInfo info = domains.get(domainUID);
14481452
if (info != null && serverName != null) {
1449-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(serverName, new ServerKubernetesObjects());
1453+
ServerKubernetesObjects created = new ServerKubernetesObjects();
1454+
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, created);
1455+
ServerKubernetesObjects sko = current != null ? current : created;
14501456
if (sko != null) {
14511457
Fiber f;
14521458
Packet packet;
@@ -1507,7 +1513,9 @@ private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
15071513
if (domainUID != null) {
15081514
DomainPresenceInfo info = domains.get(domainUID);
15091515
if (info != null && serverName != null) {
1510-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(serverName, new ServerKubernetesObjects());
1516+
ServerKubernetesObjects created = new ServerKubernetesObjects();
1517+
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, created);
1518+
ServerKubernetesObjects sko = current != null ? current : created;
15111519
if (sko != null) {
15121520
switch (item.type) {
15131521
case "ADDED":

src/main/java/oracle/kubernetes/operator/helpers/PodHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ public NextAction apply(Packet packet) {
203203

204204
// Verify if Kubernetes api server has a matching Pod
205205
// Create or replace, if necessary
206-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(spec.getAsName(), new ServerKubernetesObjects());
206+
ServerKubernetesObjects created = new ServerKubernetesObjects();
207+
ServerKubernetesObjects current = info.getServers().putIfAbsent(spec.getAsName(), created);
208+
ServerKubernetesObjects sko = current != null ? current : created;
207209

208210
// First, verify existing Pod
209211
Step read = CallBuilder.create().readPodAsync(podName, namespace, new ResponseStep<V1Pod>(next) {
@@ -535,7 +537,9 @@ public NextAction apply(Packet packet) {
535537

536538
// Verify if Kubernetes api server has a matching Pod
537539
// Create or replace, if necessary
538-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(weblogicServerName, new ServerKubernetesObjects());
540+
ServerKubernetesObjects created = new ServerKubernetesObjects();
541+
ServerKubernetesObjects current = info.getServers().putIfAbsent(weblogicServerName, created);
542+
ServerKubernetesObjects sko = current != null ? current : created;
539543

540544
// First, verify there existing Pod
541545
Step read = CallBuilder.create().readPodAsync(podName, namespace, new ResponseStep<V1Pod>(next) {

src/main/java/oracle/kubernetes/operator/helpers/ServiceHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ public NextAction apply(Packet packet) {
100100

101101
// Verify if Kubernetes api server has a matching Service
102102
// Create or replace, if necessary
103-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(serverName, new ServerKubernetesObjects());
103+
ServerKubernetesObjects created = new ServerKubernetesObjects();
104+
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, created);
105+
ServerKubernetesObjects sko = current != null ? current : created;
104106

105107
// First, verify existing Service
106108
Step read = CallBuilder.create().readServiceAsync(name, namespace, new ResponseStep<V1Service>(next) {
@@ -315,7 +317,9 @@ public NextAction apply(Packet packet) {
315317

316318
// Verify if Kubernetes api server has a matching Service
317319
// Create or replace, if necessary
318-
ServerKubernetesObjects sko = info.getServers().putIfAbsent(serverName, new ServerKubernetesObjects());
320+
ServerKubernetesObjects created = new ServerKubernetesObjects();
321+
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, created);
322+
ServerKubernetesObjects sko = current != null ? current : created;
319323

320324
// First, verify existing Service
321325
Step read = CallBuilder.create().readServiceAsync(name, namespace, new ResponseStep<V1Service>(next) {

src/main/java/oracle/kubernetes/operator/work/Fiber.java

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,12 @@ private boolean suspend(Holder<Boolean> isRequireUnlock, Consumer<Fiber> onExit)
424424
onExit.accept(this);
425425
} catch (Throwable t) {
426426
throw new OnExitRunnableException(t);
427+
} finally {
428+
synchronized (this) {
429+
if (currentThread == null) {
430+
triggerExitCallback();
431+
}
432+
}
427433
}
428434

429435
return true;
@@ -467,36 +473,25 @@ public ClassLoader setContextClassLoader(ClassLoader contextClassLoader) {
467473
*/
468474
@Override
469475
public void run() {
470-
// Clear the interrupted status, if present
471-
Thread.interrupted();
472-
473-
final Fiber oldFiber = CURRENT_FIBER.get();
474-
CURRENT_FIBER.set(this);
475-
Container oldContainer = ContainerResolver.getDefault().enterContainer(owner.getContainer());
476-
try {
477-
// doRun returns true to indicate an early exit from fiber processing
478-
if (!doRun(next)) {
479-
completionCheck();
480-
}
476+
if (status.get() == NOT_COMPLETE) {
477+
// Clear the interrupted status, if present
478+
Thread.interrupted();
481479

482-
// Trigger exitCallback
483-
synchronized (this) {
484-
if (exitCallback != null && exitCallback != PLACEHOLDER) {
485-
486-
if (LOGGER.isFineEnabled()) {
487-
LOGGER.fine("{0} invoking exit callback", new Object[] { getName() });
488-
}
489-
490-
exitCallback.onExit();
480+
final Fiber oldFiber = CURRENT_FIBER.get();
481+
CURRENT_FIBER.set(this);
482+
Container oldContainer = ContainerResolver.getDefault().enterContainer(owner.getContainer());
483+
try {
484+
// doRun returns true to indicate an early exit from fiber processing
485+
if (!doRun(next)) {
486+
completionCheck();
491487
}
492-
exitCallback = PLACEHOLDER;
488+
} finally {
489+
ContainerResolver.getDefault().exitContainer(oldContainer);
490+
CURRENT_FIBER.set(oldFiber);
493491
}
494-
} finally {
495-
ContainerResolver.getDefault().exitContainer(oldContainer);
496-
CURRENT_FIBER.set(oldFiber);
497492
}
498493
}
499-
494+
500495
private void completionCheck() {
501496
lock.lock();
502497
try {
@@ -573,12 +568,27 @@ private boolean doRun(Step next) {
573568
if (isRequireUnlock.value) {
574569
synchronized (this) {
575570
currentThread = null;
571+
triggerExitCallback();
576572
}
577573
lock.unlock();
578574
}
579575
}
580576
}
581577

578+
private void triggerExitCallback() {
579+
synchronized (this) {
580+
if (exitCallback != null && exitCallback != PLACEHOLDER) {
581+
582+
if (LOGGER.isFineEnabled()) {
583+
LOGGER.fine("{0} triggering exit callback", new Object[] { getName() });
584+
}
585+
586+
exitCallback.onExit();
587+
}
588+
exitCallback = PLACEHOLDER;
589+
}
590+
}
591+
582592
private boolean _doRun(Holder<Boolean> isRequireUnlock) {
583593
assert (lock.isHeldByCurrentThread());
584594

@@ -710,25 +720,36 @@ public boolean cancelAndExitCallback(boolean mayInterrupt, ExitCallback exitCall
710720
LOGGER.fine("{0} cancelled", new Object[] { getName() });
711721
}
712722

723+
AtomicInteger count = new AtomicInteger(1); // ensure we don't hit zero before iterating children
713724
synchronized (this) {
714725
if (currentThread != null) {
715726
if (mayInterrupt) {
716727
currentThread.interrupt();
717728
}
729+
count.incrementAndGet();
718730
}
719731

732+
ExitCallback myCallback = () -> {
733+
if (count.decrementAndGet() == 0) {
734+
exitCallback.onExit();
735+
}
736+
};
737+
720738
if (children != null) {
721739
for (Fiber child : children) {
722-
child.cancel(mayInterrupt);
740+
if (child.cancelAndExitCallback(mayInterrupt, myCallback)) {
741+
count.incrementAndGet();
742+
}
723743
}
724744
}
725745

726-
if (this.exitCallback != null) {
727-
throw new IllegalStateException();
728-
}
729-
boolean isWillCall = this.exitCallback != PLACEHOLDER;
746+
boolean isWillCall = count.get() > 1; // more calls outstanding then our initial buffer count
730747
if (isWillCall) {
731-
this.exitCallback = exitCallback;
748+
if (this.exitCallback != null || this.exitCallback == PLACEHOLDER) {
749+
throw new IllegalStateException();
750+
}
751+
this.exitCallback = myCallback;
752+
myCallback.onExit(); // remove the buffer count
732753
}
733754

734755
return isWillCall;

0 commit comments

Comments
 (0)