Skip to content

Commit e73363f

Browse files
authored
[FLINK-35278] Fix occasional NPE on getting latest resource for status replace
1 parent b1f4b3f commit e73363f

File tree

2 files changed

+76
-38
lines changed

2 files changed

+76
-38
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java

Lines changed: 53 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.kubernetes.operator.utils;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
2223
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2324
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
@@ -127,47 +128,65 @@ private void replaceStatus(CR resource, STATUS prevStatus, KubernetesClient clie
127128
} catch (KubernetesClientException kce) {
128129
// 409 is the error code for conflicts resulting from the locking
129130
if (kce.getCode() == 409) {
130-
var currentVersion = resource.getMetadata().getResourceVersion();
131-
LOG.debug(
132-
"Could not apply status update for resource version {}",
133-
currentVersion);
134-
135-
var latest = client.resource(resource).get();
136-
var latestVersion = latest.getMetadata().getResourceVersion();
137-
138-
if (latestVersion.equals(currentVersion)) {
139-
// This should not happen as long as the client works consistently
140-
LOG.error("Unable to fetch latest resource version");
141-
throw kce;
142-
}
143-
144-
if (latest.getStatus().equals(prevStatus)) {
145-
if (retries++ < 3) {
146-
LOG.debug(
147-
"Retrying status update for latest version {}", latestVersion);
148-
resource.getMetadata().setResourceVersion(latestVersion);
149-
} else {
150-
// If we cannot get the latest version in 3 tries we throw the error to
151-
// retry with delay
152-
throw kce;
153-
}
154-
} else {
155-
throw new StatusConflictException(
156-
"Status have been modified externally in version "
157-
+ latestVersion
158-
+ " Previous: "
159-
+ objectMapper.writeValueAsString(prevStatus)
160-
+ " Latest: "
161-
+ objectMapper.writeValueAsString(latest.getStatus()));
162-
}
131+
handleLockingError(resource, prevStatus, client, retries, kce);
132+
++retries;
163133
} else {
164-
// We simply throw non conflict errors, to trigger retry with delay
134+
// We simply throw non-conflict errors, to trigger retry with delay
165135
throw kce;
166136
}
167137
}
168138
}
169139
}
170140

141+
/**
 * Handles a conflict error raised while replacing the status of {@code resource}.
 *
 * <p>Fetches the latest copy of the resource through {@code client} and then either prepares
 * {@code resource} for another attempt or throws:
 *
 * <ul>
 *   <li>the latest resource or its metadata is {@code null}: throws a new {@link
 *       KubernetesClientException} that wraps {@code kce};
 *   <li>the fetched resource version equals the version already held by {@code resource}:
 *       rethrows {@code kce};
 *   <li>the latest status equals {@code prevStatus} and {@code retries < 3}: copies the latest
 *       resource version onto {@code resource} and returns normally;
 *   <li>the latest status equals {@code prevStatus} and {@code retries >= 3}: rethrows {@code
 *       kce};
 *   <li>the latest status differs from {@code prevStatus}: throws a {@code
 *       StatusConflictException} whose message contains both statuses serialized by {@code
 *       objectMapper}.
 * </ul>
 *
 * @param resource resource whose status update failed; its resource version is overwritten on
 *     the retry path
 * @param prevStatus status that is compared against the status of the latest resource
 * @param client client used to fetch the latest resource
 * @param retries number of retries counted so far by the caller
 * @param kce the exception that triggered this handling; rethrown or used as the cause
 * @throws JsonProcessingException declared for the {@code objectMapper.writeValueAsString}
 *     calls used to build the conflict message
 */
