Skip to content

Commit a10fb45

Browse files
authored
[FLINK-33803]remove metadata/generation fields from ReconciliationMetadata
1 parent b26a0f5 commit a10fb45

File tree

6 files changed

+32
-29
lines changed

6 files changed

+32
-29
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/reconciler/ReconciliationMetadata.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
2222

2323
import com.fasterxml.jackson.annotation.JsonInclude;
24-
import io.fabric8.kubernetes.api.model.ObjectMeta;
2524
import lombok.AllArgsConstructor;
2625
import lombok.Data;
2726
import lombok.NoArgsConstructor;
@@ -35,19 +34,15 @@ public class ReconciliationMetadata {
3534

3635
private String apiVersion;
3736

38-
private ObjectMeta metadata;
39-
4037
private boolean firstDeployment;
4138

4239
public static ReconciliationMetadata from(AbstractFlinkResource<?, ?> resource) {
43-
ObjectMeta metadata = new ObjectMeta();
44-
metadata.setGeneration(resource.getMetadata().getGeneration());
4540

4641
var firstDeploy =
4742
resource.getStatus().getReconciliationStatus().isBeforeFirstDeployment()
4843
|| isFirstDeployment(resource);
4944

50-
return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
45+
return new ReconciliationMetadata(resource.getApiVersion(), firstDeploy);
5146
}
5247

5348
private static boolean isFirstDeployment(AbstractFlinkResource<?, ?> resource) {

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ public static <T extends AbstractFlinkSpec> SpecWithMeta<T> deserializeSpecWithM
5050
try {
5151
ObjectNode wrapper = (ObjectNode) objectMapper.readTree(specWithMetaString);
5252
ObjectNode internalMeta = (ObjectNode) wrapper.remove(INTERNAL_METADATA_JSON_KEY);
53-
5453
if (internalMeta == null) {
5554
// migrating from old format
5655
wrapper.remove("apiVersion");
5756
return new SpecWithMeta<>(objectMapper.treeToValue(wrapper, specClass), null);
5857
} else {
58+
internalMeta.remove("metadata");
5959
return new SpecWithMeta<>(
6060
objectMapper.treeToValue(wrapper.get("spec"), specClass),
6161
objectMapper.convertValue(internalMeta, ReconciliationMetadata.class));

flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,11 @@ public class SpecUtilsTest {
3535
@Test
3636
public void testSpecSerializationWithVersion() throws JsonProcessingException {
3737
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
38-
app.getMetadata().setGeneration(12L);
3938
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
4039
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
4140

4241
ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
4342
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
44-
assertEquals(12L, internalMeta.get("metadata").get("generation").asLong());
4543
assertEquals(
4644
app.getSpec(),
4745
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
@@ -56,4 +54,26 @@ public void testSpecSerializationWithVersion() throws JsonProcessingException {
5654
migrated.getSpec().getJob().getJarURI());
5755
assertNull(migrated.getMeta());
5856
}
57+
58+
@Test
59+
public void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
60+
// with regards to ReconcialiationMetadata & SpecWithMeta
61+
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
62+
app.getMetadata().setGeneration(12L);
63+
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
64+
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
65+
66+
ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
67+
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
68+
assertEquals(
69+
app.getSpec(),
70+
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
71+
assertNull(app.getStatus().getObservedGeneration());
72+
73+
// test backward compatibility
74+
String oldSerialized =
75+
"{\"apiVersion\":\"flink.apache.org/v1beta1\",\"metadata\":{\"generation\":5},\"firstDeployment\":false}";
76+
var migrated = SpecUtils.deserializeSpecWithMeta(oldSerialized, FlinkDeploymentSpec.class);
77+
assertNull(migrated.getMeta());
78+
}
5979
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ public void observeAlreadyUpgraded() {
856856
status.getJobManagerDeploymentStatus());
857857

858858
var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
859-
assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
859+
assertEquals(321L, status.getObservedGeneration());
860860
assertEquals(JobState.RUNNING, specWithMeta.getSpec().getJob().getState());
861861
assertEquals(5, specWithMeta.getSpec().getJob().getParallelism());
862862
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void observeAlreadyUpgraded() {
152152
status.getJobManagerDeploymentStatus());
153153

154154
var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
155-
assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
155+
assertEquals(321L, status.getObservedGeneration());
156156
assertEquals("1", specWithMeta.getSpec().getFlinkConfiguration().get("k"));
157157
}
158158
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1260,30 +1260,18 @@ public void testUpgradeReconciledGeneration() throws Exception {
12601260
reconciler.reconcile(deployment, context);
12611261
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
12621262

1263-
assertEquals(
1264-
1L,
1265-
deployment
1266-
.getStatus()
1267-
.getReconciliationStatus()
1268-
.deserializeLastReconciledSpecWithMeta()
1269-
.getMeta()
1270-
.getMetadata()
1271-
.getGeneration());
1263+
assertEquals(1L, deployment.getStatus().getObservedGeneration());
12721264

12731265
// Submit no-op upgrade
12741266
deployment.getSpec().getFlinkConfiguration().put("kubernetes.operator.test", "value");
12751267
deployment.getMetadata().setGeneration(2L);
1268+
deployment
1269+
.getStatus()
1270+
.getReconciliationStatus()
1271+
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
12761272

12771273
reconciler.reconcile(deployment, context);
1278-
assertEquals(
1279-
2L,
1280-
deployment
1281-
.getStatus()
1282-
.getReconciliationStatus()
1283-
.deserializeLastReconciledSpecWithMeta()
1284-
.getMeta()
1285-
.getMetadata()
1286-
.getGeneration());
1274+
assertEquals(2L, deployment.getStatus().getObservedGeneration());
12871275
}
12881276

12891277
@ParameterizedTest

0 commit comments

Comments
 (0)