@@ -17,6 +17,7 @@ limitations under the License.
17
17
package apps
18
18
19
19
import (
20
+ "context"
20
21
"fmt"
21
22
"math/rand"
22
23
"time"
@@ -36,6 +37,7 @@ import (
36
37
"k8s.io/apimachinery/pkg/util/wait"
37
38
"k8s.io/apimachinery/pkg/watch"
38
39
clientset "k8s.io/client-go/kubernetes"
40
+ watchtools "k8s.io/client-go/tools/watch"
39
41
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
40
42
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
41
43
"k8s.io/kubernetes/test/e2e/framework"
@@ -48,6 +50,7 @@ import (
48
50
)
49
51
50
52
const (
53
+ poll = 2 * time .Second
51
54
dRetryPeriod = 2 * time .Second
52
55
dRetryTimeout = 5 * time .Minute
53
56
)
@@ -333,7 +336,7 @@ func testRecreateDeployment(f *framework.Framework) {
333
336
framework .ExpectNoError (err )
334
337
335
338
framework .Logf ("Watching deployment %q to verify that new pods will not run with olds pods" , deploymentName )
336
- err = e2edeploy . WatchRecreateDeployment (c , deployment )
339
+ err = watchRecreateDeployment (c , deployment )
337
340
framework .ExpectNoError (err )
338
341
}
339
342
@@ -403,7 +406,7 @@ func testDeploymentCleanUpPolicy(f *framework.Framework) {
403
406
framework .ExpectNoError (err )
404
407
405
408
ginkgo .By (fmt .Sprintf ("Waiting for deployment %s history to be cleaned up" , deploymentName ))
406
- err = e2edeploy . WaitForDeploymentOldRSsNum (c , ns , deploymentName , int (* revisionHistoryLimit ))
409
+ err = waitForDeploymentOldRSsNum (c , ns , deploymentName , int (* revisionHistoryLimit ))
407
410
framework .ExpectNoError (err )
408
411
}
409
412
@@ -1018,3 +1021,71 @@ func setAffinities(d *appsv1.Deployment, setAffinity bool) {
1018
1021
}
1019
1022
d .Spec .Template .Spec .Affinity = affinity
1020
1023
}
1024
+
1025
+ // watchRecreateDeployment watches Recreate deployments and ensures no new pods will run at the same time with
1026
+ // old pods.
1027
+ func watchRecreateDeployment (c clientset.Interface , d * appsv1.Deployment ) error {
1028
+ if d .Spec .Strategy .Type != appsv1 .RecreateDeploymentStrategyType {
1029
+ return fmt .Errorf ("deployment %q does not use a Recreate strategy: %s" , d .Name , d .Spec .Strategy .Type )
1030
+ }
1031
+
1032
+ w , err := c .AppsV1 ().Deployments (d .Namespace ).Watch (metav1 .SingleObject (metav1.ObjectMeta {Name : d .Name , ResourceVersion : d .ResourceVersion }))
1033
+ if err != nil {
1034
+ return err
1035
+ }
1036
+
1037
+ status := d .Status
1038
+
1039
+ condition := func (event watch.Event ) (bool , error ) {
1040
+ d := event .Object .(* appsv1.Deployment )
1041
+ status = d .Status
1042
+
1043
+ if d .Status .UpdatedReplicas > 0 && d .Status .Replicas != d .Status .UpdatedReplicas {
1044
+ _ , allOldRSs , err := deploymentutil .GetOldReplicaSets (d , c .AppsV1 ())
1045
+ newRS , nerr := deploymentutil .GetNewReplicaSet (d , c .AppsV1 ())
1046
+ if err == nil && nerr == nil {
1047
+ framework .Logf ("%+v" , d )
1048
+ testutil .LogReplicaSetsOfDeployment (d , allOldRSs , newRS , framework .Logf )
1049
+ testutil .LogPodsOfDeployment (c , d , append (allOldRSs , newRS ), framework .Logf )
1050
+ }
1051
+ return false , fmt .Errorf ("deployment %q is running new pods alongside old pods: %#v" , d .Name , status )
1052
+ }
1053
+
1054
+ return * (d .Spec .Replicas ) == d .Status .Replicas &&
1055
+ * (d .Spec .Replicas ) == d .Status .UpdatedReplicas &&
1056
+ d .Generation <= d .Status .ObservedGeneration , nil
1057
+ }
1058
+
1059
+ ctx , cancel := context .WithTimeout (context .Background (), 2 * time .Minute )
1060
+ defer cancel ()
1061
+ _ , err = watchtools .UntilWithoutRetry (ctx , w , condition )
1062
+ if err == wait .ErrWaitTimeout {
1063
+ err = fmt .Errorf ("deployment %q never completed: %#v" , d .Name , status )
1064
+ }
1065
+ return err
1066
+ }
1067
+
1068
+ // waitForDeploymentOldRSsNum waits for the deployment to clean up old rcs.
1069
+ func waitForDeploymentOldRSsNum (c clientset.Interface , ns , deploymentName string , desiredRSNum int ) error {
1070
+ var oldRSs []* appsv1.ReplicaSet
1071
+ var d * appsv1.Deployment
1072
+
1073
+ pollErr := wait .PollImmediate (poll , 5 * time .Minute , func () (bool , error ) {
1074
+ deployment , err := c .AppsV1 ().Deployments (ns ).Get (deploymentName , metav1.GetOptions {})
1075
+ if err != nil {
1076
+ return false , err
1077
+ }
1078
+ d = deployment
1079
+
1080
+ _ , oldRSs , err = deploymentutil .GetOldReplicaSets (deployment , c .AppsV1 ())
1081
+ if err != nil {
1082
+ return false , err
1083
+ }
1084
+ return len (oldRSs ) == desiredRSNum , nil
1085
+ })
1086
+ if pollErr == wait .ErrWaitTimeout {
1087
+ pollErr = fmt .Errorf ("%d old replica sets were not cleaned up for deployment %q" , len (oldRSs )- desiredRSNum , deploymentName )
1088
+ testutil .LogReplicaSetsOfDeployment (d , oldRSs , nil , framework .Logf )
1089
+ }
1090
+ return pollErr
1091
+ }
0 commit comments