Skip to content

[WIP][FLINK-37406] Add support for structured YAML config in FlinkDeployment CRD #1012

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flink-kubernetes-operator-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ under the License.
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.operator.api.diff.Diffable;
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -58,4 +59,6 @@ public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
mode = KubernetesDeploymentMode.NATIVE)
})
private Map<String, String> flinkConfiguration;

private JsonNode config;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,29 @@
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.snakeyaml.engine.v2.api.Load;
import org.snakeyaml.engine.v2.api.LoadSettings;
import org.snakeyaml.engine.v2.schema.CoreSchema;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Spec utilities. */
public class SpecUtils {
public static final String INTERNAL_METADATA_JSON_KEY = "resource_metadata";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ObjectMapper yamlObjectMapper = new ObjectMapper(new YAMLFactory());

private static final Load loader =
new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());

/**
* Deserializes the spec and custom metadata object from JSON.
Expand Down Expand Up @@ -120,4 +134,43 @@ public static <T> T clone(T object) {
throw new IllegalStateException(e);
}
}

/**
 * Folds the structured {@code config} node of the given spec into its flat
 * {@code flinkConfiguration} string map and clears the {@code config} field afterwards.
 *
 * <p>Keys produced by flattening {@code config} override entries already present in
 * {@code flinkConfiguration}. The spec is left untouched when {@code config} is null or empty.
 *
 * @param abstractFlinkSpec spec whose structured config should be merged into the flat map
 */
public static void moveConfigToFlinkConfiguration(AbstractFlinkSpec abstractFlinkSpec) {
    var config = abstractFlinkSpec.getConfig();
    if (config == null || config.isEmpty()) {
        return;
    }
    var flattened = parseConfigToStringMap(config);
    // Clear the structured field first so the spec never carries both representations.
    abstractFlinkSpec.setConfig(null);
    var existing = abstractFlinkSpec.getFlinkConfiguration();
    if (existing != null) {
        existing.putAll(flattened);
    } else {
        abstractFlinkSpec.setFlinkConfiguration(new HashMap<>(flattened));
    }
}

/**
 * Flattens a structured configuration tree into a flat map of dotted string keys to
 * string values, e.g. {@code {"a": {"b": 1}}} becomes {@code {"a.b": "1"}}; array
 * elements get a bracketed index suffix such as {@code key[0]}.
 *
 * @param node root of the structured configuration tree
 * @return insertion-ordered map of flattened key/value pairs
 */
public static Map<String, String> parseConfigToStringMap(JsonNode node) {
    Map<String, String> result = new LinkedHashMap<>();
    flattenHelper(node, "", result);
    return result;
}

/**
 * Recursively walks {@code node} and emits one flat entry per scalar leaf.
 *
 * <p>Object fields are joined to the parent key with a dot; array elements are keyed as
 * {@code parent[index]}. Scalar leaves are rendered via {@link JsonNode#asText()}
 * (note: explicit YAML/JSON nulls therefore surface as the string {@code "null"} —
 * presumably acceptable for config flattening; confirm if null-stripping is desired).
 *
 * @param node      current subtree being flattened
 * @param parentKey flattened key accumulated so far ("" at the root)
 * @param flatMap   output map collecting the flattened entries
 */
