2424import com .fasterxml .jackson .core .JsonProcessingException ;
2525import com .fasterxml .jackson .databind .ObjectMapper ;
2626import com .fasterxml .jackson .databind .node .ObjectNode ;
27+ import com .fasterxml .jackson .dataformat .yaml .YAMLFactory ;
2728import org .junit .jupiter .api .Test ;
2829
30+ import java .util .Map ;
31+
32+ import static org .assertj .core .api .Assertions .assertThat ;
2933import static org .junit .jupiter .api .Assertions .assertEquals ;
3034import static org .junit .jupiter .api .Assertions .assertNull ;
3135
3236/** Test for {@link SpecUtils}. */
33- public class SpecUtilsTest {
37+ class SpecUtilsTest {
38+
39+ private static final ObjectMapper yamlObjectMapper = new ObjectMapper (new YAMLFactory ());
3440
3541 @ Test
36- public void testSpecSerializationWithVersion () throws JsonProcessingException {
42+ void testSpecSerializationWithVersion () throws JsonProcessingException {
3743 FlinkDeployment app = BaseTestUtils .buildApplicationCluster ();
3844 String serialized = SpecUtils .writeSpecWithMeta (app .getSpec (), app );
3945 ObjectNode node = (ObjectNode ) new ObjectMapper ().readTree (serialized );
@@ -56,7 +62,7 @@ public void testSpecSerializationWithVersion() throws JsonProcessingException {
5662 }
5763
5864 @ Test
59- public void testSpecSerializationWithoutGeneration () throws JsonProcessingException {
65+ void testSpecSerializationWithoutGeneration () throws JsonProcessingException {
6066 // with regards to ReconcialiationMetadata & SpecWithMeta
6167 FlinkDeployment app = BaseTestUtils .buildApplicationCluster ();
6268 app .getMetadata ().setGeneration (12L );
@@ -76,4 +82,57 @@ public void testSpecSerializationWithoutGeneration() throws JsonProcessingExcept
7682 var migrated = SpecUtils .deserializeSpecWithMeta (oldSerialized , FlinkDeploymentSpec .class );
7783 assertNull (migrated .getMeta ());
7884 }
85+
86+ @ Test
87+ void convertsStringMapToJsonNode () {
88+ var map = Map .of ("k1" , "v1" , "k2" , "v2" , "k3.nested" , "v3" );
89+ var node = SpecUtils .mapToJsonNode (map );
90+
91+ assertThat (node ).hasSize (3 );
92+ assertThat (node .get ("k1" ).asText ()).isEqualTo ("v1" );
93+ assertThat (node .get ("k2" ).asText ()).isEqualTo ("v2" );
94+ assertThat (node .get ("k3.nested" ).asText ()).isEqualTo ("v3" );
95+ }
96+
97+ @ Test
98+ void convertsJsonNodeToMap () throws JsonProcessingException {
99+ var node =
100+ yamlObjectMapper .readTree ("k1: v1 \n " + "k2: v2 \n " + "k3:\n " + " nested: v3\n " );
101+
102+ var map = SpecUtils .toStringMap (node );
103+ assertThat (map ).hasSize (3 );
104+ assertThat (map .get ("k1" )).isEqualTo ("v1" );
105+ assertThat (map .get ("k2" )).isEqualTo ("v2" );
106+ assertThat (map .get ("k3.nested" )).isEqualTo ("v3" );
107+ }
108+
109+ @ Test
110+ void addConfigPropertyToSpec () {
111+ var spec = new FlinkDeploymentSpec ();
112+
113+ SpecUtils .addConfigProperty (spec , "k1" , "v1" );
114+
115+ assertThat (spec .getFlinkConfiguration ().get ("k1" ).asText ()).isEqualTo ("v1" );
116+ }
117+
118+ @ Test
119+ void addConfigPropertiesToSpec () {
120+ var spec = new FlinkDeploymentSpec ();
121+
122+ SpecUtils .addConfigProperties (spec , Map .of ("k1" , "v1" , "k2" , "v2" ));
123+
124+ assertThat (spec .getFlinkConfiguration ().get ("k1" ).asText ()).isEqualTo ("v1" );
125+ assertThat (spec .getFlinkConfiguration ().get ("k2" ).asText ()).isEqualTo ("v2" );
126+ }
127+
128+ @ Test
129+ void removeConfigPropertiesFromSpec () {
130+ var spec = new FlinkDeploymentSpec ();
131+ SpecUtils .addConfigProperties (spec , Map .of ("k1" , "v1" , "k2" , "v2" ));
132+
133+ SpecUtils .removeConfigProperties (spec , "k1" );
134+
135+ assertThat (spec .getFlinkConfiguration ().get ("k1" )).isNull ();
136+ assertThat (spec .getFlinkConfiguration ().get ("k2" ).asText ()).isEqualTo ("v2" );
137+ }
79138}
0 commit comments