Skip to content

Commit 7ae6ec7

Browse files
authored
Refactor status prediction logic in PipelineReconciler to a common package (#131)
Reason: this logic will be used by the new K8s metadata status tables.
1 parent 7cf4aa5 commit 7ae6ec7

File tree

5 files changed

+461
-85
lines changed

5 files changed

+461
-85
lines changed

hoptimator-k8s/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ dependencies {
1818
testImplementation(testFixtures(project(':hoptimator-jdbc')))
1919
testImplementation(platform('org.junit:junit-bom:5.11.3'))
2020
testImplementation 'org.junit.jupiter:junit-jupiter'
21+
testImplementation 'org.mockito:mockito-junit-jupiter:5.+'
22+
testImplementation 'org.mockito:mockito-core:5.+'
23+
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.+'
2124
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
2225
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
2326
testRuntimeOnly libs.slf4j.simple
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.linkedin.hoptimator.k8s.status;
2+
3+
/** Represents status of an element which belongs to a {@link com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline}. */
4+
public class K8sPipelineElementStatus {
5+
private final String name;
6+
private final boolean ready;
7+
private final boolean failed;
8+
private final String message;
9+
10+
public K8sPipelineElementStatus(String name, boolean ready, boolean failed, String message) {
11+
this.name = name;
12+
this.ready = ready;
13+
this.failed = failed;
14+
this.message = message;
15+
}
16+
17+
/** Returns the name of this element. */
18+
public String getName() {
19+
return name;
20+
}
21+
22+
/** Returns true if this element is ready. */
23+
public boolean isReady() {
24+
return ready;
25+
}
26+
27+
/** Returns true if this element has failed . */
28+
public boolean isFailed() {
29+
return failed;
30+
}
31+
32+
/** Returns the detail message string of this element . */
33+
public String getMessage() {
34+
return message;
35+
}
36+
37+
@Override
38+
public String toString() {
39+
StringBuilder sb = new StringBuilder();
40+
sb.append("class K8sPipelineElementStatus {\n");
41+
sb.append(" name: ").append(ready).append("\n");
42+
sb.append(" ready: ").append(ready).append("\n");
43+
sb.append(" failed: ").append(failed).append("\n");
44+
sb.append(" message: ").append(message).append("\n");
45+
sb.append("}");
46+
return sb.toString();
47+
}
48+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package com.linkedin.hoptimator.k8s.status;
2+
3+
import com.google.gson.JsonObject;
4+
import com.linkedin.hoptimator.k8s.K8sContext;
5+
import com.linkedin.hoptimator.k8s.K8sUtils;
6+
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
7+
import io.kubernetes.client.util.generic.KubernetesApiResponse;
8+
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject;
9+
import io.kubernetes.client.util.generic.dynamic.Dynamics;
10+
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.stream.Collectors;
14+
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
19+
/**
20+
* Estimates or guesses the status of an element of a {@link com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline} by inspecting its internal state.
21+
*/
22+
public class K8sPipelineElementStatusEstimator {
23+
private final static Logger log = LoggerFactory.getLogger(K8sPipelineElementStatusEstimator.class);
24+
25+
private final K8sContext context;
26+
27+
public K8sPipelineElementStatusEstimator(K8sContext context) {
28+
this.context = context;
29+
}
30+
31+
/**
32+
* Returns statuses of all elements specified in the given pipeline.
33+
*/
34+
public List<K8sPipelineElementStatus> estimateStatuses(V1alpha1Pipeline pipeline) {
35+
String namespace = pipeline.getMetadata().getNamespace();
36+
return Arrays.stream(pipeline.getSpec().getYaml().split("\n---\n"))
37+
.map(String::trim)
38+
.filter(x -> !x.isEmpty())
39+
.map(yaml -> estimateElementStatus(yaml, namespace))
40+
.collect(Collectors.toList());
41+
}
42+
43+
/**
44+
* Estimates status of an element. If we can not retrieve it from K8s, we assume that it's not ready and not failed yet.
45+
*/
46+
private K8sPipelineElementStatus estimateElementStatus(String elementYaml, String pipelineNamespace) {
47+
DynamicKubernetesObject obj = Dynamics.newFromYaml(elementYaml);
48+
String name = obj.getMetadata().getName();
49+
String namespace = obj.getMetadata().getNamespace() == null ? pipelineNamespace : obj.getMetadata().getNamespace();
50+
String kind = obj.getKind();
51+
try {
52+
KubernetesApiResponse<DynamicKubernetesObject> existing =
53+
context.dynamic(obj.getApiVersion(), K8sUtils.guessPlural(obj)).get(namespace, name);
54+
String failureMessage =
55+
String.format("Failed to fetch %s/%s in namespace %s: %s.", kind, name, namespace, existing.toString());
56+
existing.onFailure((code, status) -> log.warn(failureMessage));
57+
if (!existing.isSuccess()) {
58+
return defaultUnreadyStatusOnK8sObjectRetrievalFailure(name, failureMessage);
59+
}
60+
K8sPipelineElementStatus elementStatus = estimateDynamicObjectStatus(name, existing.getObject());
61+
if (elementStatus.isReady()) {
62+
log.info("{}/{} is ready.", kind, name);
63+
} else {
64+
log.info("{}/{} is NOT ready.", kind, name);
65+
}
66+
return elementStatus;
67+
} catch (Exception e) {
68+
String failureMessage =
69+
String.format("Encountered exception while checking status of %s/%s in namespace %s: %s", kind, name,
70+
namespace, e);
71+
log.error(failureMessage);
72+
return defaultUnreadyStatusOnK8sObjectRetrievalFailure(name, failureMessage);
73+
}
74+
}
75+
76+
public static K8sPipelineElementStatus estimateDynamicObjectStatus(String name, DynamicKubernetesObject obj) {
77+
// We make a best effort to guess the status of the dynamic object. By default, it's ready.
78+
if (obj == null || obj.getRaw() == null) {
79+
return defaultUnreadyStatusOnK8sObjectRetrievalFailure(name, "Returned K8s object is null or has no json");
80+
}
81+
82+
K8sPipelineElementStatus status = estimateBasedOnTopLevelStatusField(name, obj);
83+
if (status != null) {
84+
return status;
85+
}
86+
87+
// TODO: Look for common Conditions
88+
String message =
89+
String.format("Object %s/%s/%s considered ready by default.", obj.getMetadata().getNamespace(), obj.getKind(),
90+
obj.getMetadata().getName());
91+
log.warn(message);
92+
return new K8sPipelineElementStatus(name, true, false, message);
93+
}
94+
95+
private static K8sPipelineElementStatus estimateBasedOnTopLevelStatusField(String name, DynamicKubernetesObject obj) {
96+
if (!obj.getRaw().has("status")) {
97+
return null;
98+
}
99+
100+
JsonObject statusJson = obj.getRaw().get("status").getAsJsonObject();
101+
K8sPipelineElementStatus elementStatus;
102+
elementStatus = estimateBasedOnStatusReadyField(name, statusJson);
103+
if (elementStatus == null) {
104+
elementStatus = estimateBasedOnStatusStateField(name, statusJson);
105+
}
106+
if (elementStatus == null) {
107+
elementStatus = estimateBasedOnJobStatusStateField(name, statusJson);
108+
}
109+
110+
return elementStatus;
111+
}
112+
113+
private static K8sPipelineElementStatus estimateBasedOnStatusReadyField(String elementName, JsonObject statusJson) {
114+
try {
115+
boolean ready = statusJson.get("ready").getAsBoolean();
116+
boolean failed = statusJson.has("failed") && statusJson.get("failed").getAsBoolean();
117+
String message = statusJson.has("message") ? statusJson.get("message").getAsString() : "";
118+
return new K8sPipelineElementStatus(elementName, ready, failed, message);
119+
} catch (Exception e) {
120+
log.debug("Exception looking for .status.ready. Swallowing.", e);
121+
}
122+
return null;
123+
}
124+
125+
private static K8sPipelineElementStatus estimateBasedOnStatusStateField(String elementName, JsonObject statusJson) {
126+
try {
127+
String statusState = statusJson.get("state").getAsString();
128+
return fromStateString(elementName, statusState);
129+
} catch (Exception e) {
130+
log.debug("Exception looking for .status.state. Swallowing.", e);
131+
}
132+
return null;
133+
}
134+
135+
private static K8sPipelineElementStatus fromStateString(String elementName, String state) {
136+
boolean ready = state.matches("(?i)READY|RUNNING|FINISHED");
137+
boolean failed = state.matches("(?i)CRASHLOOPBACKOFF|FAILED");
138+
return new K8sPipelineElementStatus(elementName, ready, failed, state);
139+
}
140+
141+
private static K8sPipelineElementStatus estimateBasedOnJobStatusStateField(String elementName,
142+
JsonObject statusJson) {
143+
try {
144+
String jobState = statusJson.get("jobStatus").getAsJsonObject().get("state").getAsString();
145+
return fromStateString(elementName, jobState);
146+
} catch (Exception e) {
147+
log.debug("Exception looking for .status.jobStatus.state. Swallowing.", e);
148+
}
149+
return null;
150+
}
151+
152+
/**
153+
* Defaults to unready state when we cannot retrieve the object from K8s.
154+
*/
155+
private static K8sPipelineElementStatus defaultUnreadyStatusOnK8sObjectRetrievalFailure(String elementName,
156+
String errorMessage) {
157+
return new K8sPipelineElementStatus(elementName, false, false, errorMessage);
158+
}
159+
}

0 commit comments

Comments
 (0)