private static void flattenHelper(
        JsonNode node, String parentKey, Map<String, String> flatMap) {
    if (node.isObject()) {
        var entries = node.fields();
        while (entries.hasNext()) {
            var entry = entries.next();
            var childKey =
                    parentKey.isEmpty() ? entry.getKey() : parentKey + "." + entry.getKey();
            flattenHelper(entry.getValue(), childKey, flatMap);
        }
    } else if (node.isArray()) {
        int index = 0;
        for (JsonNode element : node) {
            flattenHelper(element, parentKey + "[" + index + "]", flatMap);
            index++;
        }
    } else {
        // Scalar leaf: store its textual representation under the accumulated key.
        flatMap.put(parentKey, node.asText());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,34 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

/** Test for {@link SpecUtils}. */
public class SpecUtilsTest {
class SpecUtilsTest {

@Test
public void testSpecSerializationWithVersion() throws JsonProcessingException {
void testSpecSerializationWithVersion() throws JsonProcessingException {
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);

ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
var deserialized =
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec();
deserialized.setConfig(null);
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
assertEquals(
app.getSpec(),
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
assertEquals(app.getSpec(), deserialized);

// test backward compatibility
String oldSerialized =
Expand All @@ -56,18 +63,19 @@ public void testSpecSerializationWithVersion() throws JsonProcessingException {
}

@Test
public void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
// with regards to ReconcialiationMetadata & SpecWithMeta
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
app.getMetadata().setGeneration(12L);
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);

ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
var deserialized =
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec();
deserialized.setConfig(null);
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
assertEquals(
app.getSpec(),
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
assertEquals(app.getSpec(), deserialized);
assertNull(app.getStatus().getObservedGeneration());

// test backward compatibility
Expand All @@ -76,4 +84,44 @@ public void testSpecSerializationWithoutGeneration() throws JsonProcessingExcept
var migrated = SpecUtils.deserializeSpecWithMeta(oldSerialized, FlinkDeploymentSpec.class);
assertNull(migrated.getMeta());
}

@Test
void testMovePropertiesFromConfigToFlinkConfiguration() {
    FlinkDeployment app = BaseTestUtils.buildApplicationCluster();

    // Pre-existing flat configuration: one key that will be overridden, one that survives.
    var flatConfiguration = new HashMap<String, String>();
    flatConfiguration.put("taskmanager.numberOfTaskSlots", "2");
    flatConfiguration.put("high-availability.storageDir", "file:///flink-data/ha");
    app.getSpec().setFlinkConfiguration(flatConfiguration);

    // Structured config whose flattened keys must win over the flat map's entries.
    var structuredConfig =
            toJsonNode(
                    """
                    taskmanager:
                      numberOfTaskSlots: 3
                    high-availability:
                      type: "KUBERNETES"
                    """);
    app.getSpec().setConfig(structuredConfig);

    SpecUtils.moveConfigToFlinkConfiguration(app.getSpec());

    assertThat(app.getSpec().getFlinkConfiguration())
            .containsExactlyInAnyOrderEntriesOf(
                    Map.of(
                            "taskmanager.numberOfTaskSlots", "3",
                            "high-availability.type", "KUBERNETES",
                            "high-availability.storageDir", "file:///flink-data/ha"));
}

/**
 * Parses a YAML snippet into a Jackson tree, for building structured spec configs in tests.
 *
 * @param yaml YAML document text
 * @return parsed tree representation of the YAML
 * @throws RuntimeException wrapping any parse failure
 */
JsonNode toJsonNode(String yaml) {
    var yamlMapper = new ObjectMapper(new YAMLFactory());
    try {
        return yamlMapper.readTree(yaml);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
Expand Down Expand Up @@ -122,7 +123,7 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) {
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context josdkContext)
throws Exception {

SpecUtils.moveConfigToFlinkConfiguration(flinkApp.getSpec());
if (canaryResourceManager.handleCanaryResourceReconciliation(
flinkApp, josdkContext.getClient())) {
return UpdateControl.noUpdate();
Expand All @@ -132,6 +133,8 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex

statusRecorder.updateStatusFromCache(flinkApp);
FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
SpecUtils.moveConfigToFlinkConfiguration(previousDeployment.getSpec());

var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext);

// If we get an unsupported Flink version, trigger event and exit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import org.apache.flink.kubernetes.operator.observer.Observer;
Expand Down Expand Up @@ -88,7 +89,7 @@ public FlinkSessionJobController(
@Override
public UpdateControl<FlinkSessionJob> reconcile(
FlinkSessionJob flinkSessionJob, Context josdkContext) {

SpecUtils.moveConfigToFlinkConfiguration(flinkSessionJob.getSpec());
if (canaryResourceManager.handleCanaryResourceReconciliation(
flinkSessionJob, josdkContext.getClient())) {
return UpdateControl.noUpdate();
Expand All @@ -98,6 +99,7 @@ public UpdateControl<FlinkSessionJob> reconcile(

statusRecorder.updateStatusFromCache(flinkSessionJob);
FlinkSessionJob previousJob = ReconciliationUtils.clone(flinkSessionJob);
SpecUtils.moveConfigToFlinkConfiguration(previousJob.getSpec());
var ctx = ctxFactory.getResourceContext(flinkSessionJob, josdkContext);

// If we get an unsupported Flink version, trigger event and exit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ spec:
properties:
spec:
properties:
config:
x-kubernetes-preserve-unknown-fields: true
flinkConfiguration:
additionalProperties:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ spec:
properties:
spec:
properties:
config:
x-kubernetes-preserve-unknown-fields: true
deploymentName:
type: string
flinkConfiguration:
Expand Down