Skip to content

Commit d067c83

Browse files
committed
Backport 8.0.2 and fiber suspend debugging
1 parent 8baf1a5 commit d067c83

File tree

10 files changed

+84
-18
lines changed

10 files changed

+84
-18
lines changed

operator/src/main/java/oracle/kubernetes/operator/DomainProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ public void makeRightDomainPresence(
3030
public void dispatchEventWatch(Watch.Response<V1Event> item);
3131

3232
public void stopNamespace(String ns);
33+
34+
public void reportSuspendedFibers();
3335
}

operator/src/main/java/oracle/kubernetes/operator/DomainProcessorImpl.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.concurrent.ScheduledFuture;
1414
import java.util.concurrent.TimeUnit;
1515
import java.util.concurrent.atomic.AtomicInteger;
16+
import java.util.function.BiConsumer;
1617
import javax.annotation.Nullable;
1718

1819
import io.kubernetes.client.openapi.models.V1ConfigMap;
@@ -219,6 +220,27 @@ public void stopNamespace(String ns) {
219220
}
220221
}
221222

223+
/**
224+
* Report on currently suspended fibers. This is the first step toward diagnosing whether we need special handling
225+
* to kill or kick these fibers.
226+
*/
227+
public void reportSuspendedFibers() {
228+
if (LOGGER.isFineEnabled()) {
229+
BiConsumer<String, FiberGate> consumer =
230+
(namespace, gate) -> {
231+
gate.getCurrentFibers().forEach(
232+
(key, fiber) -> {
233+
Optional.ofNullable(fiber.getSuspendedStep()).ifPresent(suspendedStep -> {
234+
LOGGER.fine("Namespace: " + namespace + ", DomainUid: " + key
235+
+ ", Fiber: " + fiber.toString() + " is SUSPENDED at " + suspendedStep.getName());
236+
});
237+
});
238+
};
239+
makeRightFiberGates.forEach(consumer);
240+
statusFiberGates.forEach(consumer);
241+
}
242+
}
243+
222244
/**
223245
* Dispatch pod watch event.
224246
* @param item watch event

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ private static void begin() {
210210
new InitializeNamespacesSecurityStep(targetNamespaces),
211211
new NamespaceRulesReviewStep(),
212212
CrdHelper.createDomainCrdStep(version,
213-
new StartNamespacesStep(targetNamespaces)));
213+
new StartNamespacesStep(targetNamespaces, false)));
214214
if (!isDedicated()) {
215215
strategy = Step.chain(strategy, readExistingNamespaces());
216216
}
@@ -304,7 +304,10 @@ static Runnable recheckDomains() {
304304
Collection<String> namespacesToStart = targetNamespaces;
305305
int recheckInterval = tuningAndConfig.getMainTuning().domainPresenceRecheckIntervalSeconds;
306306
DateTime now = DateTime.now();
307+
boolean isFullRecheck = false;
307308
if (lastFullRecheck.get().plusSeconds(recheckInterval).isBefore(now)) {
309+
processor.reportSuspendedFibers();
310+
isFullRecheck = true;
308311
lastFullRecheck.set(now);
309312
} else {
310313
// check for namespaces that need to be started
@@ -318,7 +321,7 @@ static Runnable recheckDomains() {
318321
}
319322

320323
if (!namespacesToStart.isEmpty()) {
321-
runSteps(new StartNamespacesStep(namespacesToStart));
324+
runSteps(new StartNamespacesStep(namespacesToStart, isFullRecheck));
322325
}
323326
};
324327
}
@@ -586,30 +589,35 @@ public NextAction apply(Packet packet) {
586589
}
587590

588591
private static class StartNamespacesStep extends ForEachNamespaceStep {
589-
StartNamespacesStep(Collection<String> targetNamespaces) {
592+
private final boolean isFullRecheck;
593+
594+
StartNamespacesStep(Collection<String> targetNamespaces, boolean isFullRecheck) {
590595
super(targetNamespaces);
596+
this.isFullRecheck = isFullRecheck;
591597
}
592598

593599
@Override
594600
protected Step action(String ns) {
595601
return Step.chain(
596602
new NamespaceRulesReviewStep(ns),
597-
new StartNamespaceBeforeStep(ns),
603+
new StartNamespaceBeforeStep(ns, isFullRecheck),
598604
readExistingResources(operatorNamespace, ns));
599605
}
600606
}
601607

602608
private static class StartNamespaceBeforeStep extends Step {
603609
private final String ns;
610+
private final boolean isFullRecheck;
604611

605-
StartNamespaceBeforeStep(String ns) {
612+
StartNamespaceBeforeStep(String ns, boolean isFullRecheck) {
606613
this.ns = ns;
614+
this.isFullRecheck = isFullRecheck;
607615
}
608616

609617
@Override
610618
public NextAction apply(Packet packet) {
611619
NamespaceStatus nss = namespaceStatuses.computeIfAbsent(ns, (key) -> new NamespaceStatus());
612-
if (!nss.isNamespaceStarting().getAndSet(true)) {
620+
if (isFullRecheck || !nss.isNamespaceStarting().getAndSet(true)) {
613621
return doNext(packet);
614622
}
615623
return doEnd(packet);

operator/src/main/java/oracle/kubernetes/operator/ServerStatusReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ public NextAction apply(Packet packet) {
181181
InputStream in = proc.getInputStream();
182182
if (proc.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
183183
int exitValue = proc.exitValue();
184+
LOGGER.fine("readState exit: " + exitValue + ", readState for " + pod.getMetadata().getName());
184185
if (exitValue == 0) {
185186
try (final Reader reader = new InputStreamReader(in, Charsets.UTF_8)) {
186187
state = CharStreams.toString(reader);
@@ -205,6 +206,7 @@ public NextAction apply(Packet packet) {
205206
}
206207
}
207208

209+
LOGGER.fine("readState: " + state + " for " + pod.getMetadata().getName());
208210
state = chooseStateOrLastKnownServerStatus(lastKnownStatus, state);
209211
serverStateMap.put(serverName, state);
210212
fiber.resume(packet);

operator/src/main/java/oracle/kubernetes/operator/WaitForReadyStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private String getNamespace() {
155155
return getMetadata(initialResource).getNamespace();
156156
}
157157

158-
private String getName() {
158+
public String getName() {
159159
return getMetadata(initialResource).getName();
160160
}
161161

operator/src/main/java/oracle/kubernetes/operator/work/Fiber.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public final class Fiber implements Runnable, Future<Void>, ComponentRegistry, A
7272
private final Map<String, Component> components = new ConcurrentHashMap<>();
7373
/** The next action for this Fiber. */
7474
private NextAction na;
75+
private NextAction last;
7576
private ClassLoader contextClassLoader;
7677
private CompletionCallback completionCallback;
7778
/** The thread on which this Fiber is currently executing, if applicable. */
@@ -88,7 +89,7 @@ public final class Fiber implements Runnable, Future<Void>, ComponentRegistry, A
8889
Fiber(Engine engine, Fiber parent) {
8990
this.owner = engine;
9091
this.parent = parent;
91-
id = iotaGen.incrementAndGet();
92+
id = (parent == null) ? iotaGen.incrementAndGet() : (parent.children.size() + 1);
9293

9394
// if this is run from another fiber, then we naturally inherit its context
9495
// classloader,
@@ -254,22 +255,21 @@ public void terminate(Throwable t, Packet packet) {
254255
*/
255256
@Override
256257
public Fiber createChildFiber() {
257-
Fiber child = owner.createChildFiber(this);
258-
259258
synchronized (this) {
260259
if (children == null) {
261260
children = new ArrayList<>();
262261
}
262+
Fiber child = owner.createChildFiber(this);
263+
263264
children.add(child);
264265
if (status.get() == NOT_COMPLETE) {
265266
addBreadCrumb(child);
266267
} else {
267268
// Race condition where child is created after parent is cancelled or done
268269
child.status.set(CANCELLED);
269270
}
271+
return child;
270272
}
271-
272-
return child;
273273
}
274274

275275
/**
@@ -318,6 +318,22 @@ public boolean isDone() {
318318
return status.get() == DONE;
319319
}
320320

321+
/**
322+
* The most recently invoked step if the fiber is currently suspended.
323+
* @return Last invoked step for the suspended fiber, or null if the fiber is not currently suspended.
324+
*/
325+
public Step getSuspendedStep() {
326+
lock.lock();
327+
try {
328+
if (na != null && na.kind == Kind.SUSPEND) {
329+
return last.next;
330+
}
331+
return null;
332+
} finally {
333+
lock.unlock();
334+
}
335+
}
336+
321337
/**
322338
* Wait for Fiber to complete.
323339
* @return none
@@ -631,6 +647,7 @@ private boolean doRunInternal(Holder<Boolean> isRequireUnlock) {
631647
result.packet = na.packet;
632648
}
633649

650+
last = na;
634651
na = result;
635652
switch (result.kind) {
636653
case INVOKE:

operator/src/main/java/oracle/kubernetes/operator/work/FiberGate.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package oracle.kubernetes.operator.work;
55

6+
import java.util.HashMap;
7+
import java.util.Map;
68
import java.util.concurrent.ConcurrentHashMap;
79
import java.util.concurrent.ConcurrentMap;
810
import java.util.concurrent.ScheduledExecutorService;
@@ -34,6 +36,14 @@ public FiberGate(Engine engine) {
3436
this.placeholder = engine.createFiber();
3537
}
3638

39+
/**
40+
* Access map of current fibers.
41+
* @return A copy of the map of fibers currently in this gate
42+
*/
43+
public Map<String, Fiber> getCurrentFibers() {
44+
return new HashMap<>(gateMap);
45+
}
46+
3747
public ScheduledExecutorService getExecutor() {
3848
return engine.getExecutor();
3949
}

operator/src/main/java/oracle/kubernetes/operator/work/Step.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,11 @@ private static List<Step> stepToArray(Step stepGroup) {
9797
return stepsArray;
9898
}
9999

100-
101-
String getName() {
100+
/**
101+
* The name of the step. This will default to the class name minus "Step".
102+
* @return The name of the step
103+
*/
104+
public String getName() {
102105
String name = getClass().getName();
103106
int idx = name.lastIndexOf('.');
104107
if (idx >= 0) {
@@ -276,7 +279,8 @@ protected NextAction doForkJoin(
276279
new JoinCompletionCallback(fiber, packet, startDetails.size()) {
277280
@Override
278281
public void onCompletion(Packet p) {
279-
if (count.decrementAndGet() == 0) {
282+
int current = count.decrementAndGet();
283+
if (current == 0) {
280284
// no need to synchronize throwables as all fibers are done
281285
if (throwables.isEmpty()) {
282286
fiber.resume(packet);
@@ -369,7 +373,8 @@ public void onThrowable(Packet p, Throwable throwable) {
369373
synchronized (throwables) {
370374
throwables.add(throwable);
371375
}
372-
if (count.decrementAndGet() == 0) {
376+
int current = count.decrementAndGet();
377+
if (current == 0) {
373378
// no need to synchronize throwables as all fibers are done
374379
if (throwables.size() == 1) {
375380
fiber.terminate(throwable, packet);

operator/src/test/java/oracle/kubernetes/operator/work/StepChainTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private static class NamedStep extends Step {
130130
this.name = name;
131131
}
132132

133-
String getName() {
133+
public String getName() {
134134
return name;
135135
}
136136

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@
710710
<assertj.core.version>3.15.0</assertj.core.version>
711711
<commons.io.version>2.6</commons.io.version>
712712
<awaitility-version>4.0.2</awaitility-version>
713-
<client-java-version>8.0.0</client-java-version>
713+
<client-java-version>8.0.2</client-java-version>
714714
<junit.jupiter.version>5.6.0</junit.jupiter.version>
715715
<junit.vintage.version>5.6.0</junit.vintage.version>
716716
<junit.platform.version>1.6.0</junit.platform.version>

0 commit comments

Comments
 (0)