diff --git a/flink-kubernetes-operator-api/pom.xml b/flink-kubernetes-operator-api/pom.xml
index 71b59abcc7..3b3d688b08 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -138,6 +138,13 @@ under the License.
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
index 8cecc5ebcf..b28d98756c 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
@@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.operator.api.diff.Diffable;
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
+import com.fasterxml.jackson.databind.JsonNode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -58,4 +59,6 @@ public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
mode = KubernetesDeploymentMode.NATIVE)
})
private Map<String, String> flinkConfiguration;
+
+ private JsonNode config;
}
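
The new field is typed as `JsonNode` rather than `Map<String, String>` so it can carry an arbitrarily nested tree. A minimal sketch of the two spec styles side by side, reusing an option that appears in the tests below (values are illustrative only):

```yaml
# Existing flat form: dotted string keys, string values.
flinkConfiguration:
  taskmanager.numberOfTaskSlots: "2"

# New nested form: the same option as a YAML tree.
config:
  taskmanager:
    numberOfTaskSlots: 2
```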
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
index 458dd69173..458a1665c5 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
@@ -23,15 +23,29 @@
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.snakeyaml.engine.v2.api.Load;
+import org.snakeyaml.engine.v2.api.LoadSettings;
+import org.snakeyaml.engine.v2.schema.CoreSchema;
import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
/** Spec utilities. */
public class SpecUtils {
public static final String INTERNAL_METADATA_JSON_KEY = "resource_metadata";
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final ObjectMapper yamlObjectMapper = new ObjectMapper(new YAMLFactory());
+
+ private static final Load loader =
+ new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
/**
* Deserializes the spec and custom metadata object from JSON.
@@ -120,4 +134,43 @@ public static <T> T clone(T object) {
throw new IllegalStateException(e);
}
}
+
+ public static void moveConfigToFlinkConfiguration(AbstractFlinkSpec abstractFlinkSpec) {
+ if (abstractFlinkSpec.getConfig() != null && !abstractFlinkSpec.getConfig().isEmpty()) {
+ var props = parseConfigToStringMap(abstractFlinkSpec.getConfig());
+ abstractFlinkSpec.setConfig(null);
+ if (abstractFlinkSpec.getFlinkConfiguration() == null) {
+ abstractFlinkSpec.setFlinkConfiguration(new HashMap<>(props));
+ } else {
+ abstractFlinkSpec.getFlinkConfiguration().putAll(props);
+ }
+ }
+ }
+
+ public static Map<String, String> parseConfigToStringMap(JsonNode node) {
+ Map<String, String> flatMap = new LinkedHashMap<>();
+ flattenHelper(node, "", flatMap);
+ return flatMap;
+ }
+
+ private static void flattenHelper(
+ JsonNode node, String parentKey, Map<String, String> flatMap) {
+ if (node.isObject()) {
+ Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> field = fields.next();
+ String newKey =
+ parentKey.isEmpty() ? field.getKey() : parentKey + "." + field.getKey();
+ flattenHelper(field.getValue(), newKey, flatMap);
+ }
+ } else if (node.isArray()) {
+ for (int i = 0; i < node.size(); i++) {
+ String newKey = parentKey + "[" + i + "]";
+ flattenHelper(node.get(i), newKey, flatMap);
+ }
+ } else {
+ // Store values as strings
+ flatMap.put(parentKey, node.asText());
+ }
+ }
}
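
`flattenHelper` walks the tree depth-first: object fields are joined to the parent key with a dot, array elements get an `[index]` suffix, and each leaf is stored via `asText()`, so numbers and booleans arrive in `flinkConfiguration` as strings. Since `moveConfigToFlinkConfiguration` merges with `putAll`, a flattened `config` entry wins over a duplicate key already present in `flinkConfiguration`. A sketch of the mapping; the `pipeline.jars` list is a hypothetical value included only to illustrate the array rule:

```yaml
config:
  taskmanager:
    numberOfTaskSlots: 3
  pipeline:
    jars: ["file:///opt/a.jar", "file:///opt/b.jar"]

# flattens to:
flinkConfiguration:
  taskmanager.numberOfTaskSlots: "3"
  "pipeline.jars[0]": "file:///opt/a.jar"
  "pipeline.jars[1]": "file:///opt/b.jar"
```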
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
index dfb48e4917..2142ffc1de 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
@@ -22,27 +22,34 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
/** Test for {@link SpecUtils}. */
-public class SpecUtilsTest {
+class SpecUtilsTest {
@Test
- public void testSpecSerializationWithVersion() throws JsonProcessingException {
+ void testSpecSerializationWithVersion() throws JsonProcessingException {
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
+ var deserialized =
+ SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec();
+ deserialized.setConfig(null);
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
- assertEquals(
- app.getSpec(),
- SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
+ assertEquals(app.getSpec(), deserialized);
// test backward compatibility
String oldSerialized =
@@ -56,7 +63,7 @@ public void testSpecSerializationWithVersion() throws JsonProcessingException {
}
@Test
- public void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
+ void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
// with regard to ReconciliationMetadata & SpecWithMeta
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
app.getMetadata().setGeneration(12L);
@@ -64,10 +71,11 @@ public void testSpecSerializationWithoutGeneration() throws JsonProcessingExcept
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
+ var deserialized =
+ SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec();
+ deserialized.setConfig(null);
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
- assertEquals(
- app.getSpec(),
- SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
+ assertEquals(app.getSpec(), deserialized);
assertNull(app.getStatus().getObservedGeneration());
// test backward compatibility
@@ -76,4 +84,44 @@ public void testSpecSerializationWithoutGeneration() throws JsonProcessingExcept
var migrated = SpecUtils.deserializeSpecWithMeta(oldSerialized, FlinkDeploymentSpec.class);
assertNull(migrated.getMeta());
}
+
+ @Test
+ void testMovePropertiesFromConfigToFlinkConfiguration() {
+ FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
+ var properties = new HashMap<String, String>();
+ properties.put("taskmanager.numberOfTaskSlots", "2");
+ properties.put("high-availability.storageDir", "file:///flink-data/ha");
+
+ app.getSpec().setFlinkConfiguration(properties);
+ app.getSpec()
+ .setConfig(
+ toJsonNode(
+ """
+ taskmanager:
+   numberOfTaskSlots: 3
+ high-availability:
+   type: "KUBERNETES"
+ """));
+
+ SpecUtils.moveConfigToFlinkConfiguration(app.getSpec());
+
+ assertThat(app.getSpec().getFlinkConfiguration())
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of(
+ "taskmanager.numberOfTaskSlots",
+ "3",
+ "high-availability.type",
+ "KUBERNETES",
+ "high-availability.storageDir",
+ "file:///flink-data/ha"));
+ }
+
+ JsonNode toJsonNode(String yaml) {
+ try {
+ var objectMapper = new ObjectMapper(new YAMLFactory());
+ return objectMapper.readTree(yaml);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index e5d418b81f..cfbe611d4b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,6 +23,7 @@
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
@@ -122,7 +123,7 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) {
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context<FlinkDeployment> josdkContext)
throws Exception {
-
+ SpecUtils.moveConfigToFlinkConfiguration(flinkApp.getSpec());
if (canaryResourceManager.handleCanaryResourceReconciliation(
flinkApp, josdkContext.getClient())) {
return UpdateControl.noUpdate();
@@ -132,6 +133,8 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
statusRecorder.updateStatusFromCache(flinkApp);
FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
+ SpecUtils.moveConfigToFlinkConfiguration(previousDeployment.getSpec());
+
var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext);
// If we get an unsupported Flink version, trigger event and exit
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index a7f2106b06..0290191523 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import org.apache.flink.kubernetes.operator.observer.Observer;
@@ -88,7 +89,7 @@ public FlinkSessionJobController(
@Override
public UpdateControl<FlinkSessionJob> reconcile(
FlinkSessionJob flinkSessionJob, Context<FlinkSessionJob> josdkContext) {
-
+ SpecUtils.moveConfigToFlinkConfiguration(flinkSessionJob.getSpec());
if (canaryResourceManager.handleCanaryResourceReconciliation(
flinkSessionJob, josdkContext.getClient())) {
return UpdateControl.noUpdate();
@@ -98,6 +99,7 @@ public UpdateControl<FlinkSessionJob> reconcile(
statusRecorder.updateStatusFromCache(flinkSessionJob);
FlinkSessionJob previousJob = ReconciliationUtils.clone(flinkSessionJob);
+ SpecUtils.moveConfigToFlinkConfiguration(previousJob.getSpec());
var ctx = ctxFactory.getResourceContext(flinkSessionJob, josdkContext);
// If we get an unsupported Flink version, trigger event and exit
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 292ecf3d4c..9211ebd245 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -31,6 +31,8 @@ spec:
properties:
  spec:
    properties:
+     config:
+       x-kubernetes-preserve-unknown-fields: true
      flinkConfiguration:
        additionalProperties:
          type: string
diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index f66d67ea72..2b28e0dbf5 100644
--- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -31,6 +31,8 @@ spec:
properties:
  spec:
    properties:
+     config:
+       x-kubernetes-preserve-unknown-fields: true
      deploymentName:
        type: string
      flinkConfiguration:
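
`x-kubernetes-preserve-unknown-fields: true` makes the API server keep the free-form `config` subtree instead of pruning fields that the schema does not declare. A hypothetical manifest exercising the new field; the name, image, and Flink version are placeholders, only `config` and its flattening behavior come from this change:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: nested-config-example   # placeholder
spec:
  image: flink:1.20             # placeholder
  flinkVersion: v1_20           # placeholder
  config:                       # flattened into flinkConfiguration during reconcile
    taskmanager:
      numberOfTaskSlots: 2
    high-availability:
      type: KUBERNETES
      storageDir: file:///flink-data/ha
```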