Skip to content

Commit 7880a11

Browse files
authored
[FLINK-37406] Fix config array serialization
1 parent d9fa479 commit 7880a11

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/ConfigObjectNode.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import org.apache.flink.configuration.Configuration;
2121

2222
import com.fasterxml.jackson.databind.JsonNode;
23+
import com.fasterxml.jackson.databind.node.ArrayNode;
2324
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2425
import com.fasterxml.jackson.databind.node.ObjectNode;
2526

27+
import java.util.ArrayList;
2628
import java.util.Arrays;
2729
import java.util.HashMap;
2830
import java.util.Iterator;
31+
import java.util.List;
2932
import java.util.Map;
3033

3134
/** Allows parsing configurations as YAML, and adds related utility methods. */
@@ -78,12 +81,26 @@ private static void flattenHelper(
7881
}
7982
} else if (node.isArray()) {
8083
for (int i = 0; i < node.size(); i++) {
81-
String newKey = parentKey + "[" + i + "]";
82-
flattenHelper(node.get(i), newKey, flatMap);
84+
if (node instanceof ArrayNode) {
85+
flatMap.put(parentKey, arrayNodeToSemicolonSepratedString((ArrayNode) node));
86+
} else {
87+
String newKey = parentKey + "[" + i + "]";
88+
flattenHelper(node.get(i), newKey, flatMap);
89+
}
8390
}
8491
} else {
85-
// Store values as strings
8692
flatMap.put(parentKey, node.asText());
8793
}
8894
}
95+
96+
private static String arrayNodeToSemicolonSepratedString(ArrayNode arrayNode) {
97+
if (arrayNode == null || arrayNode.isEmpty()) {
98+
return "";
99+
}
100+
List<String> stringValues = new ArrayList<>();
101+
for (JsonNode node : arrayNode) {
102+
stringValues.add(node.asText());
103+
}
104+
return String.join(";", stringValues);
105+
}
89106
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import com.fasterxml.jackson.core.JsonProcessingException;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import com.fasterxml.jackson.databind.module.SimpleModule;
23+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.util.Map;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
class ConfigObjectNodeTest {
32+
33+
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
34+
35+
@BeforeEach
36+
void setup() {
37+
SimpleModule module = new SimpleModule();
38+
module.addDeserializer(ConfigObjectNode.class, new ConfigObjectNodeDeserializer());
39+
objectMapper.registerModule(module);
40+
}
41+
42+
@Test
43+
void deserializeArray() throws JsonProcessingException {
44+
var value = "a: 1\n" + "b:\n" + " c: [1,2]";
45+
46+
var flatMap = objectMapper.readValue(value, ConfigObjectNode.class).asFlatMap();
47+
48+
assertThat(flatMap).containsExactlyInAnyOrderEntriesOf(Map.of("a", "1", "b.c", "1;2"));
49+
}
50+
51+
@Test
52+
void deserializeStringArray() throws JsonProcessingException {
53+
var value = "a: 1\n" + "b:\n" + " c: [\"1\",\"2\"]";
54+
55+
var flatMap = objectMapper.readValue(value, ConfigObjectNode.class).asFlatMap();
56+
57+
assertThat(flatMap).containsExactlyInAnyOrderEntriesOf(Map.of("a", "1", "b.c", "1;2"));
58+
}
59+
}

0 commit comments

Comments
 (0)