Skip to content

Commit 27ae567

Browse files
committed
Addressing PR comments
1 parent 7d5072a commit 27ae567

File tree

6 files changed

+64
-208
lines changed

6 files changed

+64
-208
lines changed

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ public void setup() {
8383
stateStore = new InMemoryAutoScalerStateStore<>();
8484
}
8585

86-
@Test
8786
void testMetricReporting() throws Exception {
8887
JobVertexID jobVertexID = new JobVertexID();
8988
JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID, Map.of(), 1, 10));

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/TransitionMode.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentTemplateSpec.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.flink.kubernetes.operator.api.spec;
1919

20-
import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode;
21-
2220
import com.fasterxml.jackson.annotation.JsonIgnore;
2321
import com.fasterxml.jackson.annotation.JsonProperty;
2422
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -49,9 +47,6 @@ public class FlinkDeploymentTemplateSpec {
4947
@JsonProperty("reconciliationReschedulingIntervalMs")
5048
private int reconciliationReschedulingIntervalMs;
5149

52-
@JsonProperty("transitionMode")
53-
private TransitionMode transitionMode;
54-
5550
@JsonProperty("spec")
5651
private FlinkDeploymentSpec spec;
5752

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 5 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2323
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2424
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
25-
import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode;
2625
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2726
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
2827
import org.apache.flink.kubernetes.operator.api.spec.JobState;
@@ -33,15 +32,9 @@
3332
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3433
import org.apache.flink.util.Preconditions;
3534

36-
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
37-
3835
import com.fasterxml.jackson.core.JsonProcessingException;
39-
import io.fabric8.kubernetes.api.model.Event;
4036
import io.fabric8.kubernetes.api.model.ObjectMeta;
41-
import io.fabric8.kubernetes.api.model.OwnerReference;
4237
import io.fabric8.kubernetes.api.model.StatusDetails;
43-
import io.fabric8.kubernetes.client.dsl.PodResource;
44-
import io.fabric8.kubernetes.client.dsl.Resource;
4538
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
4639
import io.javaoperatorsdk.operator.api.reconciler.Context;
4740
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
@@ -55,15 +48,9 @@
5548
import org.slf4j.Logger;
5649
import org.slf4j.LoggerFactory;
5750

58-
import javax.naming.OperationNotSupportedException;
59-
60-
import java.time.Instant;
6151
import java.util.List;
6252
import java.util.Map;
6353
import java.util.Optional;
64-
import java.util.Set;
65-
import java.util.stream.Collectors;
66-
import java.util.stream.Stream;
6754

6855
/** Controller that runs the main reconcile loop for Flink Blue/Green deployments. */
6956
@ControllerConfiguration
@@ -101,12 +88,6 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
10188
FlinkBlueGreenDeployment bgDeployment, Context<FlinkBlueGreenDeployment> josdkContext)
10289
throws Exception {
10390

104-
// TODO: this verification is only for FLIP-503, remove later.
105-
if (bgDeployment.getSpec().getTemplate().getTransitionMode() != TransitionMode.BASIC) {
106-
throw new OperationNotSupportedException(
107-
"Only TransitionMode == BASIC is currently supported");
108-
}
109-
11091
FlinkBlueGreenDeploymentStatus deploymentStatus = bgDeployment.getStatus();
11192

11293
if (deploymentStatus == null) {
@@ -430,61 +411,6 @@ private static void setLastReconciledSpec(
430411
deploymentStatus.setLastReconciledTimestamp(System.currentTimeMillis());
431412
}
432413

433-
public void logPotentialWarnings(
434-
FlinkDeployment flinkDeployment,
435-
Context<FlinkBlueGreenDeployment> josdkContext,
436-
long lastReconciliationTimestamp) {
437-
// Event reason constants
438-
// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go
439-
Set<String> badEventPatterns =
440-
ImmutableSet.of(
441-
"FAIL", "EXCEPTION", "BACKOFF", "ERROR", "EVICTION", "KILL", "EXCEED");
442-
Set<String> goodPodPhases = ImmutableSet.of("PENDING", "RUNNING");
443-
444-
Set<String> podPhases =
445-
getDeploymentPods(josdkContext, flinkDeployment)
446-
.map(p -> p.get().getStatus().getPhase().toUpperCase())
447-
.collect(Collectors.toSet());
448-
449-
podPhases.removeAll(goodPodPhases);
450-
451-
if (!podPhases.isEmpty()) {
452-
LOG.warn("Deployment not healthy, some Pods have the following status: " + podPhases);
453-
}
454-
455-
List<Event> abnormalEvents =
456-
josdkContext
457-
.getClient()
458-
.v1()
459-
.events()
460-
.inNamespace(flinkDeployment.getMetadata().getNamespace())
461-
.resources()
462-
.map(Resource::item)
463-
.filter(e -> !e.getType().equalsIgnoreCase("NORMAL"))
464-
.filter(
465-
e ->
466-
e.getInvolvedObject()
467-
.getName()
468-
.contains(flinkDeployment.getMetadata().getName()))
469-
.filter(
470-
e ->
471-
Instant.parse(e.getLastTimestamp()).toEpochMilli()
472-
> lastReconciliationTimestamp)
473-
.filter(
474-
e ->
475-
badEventPatterns.stream()
476-
.anyMatch(
477-
p ->
478-
e.getReason()
479-
.toUpperCase()
480-
.contains(p)))
481-
.collect(Collectors.toList());
482-
483-
if (!abnormalEvents.isEmpty()) {
484-
LOG.warn("Abnormal events detected: " + abnormalEvents);
485-
}
486-
}
487-
488414
private static Savepoint configureSavepoint(
489415
FlinkResourceContext<FlinkDeployment> resourceContext) throws Exception {
490416
// TODO: if the user specified an initialSavepointPath, use it and skip this?
@@ -533,43 +459,8 @@ private boolean isDeploymentReady(
533459
FlinkDeployment deployment,
534460
Context<FlinkBlueGreenDeployment> josdkContext,
535461
FlinkBlueGreenDeploymentStatus deploymentStatus) {
536-
if (ResourceLifecycleState.STABLE == deployment.getStatus().getLifecycleState()
537-
&& JobStatus.RUNNING == deployment.getStatus().getJobStatus().getState()) {
538-
// TODO: checking for running pods seems to be redundant, check if this can be removed
539-
int notRunningPods =
540-
(int)
541-
getDeploymentPods(josdkContext, deployment)
542-
.filter(
543-
p ->
544-
!p.get()
545-
.getStatus()
546-
.getPhase()
547-
.equalsIgnoreCase("RUNNING"))
548-
.count();
549-
550-
if (notRunningPods > 0) {
551-
LOG.warn("Waiting for " + notRunningPods + " Pods to transition to RUNNING status");
552-
}
553-
554-
return notRunningPods == 0;
555-
}
556-
557-
logPotentialWarnings(
558-
deployment, josdkContext, deploymentStatus.getLastReconciledTimestamp());
559-
return false;
560-
}
561-
562-
private static Stream<PodResource> getDeploymentPods(
563-
Context<FlinkBlueGreenDeployment> josdkContext, FlinkDeployment deployment) {
564-
var namespace = deployment.getMetadata().getNamespace();
565-
var deploymentName = deployment.getMetadata().getName();
566-
567-
return josdkContext
568-
.getClient()
569-
.pods()
570-
.inNamespace(namespace)
571-
.withLabel("app", deploymentName)
572-
.resources();
462+
return ResourceLifecycleState.STABLE == deployment.getStatus().getLifecycleState()
463+
&& JobStatus.RUNNING == deployment.getStatus().getJobStatus().getState();
573464
}
574465

575466
private boolean hasSpecChanged(
@@ -578,8 +469,6 @@ private boolean hasSpecChanged(
578469
String lastReconciledSpec = deploymentStatus.getLastReconciledSpec();
579470
String newSpecSerialized = SpecUtils.serializeObject(newSpec, "spec");
580471

581-
// TODO: in FLIP-504 check here the TransitionMode has not been changed
582-
583472
return !lastReconciledSpec.equals(newSpecSerialized);
584473
}
585474

@@ -620,7 +509,7 @@ private void deploy(
620509
bgMeta.getName() + "-" + deploymentType.toString().toLowerCase();
621510

622511
FlinkBlueGreenDeploymentSpec adjustedSpec =
623-
adjustNameReferences(
512+
FlinkBlueGreenDeploymentUtils.adjustNameReferences(
624513
spec,
625514
bgMeta.getName(),
626515
childDeploymentName,
@@ -636,7 +525,8 @@ private void deploy(
636525
flinkDeployment.setSpec(adjustedSpec.getTemplate().getSpec());
637526

638527
// Deployment metadata
639-
ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(bgDeployment);
528+
ObjectMeta flinkDeploymentMeta =
529+
FlinkBlueGreenDeploymentUtils.getDependentObjectMeta(bgDeployment);
640530
flinkDeploymentMeta.setName(childDeploymentName);
641531
flinkDeploymentMeta.setLabels(
642532
Map.of(deploymentType.getClass().getSimpleName(), deploymentType.toString()));
@@ -668,36 +558,7 @@ private static void deleteDeployment(
668558
}
669559
}
670560

671-
private ObjectMeta getDependentObjectMeta(FlinkBlueGreenDeployment bgDeployment) {
672-
ObjectMeta bgMeta = bgDeployment.getMetadata();
673-
ObjectMeta objectMeta = new ObjectMeta();
674-
objectMeta.setNamespace(bgMeta.getNamespace());
675-
objectMeta.setOwnerReferences(
676-
List.of(
677-
new OwnerReference(
678-
bgDeployment.getApiVersion(),
679-
true,
680-
false,
681-
bgDeployment.getKind(),
682-
bgMeta.getName(),
683-
bgMeta.getUid())));
684-
return objectMeta;
685-
}
686-
687-
private static <T> T adjustNameReferences(
688-
T spec,
689-
String deploymentName,
690-
String childDeploymentName,
691-
String wrapperKey,
692-
Class<T> valueType)
693-
throws JsonProcessingException {
694-
String serializedSpec = SpecUtils.serializeObject(spec, wrapperKey);
695-
String replacedSerializedSpec = serializedSpec.replace(deploymentName, childDeploymentName);
696-
return SpecUtils.deserializeObject(replacedSerializedSpec, wrapperKey, valueType);
697-
}
698-
699561
public static void logAndThrow(String message) {
700-
LOG.error(message);
701562
throw new RuntimeException(message);
702563
}
703564
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.controller;
19+
20+
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
21+
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
22+
23+
import com.fasterxml.jackson.core.JsonProcessingException;
24+
import io.fabric8.kubernetes.api.model.ObjectMeta;
25+
import io.fabric8.kubernetes.api.model.OwnerReference;
26+
27+
import java.util.List;
28+
29+
/** Utility methods for the FlinkBlueGreenDeploymentController. */
30+
public class FlinkBlueGreenDeploymentUtils {
31+
32+
public static ObjectMeta getDependentObjectMeta(FlinkBlueGreenDeployment bgDeployment) {
33+
ObjectMeta bgMeta = bgDeployment.getMetadata();
34+
ObjectMeta objectMeta = new ObjectMeta();
35+
objectMeta.setNamespace(bgMeta.getNamespace());
36+
objectMeta.setOwnerReferences(
37+
List.of(
38+
new OwnerReference(
39+
bgDeployment.getApiVersion(),
40+
true,
41+
false,
42+
bgDeployment.getKind(),
43+
bgMeta.getName(),
44+
bgMeta.getUid())));
45+
return objectMeta;
46+
}
47+
48+
public static <T> T adjustNameReferences(
49+
T spec,
50+
String deploymentName,
51+
String childDeploymentName,
52+
String wrapperKey,
53+
Class<T> valueType)
54+
throws JsonProcessingException {
55+
String serializedSpec = SpecUtils.serializeObject(spec, wrapperKey);
56+
String replacedSerializedSpec = serializedSpec.replace(deploymentName, childDeploymentName);
57+
return SpecUtils.deserializeObject(replacedSerializedSpec, wrapperKey, valueType);
58+
}
59+
}

0 commit comments

Comments
 (0)