Skip to content

Commit ba62f07

Browse files
committed
FLIP-503 first batch
1 parent 01d2440 commit ba62f07

File tree

17 files changed

+12038
-5
lines changed

17 files changed

+12038
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api;
19+
20+
import org.apache.flink.annotation.Experimental;
21+
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
22+
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
23+
24+
import com.fasterxml.jackson.annotation.JsonInclude;
25+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
26+
import io.fabric8.kubernetes.api.model.Namespaced;
27+
import io.fabric8.kubernetes.client.CustomResource;
28+
import io.fabric8.kubernetes.model.annotation.Group;
29+
import io.fabric8.kubernetes.model.annotation.ShortNames;
30+
import io.fabric8.kubernetes.model.annotation.Version;
31+
32+
/** Custom resource definition that represents a deployments with Blue/Green rollout capability. */
33+
@Experimental
34+
@JsonInclude(JsonInclude.Include.NON_NULL)
35+
@JsonDeserialize()
36+
@Group(CrdConstants.API_GROUP)
37+
@Version(CrdConstants.API_VERSION)
38+
@ShortNames({"flinkbgdep"})
39+
public class FlinkBlueGreenDeployment
40+
extends CustomResource<FlinkBlueGreenDeploymentSpec, FlinkBlueGreenDeploymentStatus>
41+
implements Namespaced {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.bluegreen;
19+
20+
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
21+
22+
/** Enumeration of the two possible Flink Blue/Green deployment types. */
23+
public enum DeploymentType {
24+
BLUE,
25+
GREEN;
26+
27+
public static DeploymentType fromDeployment(FlinkDeployment flinkDeployment) {
28+
String typeAnnotation =
29+
flinkDeployment.getMetadata().getLabels().get(DeploymentType.class.getSimpleName());
30+
return DeploymentType.valueOf(typeAnnotation);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import org.apache.flink.annotation.Experimental;
21+
22+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
23+
import lombok.AllArgsConstructor;
24+
import lombok.Data;
25+
import lombok.NoArgsConstructor;
26+
27+
/** Spec that describes a Flink application with blue/green deployment capabilities. */
28+
@Experimental
29+
@Data
30+
@NoArgsConstructor
31+
@AllArgsConstructor
32+
@JsonIgnoreProperties(ignoreUnknown = true)
33+
public class FlinkBlueGreenDeploymentSpec {
34+
35+
private FlinkDeploymentTemplateSpec template;
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import com.fasterxml.jackson.annotation.JsonIgnore;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
import io.fabric8.kubernetes.api.model.ObjectMeta;
23+
import lombok.AllArgsConstructor;
24+
import lombok.Data;
25+
import lombok.NoArgsConstructor;
26+
import lombok.experimental.SuperBuilder;
27+
28+
import java.util.LinkedHashMap;
29+
import java.util.Map;
30+
31+
/** Template Spec that describes a Flink application managed by the blue/green controller. */
32+
@AllArgsConstructor
33+
@NoArgsConstructor
34+
@Data
35+
@SuperBuilder
36+
public class FlinkDeploymentTemplateSpec {
37+
38+
@JsonProperty("metadata")
39+
private ObjectMeta metadata;
40+
41+
@JsonProperty("deploymentDeletionDelaySec")
42+
private int deploymentDeletionDelaySec;
43+
44+
@JsonProperty("maxNumRetries")
45+
private int maxNumRetries;
46+
47+
@JsonProperty("reconciliationReschedulingIntervalMs")
48+
private int reconciliationReschedulingIntervalMs;
49+
50+
@JsonProperty("spec")
51+
private FlinkDeploymentSpec spec;
52+
53+
@JsonIgnore
54+
private Map<String, Object> additionalProperties = new LinkedHashMap<String, Object>();
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.status;
19+
20+
/** Enumeration of the possible states of the blue/green transition. */
21+
public enum FlinkBlueGreenDeploymentState {
22+
ACTIVE_BLUE,
23+
ACTIVE_GREEN,
24+
TRANSITIONING_TO_BLUE,
25+
TRANSITIONING_TO_GREEN,
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.status;
19+
20+
import org.apache.flink.annotation.Experimental;
21+
22+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
23+
import lombok.AllArgsConstructor;
24+
import lombok.Data;
25+
import lombok.NoArgsConstructor;
26+
import lombok.ToString;
27+
import lombok.experimental.SuperBuilder;
28+
29+
/** Last observed status of the Flink Blue/Green deployment. */
30+
@Experimental
31+
@Data
32+
@AllArgsConstructor
33+
@NoArgsConstructor
34+
@ToString(callSuper = true)
35+
@SuperBuilder
36+
@JsonIgnoreProperties(ignoreUnknown = true)
37+
public class FlinkBlueGreenDeploymentStatus {
38+
39+
private JobStatus jobStatus = new JobStatus();
40+
41+
/** The state of the blue/green transition. */
42+
private FlinkBlueGreenDeploymentState blueGreenState;
43+
44+
/** Last reconciled (serialized) deployment spec. */
45+
private String lastReconciledSpec;
46+
47+
/** Timestamp of last reconciliation. */
48+
private Long lastReconciledTimestamp;
49+
50+
/** Current number of retries. */
51+
private int numRetries;
52+
53+
/** Information about the TaskManagers for the scale subresource. */
54+
private TaskManagerInfo taskManager;
55+
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,28 @@ public static String writeSpecWithMeta(
9999
}
100100
}
101101

102+
public static String serializeObject(Object object, String wrapperKey) {
103+
ObjectNode wrapper = objectMapper.createObjectNode();
104+
wrapper.set(wrapperKey, objectMapper.valueToTree(checkNotNull(object)));
105+
106+
try {
107+
return objectMapper.writeValueAsString(wrapper);
108+
} catch (JsonProcessingException e) {
109+
throw new RuntimeException(
110+
"Could not serialize " + wrapperKey + ", this indicates a bug...", e);
111+
}
112+
}
113+
114+
public static <T> T deserializeObject(String serialized, String wrapperKey, Class<T> valueType)
115+
throws JsonProcessingException {
116+
try {
117+
ObjectNode wrapper = (ObjectNode) objectMapper.readTree(serialized);
118+
return objectMapper.treeToValue(wrapper.get(wrapperKey), valueType);
119+
} catch (JsonProcessingException e) {
120+
throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e);
121+
}
122+
}
123+
102124
// We do not have access to Flink's Preconditions from here
103125
private static <T> T checkNotNull(T object) {
104126
if (object == null) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
3131
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
3232
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
33+
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentController;
3334
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
3435
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
3536
import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotController;
@@ -242,6 +243,12 @@ void registerSnapshotController() {
242243
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
243244
}
244245

246+
@VisibleForTesting
247+
void registerBlueGreenController() {
248+
var controller = new FlinkBlueGreenDeploymentController(ctxFactory);
249+
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
250+
}
251+
245252
private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
246253
var operatorConf = configManager.getOperatorConfiguration();
247254
var watchNamespaces = operatorConf.getWatchedNamespaces();
@@ -262,6 +269,7 @@ public void run() {
262269
registerDeploymentController();
263270
registerSessionJobController();
264271
registerSnapshotController();
272+
registerBlueGreenController();
265273
operator.installShutdownHook(
266274
baseConfig.get(KubernetesOperatorConfigOptions.OPERATOR_TERMINATION_TIMEOUT));
267275
operator.start();

0 commit comments

Comments
 (0)