@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package kubectl
+package rollingupdate

 import (
 	"fmt"
@@ -34,6 +34,7 @@ import (
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	scaleclient "k8s.io/client-go/scale"
 	"k8s.io/client-go/util/retry"
+	"k8s.io/kubectl/pkg/scale"
 	"k8s.io/kubectl/pkg/util"
 	deploymentutil "k8s.io/kubectl/pkg/util/deployment"
 	"k8s.io/kubectl/pkg/util/podutils"
@@ -128,7 +129,7 @@ type RollingUpdater struct {
 	// Namespace for resources
 	ns string
 	// scaleAndWait scales a controller and returns its updated state.
-	scaleAndWait func(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error)
+	scaleAndWait func(rc *corev1.ReplicationController, retry *scale.RetryParams, wait *scale.RetryParams) (*corev1.ReplicationController, error)
 	//getOrCreateTargetController gets and validates an existing controller or
 	//makes a new one.
 	getOrCreateTargetController func(controller *corev1.ReplicationController, sourceID string) (*corev1.ReplicationController, bool, error)
@@ -180,7 +181,7 @@ func NewRollingUpdater(namespace string, rcClient corev1client.ReplicationContro
 func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
 	out := config.Out
 	oldRc := config.OldRc
-	scaleRetryParams := NewRetryParams(config.Interval, config.Timeout)
+	scaleRetryParams := scale.NewRetryParams(config.Interval, config.Timeout)

 	// Find an existing controller (for continuing an interrupted update) or
 	// create a new one if necessary.
@@ -321,7 +322,7 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
 // scaleUp scales up newRc to desired by whatever increment is possible given
 // the configured surge threshold. scaleUp will safely no-op as necessary when
 // it detects redundancy or other relevant conditions.
-func (r *RollingUpdater) scaleUp(newRc, oldRc *corev1.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
+func (r *RollingUpdater) scaleUp(newRc, oldRc *corev1.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *scale.RetryParams, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
 	// If we're already at the desired, do nothing.
 	if valOrZero(newRc.Spec.Replicas) == desired {
 		return newRc, nil
@@ -398,7 +399,10 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *corev1.ReplicationController, d
 	}
 	// Perform the scale-down.
 	fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, valOrZero(oldRc.Spec.Replicas))
-	retryWait := &RetryParams{config.Interval, config.Timeout}
+	retryWait := &scale.RetryParams{
+		Interval: config.Interval,
+		Timeout:  config.Timeout,
+	}
 	scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait)
 	if err != nil {
 		return nil, err
@@ -407,9 +411,9 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *corev1.ReplicationController, d
 	}

 // scalerScaleAndWait scales a controller using a Scaler and a real client.
-func (r *RollingUpdater) scaleAndWaitWithScaler(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error) {
-	scaler := NewScaler(r.scaleClient)
-	if err := scaler.Scale(rc.Namespace, rc.Name, uint(valOrZero(rc.Spec.Replicas)), &ScalePrecondition{-1, ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
+func (r *RollingUpdater) scaleAndWaitWithScaler(rc *corev1.ReplicationController, retry *scale.RetryParams, wait *scale.RetryParams) (*corev1.ReplicationController, error) {
+	scaler := scale.NewScaler(r.scaleClient)
+	if err := scaler.Scale(rc.Namespace, rc.Name, uint(valOrZero(rc.Spec.Replicas)), &scale.ScalePrecondition{Size: -1, ResourceVersion: ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
 		return nil, err
 	}
 	return r.rcClient.ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{})
@@ -520,7 +524,7 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *corev1.ReplicationCont
 		return err
 	}

-	if err = wait.Poll(config.Interval, config.Timeout, ControllerHasDesiredReplicas(r.rcClient, newRc)); err != nil {
+	if err = wait.Poll(config.Interval, config.Timeout, controllerHasDesiredReplicas(r.rcClient, newRc)); err != nil {
 		return err
 	}
 	newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name, metav1.GetOptions{})
@@ -838,3 +842,24 @@ func FindSourceController(r corev1client.ReplicationControllersGetter, namespace
 	}
 	return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
 }
+
+// controllerHasDesiredReplicas returns a condition that will be true if and only if
+// the desired replica count for a controller's ReplicaSelector equals the Replicas count.
+func controllerHasDesiredReplicas(rcClient corev1client.ReplicationControllersGetter, controller *corev1.ReplicationController) wait.ConditionFunc {
+
+	// If we're given a controller where the status lags the spec, it either means that the controller is stale,
+	// or that the rc manager hasn't noticed the update yet. Polling status.Replicas is not safe in the latter case.
+	desiredGeneration := controller.Generation
+
+	return func() (bool, error) {
+		ctrl, err := rcClient.ReplicationControllers(controller.Namespace).Get(controller.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		// There's a chance a concurrent update modifies the Spec.Replicas causing this check to pass,
+		// or, after this check has passed, a modification causes the rc manager to create more pods.
+		// This will not be an issue once we've implemented graceful delete for rcs, but till then
+		// concurrent stop operations on the same rc might have unintended side effects.
+		return ctrl.Status.ObservedGeneration >= desiredGeneration && ctrl.Status.Replicas == valOrZero(ctrl.Spec.Replicas), nil
+	}
+}
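For context, here is a minimal sketch of how a caller outside the updater might drive the same now-package-qualified scale API (scale.NewRetryParams, scale.NewScaler, scale.ScalePrecondition), mirroring the calls in scaleAndWaitWithScaler above. The scaleRC helper, its package name, and the interval/timeout values are illustrative assumptions, not part of this change; the Scale signature is taken from the call shown in the diff.

```go
package rollingupdateexample

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/kubectl/pkg/scale"
)

// scaleRC resizes one replication controller to newSize and then reads the
// controller back, the same sequence RollingUpdater.scaleAndWaitWithScaler uses.
// The caller supplies already-configured clients; building them from a kubeconfig
// is omitted here.
func scaleRC(rcClient corev1client.ReplicationControllersGetter, scalesGetter scaleclient.ScalesGetter, namespace, name string, newSize uint) error {
	// Retry the scale request for up to 30s; wait up to 5m for replicas to settle.
	// These durations are arbitrary for the example.
	retry := scale.NewRetryParams(1*time.Second, 30*time.Second)
	waitForReplicas := scale.NewRetryParams(1*time.Second, 5*time.Minute)

	scaler := scale.NewScaler(scalesGetter)
	// Size -1 and an empty ResourceVersion mean "no precondition", as in the diff.
	precondition := &scale.ScalePrecondition{Size: -1, ResourceVersion: ""}

	if err := scaler.Scale(namespace, name, newSize, precondition, retry, waitForReplicas, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
		return err
	}

	rc, err := rcClient.ReplicationControllers(namespace).Get(name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	fmt.Printf("%s/%s reports %d replicas\n", namespace, rc.Name, rc.Status.Replicas)
	return nil
}
```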