@VisibleForTesting
142+
void handleLockingError(
143+
CR resource,
144+
STATUS prevStatus,
145+
KubernetesClient client,
146+
int retries,
147+
KubernetesClientException kce)
148+
throws JsonProcessingException {
149+
150+
var currentVersion = resource.getMetadata().getResourceVersion();
151+
LOG.debug("Could not apply status update for resource version {}", currentVersion);
152+
153+
var latest = client.resource(resource).get();
154+
if (latest == null || latest.getMetadata() == null) {
155+
// This can happen occasionally, we throw the error to retry with delay.
156+
throw new KubernetesClientException(
157+
String.format(
158+
"Failed to retrieve latest %s",
159+
latest == null ? "resource" : "metadata"),
160+
kce);
161+
}
162+
163+
var latestVersion = latest.getMetadata().getResourceVersion();
164+
if (currentVersion.equals(latestVersion)) {
165+
// This should not happen as long as the client works consistently
166+
LOG.error("Unable to fetch latest resource version");
167+
throw kce;
168+
}
169+
170+
// NOTE(review): latest.getStatus() is dereferenced without a null check, unlike the
// metadata above -- confirm the status is always initialized on fetched resources.
if (latest.getStatus().equals(prevStatus)) {
171+
if (retries < 3) {
172+
LOG.debug("Retrying status update for latest version {}", latestVersion);
173+
resource.getMetadata().setResourceVersion(latestVersion);
174+
} else {
175+
// If we cannot get the latest version in 3 tries we throw the error to
176+
// retry with delay
177+
throw kce;
178+
}
179+
} else {
180+
throw new StatusConflictException(
181+
"Status have been modified externally in version "
182+
+ latestVersion
183+
+ " Previous: "
184+
+ objectMapper.writeValueAsString(prevStatus)
185+
+ " Latest: "
186+
+ objectMapper.writeValueAsString(latest.getStatus()));
187+
}
188+
}
189+
171190
/**
172191
* Update the custom resource status based on the in-memory cached to ensure that any status
173192
* updates that we made previously are always visible in the reconciliation loop. This is

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
2525

2626
import io.fabric8.kubernetes.client.KubernetesClient;
27+
import io.fabric8.kubernetes.client.KubernetesClientException;
2728
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
2829
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
2930
import org.junit.jupiter.api.Test;
3031

31-
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3234

3335
/** Test for {@link StatusRecorder}. */
3436
@EnableKubernetesMockClient(crud = true)
@@ -47,17 +49,34 @@ public void testPatchOnlyWhenChanged() throws InterruptedException {
4749
var lastRequest = mockServer.getLastRequest();
4850

4951
helper.patchAndCacheStatus(deployment, kubernetesClient);
50-
assertTrue(mockServer.getLastRequest() != lastRequest);
52+
assertThat(lastRequest).isNotSameAs(mockServer.getLastRequest());
5153
lastRequest = mockServer.getLastRequest();
5254
deployment.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLING_BACK);
5355
helper.patchAndCacheStatus(deployment, kubernetesClient);
5456

5557
// We intentionally compare references
56-
assertTrue(mockServer.getLastRequest() != lastRequest);
58+
assertThat(lastRequest).isNotSameAs(mockServer.getLastRequest());
5759
lastRequest = mockServer.getLastRequest();
5860

5961
// No update
6062
helper.patchAndCacheStatus(deployment, kubernetesClient);
61-
assertTrue(mockServer.getLastRequest() == lastRequest);
63+
assertThat(lastRequest).isSameAs(mockServer.getLastRequest());
64+
}
65+
66+
/**
 * Checks that {@code handleLockingError} throws a {@link KubernetesClientException} with the
 * message "Failed to retrieve latest resource" and the supplied exception as its cause.
 *
 * <p>NOTE(review): the resource is built locally and this test never creates it through the
 * client, so the lookup presumably returns {@code null} -- confirm against the mock server
 * behavior.
 */
@Test
67+
public void testNullLatestResource() {
68+
var statusRecorder =
69+
new StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>(
70+
new MetricManager<>(), (e, s) -> {});
71+
72+
var resource = TestUtils.buildApplicationCluster();
73+
var cause = new KubernetesClientException("dummy");
74+
assertThatThrownBy(
75+
() ->
76+
statusRecorder.handleLockingError(
77+
resource, null, kubernetesClient, 0, cause))
78+
.isInstanceOf(KubernetesClientException.class)
79+
.hasMessage("Failed to retrieve latest resource")
80+
.hasCause(cause);
6281
}
6382
}

0 commit comments

Comments
 (0)