File tree Expand file tree Collapse file tree 6 files changed +19
-19
lines changed
flink-kubernetes-operator-api/src
main/java/org/apache/flink/kubernetes/operator/api/spec
test/java/org/apache/flink/kubernetes/operator/api/utils
flink-kubernetes-operator/src
main/java/org/apache/flink/kubernetes/operator/reconciler/diff
test/java/org/apache/flink/kubernetes/operator/reconciler/deployment Expand file tree Collapse file tree 6 files changed +19
-19
lines changed Original file line number Diff line number Diff line change @@ -56,6 +56,6 @@ public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
5656 type = DiffType .SCALE ,
5757 mode = KubernetesDeploymentMode .NATIVE )
5858 })
59- @ JsonDeserialize (using = ConfigJsonNodeDeserializer .class )
60- private ConfigJsonNode flinkConfiguration = new ConfigJsonNode ();
59+ @ JsonDeserialize (using = ConfigObjectNodeDeserializer .class )
60+ private ConfigObjectNode flinkConfiguration = new ConfigObjectNode ();
6161}
Original file line number Diff line number Diff line change 2626import java .util .Map ;
2727
2828/** */
29- public class ConfigJsonNode extends ObjectNode {
29+ public class ConfigObjectNode extends ObjectNode {
3030
31- public ConfigJsonNode () {
31+ public ConfigObjectNode () {
3232 this (JsonNodeFactory .instance );
3333 }
3434
35- public ConfigJsonNode (JsonNodeFactory nc , Map <String , JsonNode > kids ) {
35+ public ConfigObjectNode (JsonNodeFactory nc , Map <String , JsonNode > kids ) {
3636 super (nc , kids );
3737 }
3838
39- public ConfigJsonNode (JsonNodeFactory nc ) {
39+ public ConfigObjectNode (JsonNodeFactory nc ) {
4040 super (nc );
4141 }
4242
Original file line number Diff line number Diff line change 2424
2525import java .io .IOException ;
2626
27- /** ConfigJsonNode deserializer . */
28- public class ConfigJsonNodeDeserializer extends JsonDeserializer <ConfigJsonNode > {
27+ /** Allows to deserialize to ConfigObjectNode . */
28+ public class ConfigObjectNodeDeserializer extends JsonDeserializer <ConfigObjectNode > {
2929
3030 @ Override
31- public ConfigJsonNode deserialize (
31+ public ConfigObjectNode deserialize (
3232 JsonParser jsonParser , DeserializationContext deserializationContext )
3333 throws IOException {
3434 ObjectNode tree = jsonParser .readValueAsTree ();
35- var res = new ConfigJsonNode ();
35+ var res = new ConfigObjectNode ();
3636 tree .fields ().forEachRemaining (entry -> res .set (entry .getKey (), entry .getValue ()));
3737 return res ;
3838 }
Original file line number Diff line number Diff line change 2525import org .apache .flink .kubernetes .operator .api .FlinkSessionJob ;
2626import org .apache .flink .kubernetes .operator .api .FlinkStateSnapshot ;
2727import org .apache .flink .kubernetes .operator .api .spec .CheckpointSpec ;
28- import org .apache .flink .kubernetes .operator .api .spec .ConfigJsonNode ;
28+ import org .apache .flink .kubernetes .operator .api .spec .ConfigObjectNode ;
2929import org .apache .flink .kubernetes .operator .api .spec .FlinkDeploymentSpec ;
3030import org .apache .flink .kubernetes .operator .api .spec .FlinkSessionJobSpec ;
3131import org .apache .flink .kubernetes .operator .api .spec .FlinkStateSnapshotSpec ;
@@ -143,7 +143,7 @@ public static FlinkSessionJob buildSessionJob(
143143 .withResourceVersion ("1" )
144144 .build ());
145145
146- ConfigJsonNode conf = new ConfigJsonNode ();
146+ ConfigObjectNode conf = new ConfigObjectNode ();
147147 conf .put ("kubernetes.operator.user.artifacts.http.header" , "header" );
148148 sessionJob .setSpec (
149149 FlinkSessionJobSpec .builder ()
@@ -169,7 +169,7 @@ public static FlinkSessionJob buildSessionJob(JobState state) {
169169 }
170170
171171 public static FlinkDeploymentSpec getTestFlinkDeploymentSpec (FlinkVersion version ) {
172- ConfigJsonNode conf = new ConfigJsonNode ();
172+ ConfigObjectNode conf = new ConfigObjectNode ();
173173 conf .put (TaskManagerOptions .NUM_TASK_SLOTS .key (), "2" );
174174 conf .put (
175175 HighAvailabilityOptions .HA_MODE .key (),
Original file line number Diff line number Diff line change 2121import org .apache .flink .kubernetes .operator .api .diff .DiffType ;
2222import org .apache .flink .kubernetes .operator .api .diff .Diffable ;
2323import org .apache .flink .kubernetes .operator .api .diff .SpecDiff ;
24- import org .apache .flink .kubernetes .operator .api .spec .ConfigJsonNode ;
24+ import org .apache .flink .kubernetes .operator .api .spec .ConfigObjectNode ;
2525import org .apache .flink .kubernetes .operator .api .spec .FlinkDeploymentSpec ;
2626import org .apache .flink .kubernetes .operator .api .spec .KubernetesDeploymentMode ;
2727
@@ -86,13 +86,13 @@ private void appendFields(final Class<?> clazz) {
8686 var leftField = readField (field , before , true );
8787 var rightField = readField (field , after , true );
8888 if (field .getName ().equals (FLINK_CONFIGURATION_PROPERTY_NAME )) {
89- leftField = ((ConfigJsonNode ) leftField ).asFlatMap ();
90- rightField = ((ConfigJsonNode ) rightField ).asFlatMap ();
89+ leftField = ((ConfigObjectNode ) leftField ).asFlatMap ();
90+ rightField = ((ConfigObjectNode ) rightField ).asFlatMap ();
9191 }
9292
9393 if (field .isAnnotationPresent (SpecDiff .Config .class )
9494 && (Map .class .isAssignableFrom (field .getType ())
95- || (field .getType ().equals (ConfigJsonNode .class )
95+ || (field .getType ().equals (ConfigObjectNode .class )
9696 && field .getName ().equals ("flinkConfiguration" )))) {
9797 diffBuilder .append (
9898 field .getName (),
Original file line number Diff line number Diff line change 2828import org .apache .flink .kubernetes .operator .TestUtils ;
2929import org .apache .flink .kubernetes .operator .api .CrdConstants ;
3030import org .apache .flink .kubernetes .operator .api .FlinkDeployment ;
31- import org .apache .flink .kubernetes .operator .api .spec .ConfigJsonNode ;
31+ import org .apache .flink .kubernetes .operator .api .spec .ConfigObjectNode ;
3232import org .apache .flink .kubernetes .operator .api .spec .FlinkDeploymentSpec ;
3333import org .apache .flink .kubernetes .operator .api .spec .FlinkVersion ;
3434import org .apache .flink .kubernetes .operator .api .spec .JobState ;
@@ -954,7 +954,7 @@ public static FlinkDeployment buildApplicationCluster(
954954 default :
955955 throw new RuntimeException ("Unsupported upgrade mode " + upgradeMode );
956956 }
957- deployment .getSpec ().setFlinkConfiguration (new ConfigJsonNode ());
957+ deployment .getSpec ().setFlinkConfiguration (new ConfigObjectNode ());
958958 deployment .getSpec ().getFlinkConfiguration ().putAllFrom (conf );
959959 return deployment ;
960960 }
You can’t perform that action at this time.
0 commit comments