Skip to content

Commit 7141c36

Browse files
Merge pull request #400 from wking/drop-most-manifest-waits
lib/resourcebuilder: Replace wait-for with single-shot "is it alive now?"
2 parents 30582e8 + cc9292a commit 7141c36

File tree

3 files changed

+97
-181
lines changed

3 files changed

+97
-181
lines changed

lib/resourcebuilder/apiext.go

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,13 @@ import (
44
"context"
55
"fmt"
66

7-
"k8s.io/klog"
8-
97
"github.com/openshift/cluster-version-operator/lib"
108
"github.com/openshift/cluster-version-operator/lib/resourceapply"
119
"github.com/openshift/cluster-version-operator/lib/resourceread"
1210
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
1311
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
1412
apiextclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
1513
apiextclientv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
16-
"k8s.io/apimachinery/pkg/api/errors"
17-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18-
"k8s.io/apimachinery/pkg/util/wait"
1914
"k8s.io/client-go/rest"
2015
)
2116

@@ -46,57 +41,24 @@ func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
4641
func (b *crdBuilder) Do(ctx context.Context) error {
4742
crd := resourceread.ReadCustomResourceDefinitionOrDie(b.raw)
4843

49-
var updated bool
50-
var err error
51-
var name string
52-
5344
switch typedCRD := crd.(type) {
5445
case *apiextv1beta1.CustomResourceDefinition:
5546
if b.modifier != nil {
5647
b.modifier(typedCRD)
5748
}
58-
_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD)
59-
if err != nil {
49+
if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD); err != nil {
6050
return err
6151
}
62-
name = typedCRD.Name
6352
case *apiextv1.CustomResourceDefinition:
6453
if b.modifier != nil {
6554
b.modifier(typedCRD)
6655
}
67-
_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD)
68-
if err != nil {
56+
if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD); err != nil {
6957
return err
7058
}
71-
name = typedCRD.Name
7259
default:
7360
return fmt.Errorf("unrecognized CustomResourceDefinition version: %T", crd)
7461
}
7562

76-
if updated {
77-
return waitForCustomResourceDefinitionCompletion(ctx, b.clientV1beta1, name)
78-
}
7963
return nil
8064
}
81-
82-
func waitForCustomResourceDefinitionCompletion(ctx context.Context, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd string) error {
83-
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
84-
c, err := client.CustomResourceDefinitions().Get(ctx, crd, metav1.GetOptions{})
85-
if errors.IsNotFound(err) {
86-
// exit early to recreate the crd.
87-
return false, err
88-
}
89-
if err != nil {
90-
klog.Errorf("error getting CustomResourceDefinition %s: %v", crd, err)
91-
return false, nil
92-
}
93-
94-
for _, condition := range c.Status.Conditions {
95-
if condition.Type == apiextv1beta1.Established && condition.Status == apiextv1beta1.ConditionTrue {
96-
return true, nil
97-
}
98-
}
99-
klog.V(4).Infof("CustomResourceDefinition %s is not ready. conditions: %v", c.Name, c.Status.Conditions)
100-
return false, nil
101-
}, ctx.Done())
102-
}

lib/resourcebuilder/apps.go

Lines changed: 61 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
corev1 "k8s.io/api/core/v1"
1313
"k8s.io/apimachinery/pkg/api/errors"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15-
"k8s.io/apimachinery/pkg/util/wait"
1615
appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
1716
"k8s.io/client-go/rest"
1817
"k8s.io/klog"
@@ -113,103 +112,72 @@ func (b *deploymentBuilder) Do(ctx context.Context) error {
113112
}
114113
}
115114

116-
_, updated, err := resourceapply.ApplyDeployment(ctx, b.client, deployment)
117-
if err != nil {
115+
if _, _, err := resourceapply.ApplyDeployment(ctx, b.client, deployment); err != nil {
118116
return err
119117
}
120-
if updated && b.mode != InitializingMode {
121-
return waitForDeploymentCompletion(ctx, b.client, deployment)
118+
119+
if b.mode != InitializingMode {
120+
return checkDeploymentHealth(ctx, b.client, deployment)
122121
}
123122
return nil
124123
}
125124

126-
func waitForDeploymentCompletion(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
125+
func checkDeploymentHealth(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
127126
iden := fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
128-
var lastErr error
129-
err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
130-
d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
131-
if errors.IsNotFound(err) {
132-
// exit early to recreate the deployment.
133-
return false, err
134-
}
135-
if err != nil {
136-
// Do not return error here, as we could be updating the API Server itself, in which case we
137-
// want to continue waiting.
138-
lastErr = &payload.UpdateError{
139-
Nested: err,
140-
Reason: "WorkloadNotAvailable",
141-
Message: fmt.Sprintf("could not find the deployment %s during rollout", iden),
142-
Name: iden,
143-
}
144-
return false, nil
145-
}
146-
147-
if d.DeletionTimestamp != nil {
148-
return false, fmt.Errorf("Deployment %s is being deleted", iden)
149-
}
150-
151-
if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 {
152-
return true, nil
153-
}
127+
d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
128+
if err != nil {
129+
return err
130+
}
154131

155-
var availableCondition *appsv1.DeploymentCondition
156-
var progressingCondition *appsv1.DeploymentCondition
157-
var replicafailureCondition *appsv1.DeploymentCondition
158-
for idx, dc := range d.Status.Conditions {
159-
switch dc.Type {
160-
case appsv1.DeploymentProgressing:
161-
progressingCondition = &d.Status.Conditions[idx]
162-
case appsv1.DeploymentAvailable:
163-
availableCondition = &d.Status.Conditions[idx]
164-
case appsv1.DeploymentReplicaFailure:
165-
replicafailureCondition = &d.Status.Conditions[idx]
166-
}
167-
}
132+
if d.DeletionTimestamp != nil {
133+
return fmt.Errorf("deployment %s is being deleted", iden)
134+
}
168135

169-
if replicafailureCondition != nil && replicafailureCondition.Status == corev1.ConditionTrue {
170-
lastErr = &payload.UpdateError{
171-
Nested: fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
172-
Reason: "WorkloadNotProgressing",
173-
Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicafailureCondition.Reason, replicafailureCondition.Message),
174-
Name: iden,
175-
}
176-
return false, nil
136+
var availableCondition *appsv1.DeploymentCondition
137+
var progressingCondition *appsv1.DeploymentCondition
138+
var replicaFailureCondition *appsv1.DeploymentCondition
139+
for idx, dc := range d.Status.Conditions {
140+
switch dc.Type {
141+
case appsv1.DeploymentProgressing:
142+
progressingCondition = &d.Status.Conditions[idx]
143+
case appsv1.DeploymentAvailable:
144+
availableCondition = &d.Status.Conditions[idx]
145+
case appsv1.DeploymentReplicaFailure:
146+
replicaFailureCondition = &d.Status.Conditions[idx]
177147
}
148+
}
178149

179-
if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
180-
lastErr = &payload.UpdateError{
181-
Nested: fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
182-
Reason: "WorkloadNotAvailable",
183-
Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
184-
Name: iden,
185-
}
186-
return false, nil
150+
if replicaFailureCondition != nil && replicaFailureCondition.Status == corev1.ConditionTrue {
151+
return &payload.UpdateError{
152+
Nested: fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
153+
Reason: "WorkloadNotProgressing",
154+
Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicaFailureCondition.Reason, replicaFailureCondition.Message),
155+
Name: iden,
187156
}
157+
}
188158

189-
if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse && progressingCondition.Reason == "ProgressDeadlineExceeded" {
190-
lastErr = &payload.UpdateError{
191-
Nested: fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
192-
Reason: "WorkloadNotAvailable",
193-
Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
194-
Name: iden,
195-
}
196-
return false, nil
159+
if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
160+
return &payload.UpdateError{
161+
Nested: fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
162+
Reason: "WorkloadNotAvailable",
163+
Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
164+
Name: iden,
197165
}
166+
}
198167

199-
if progressingCondition != nil && progressingCondition.Status == corev1.ConditionTrue {
200-
klog.V(4).Infof("deployment %s is progressing", iden)
201-
return false, nil
168+
if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse {
169+
return &payload.UpdateError{
170+
Nested: fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
171+
Reason: "WorkloadNotAvailable",
172+
Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
173+
Name: iden,
202174
}
175+
}
203176

204-
klog.Errorf("deployment %s is in unknown state", iden)
205-
return false, nil
206-
}, ctx.Done())
207-
if err != nil {
208-
if err == wait.ErrWaitTimeout && lastErr != nil {
209-
return lastErr
210-
}
211-
return err
177+
if availableCondition == nil && progressingCondition == nil && replicaFailureCondition == nil {
178+
klog.Warningf("deployment %s is not setting any expected conditions, and is therefore in an unknown state", iden)
212179
}
180+
213181
return nil
214182
}
215183

@@ -264,52 +232,28 @@ func (b *daemonsetBuilder) Do(ctx context.Context) error {
264232
}
265233
}
266234

267-
_, updated, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset)
268-
if err != nil {
235+
if _, _, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset); err != nil {
269236
return err
270237
}
271-
if updated && b.mode != InitializingMode {
272-
return waitForDaemonsetRollout(ctx, b.client, daemonset)
238+
239+
if b.mode != InitializingMode {
240+
return checkDaemonSetHealth(ctx, b.client, daemonset)
273241
}
242+
274243
return nil
275244
}
276245

277-
func waitForDaemonsetRollout(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
246+
func checkDaemonSetHealth(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
278247
iden := fmt.Sprintf("%s/%s", daemonset.Namespace, daemonset.Name)
279-
var lastErr error
280-
err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
281-
d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
282-
if errors.IsNotFound(err) {
283-
// exit early to recreate the daemonset.
284-
return false, err
285-
}
286-
if err != nil {
287-
// Do not return error here, as we could be updating the API Server itself, in which case we
288-
// want to continue waiting.
289-
lastErr = &payload.UpdateError{
290-
Nested: err,
291-
Reason: "WorkloadNotAvailable",
292-
Message: fmt.Sprintf("could not find the daemonset %s during rollout", iden),
293-
Name: iden,
294-
}
295-
return false, nil
296-
}
297-
298-
if d.DeletionTimestamp != nil {
299-
return false, fmt.Errorf("Daemonset %s is being deleted", daemonset.Name)
300-
}
301-
302-
if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedNumberScheduled == d.Status.DesiredNumberScheduled && d.Status.NumberUnavailable == 0 {
303-
return true, nil
304-
}
305-
klog.V(4).Infof("daemonset %s is progressing", iden)
306-
return false, nil
307-
}, ctx.Done())
248+
d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
308249
if err != nil {
309-
if err == wait.ErrWaitTimeout && lastErr != nil {
310-
return lastErr
311-
}
312250
return err
313251
}
252+
253+
if d.DeletionTimestamp != nil {
254+
return fmt.Errorf("daemonset %s is being deleted", iden)
255+
}
256+
257+
// Kubernetes DaemonSet controller doesn't set status conditions yet (v1.18.0), so nothing more to check.
314258
return nil
315259
}

lib/resourcebuilder/batch.go

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
"k8s.io/klog"
8-
97
"github.com/openshift/cluster-version-operator/lib"
108
"github.com/openshift/cluster-version-operator/lib/resourceapply"
119
"github.com/openshift/cluster-version-operator/lib/resourceread"
@@ -14,6 +12,7 @@ import (
1412
"k8s.io/apimachinery/pkg/util/wait"
1513
batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
1614
"k8s.io/client-go/rest"
15+
"k8s.io/klog"
1716
)
1817

1918
type jobBuilder struct {
@@ -45,40 +44,51 @@ func (b *jobBuilder) Do(ctx context.Context) error {
4544
if b.modifier != nil {
4645
b.modifier(job)
4746
}
48-
_, updated, err := resourceapply.ApplyJob(ctx, b.client, job)
49-
if err != nil {
47+
if _, _, err := resourceapply.ApplyJob(ctx, b.client, job); err != nil {
5048
return err
5149
}
52-
if updated && b.mode != InitializingMode {
53-
return WaitForJobCompletion(ctx, b.client, job)
50+
if b.mode != InitializingMode {
51+
_, err := checkJobHealth(ctx, b.client, job)
52+
return err
5453
}
5554
return nil
5655
}
5756

5857
// WaitForJobCompletion waits for job to complete.
5958
func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
6059
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
61-
j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
62-
if err != nil {
63-
klog.Errorf("error getting Job %s: %v", job.Name, err)
60+
if done, err := checkJobHealth(ctx, client, job); err != nil {
61+
klog.Error(err)
62+
return false, nil
63+
} else if !done {
64+
klog.V(4).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Namespace, job.ObjectMeta.Name)
6465
return false, nil
6566
}
67+
return true, nil
68+
}, ctx.Done())
69+
}
6670

67-
if j.Status.Succeeded > 0 {
68-
return true, nil
69-
}
71+
// checkJobHealth returns an error if the job status is bad enough to block further manifest application.
72+
func checkJobHealth(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) (bool, error) {
73+
j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
74+
if err != nil {
75+
return false, fmt.Errorf("error getting Job %s: %v", job.Name, err)
76+
}
7077

71-
// Since we have filled in "activeDeadlineSeconds",
72-
// the Job will 'Active == 0' iff it exceeds the deadline.
73-
// Failed jobs will be recreated in the next run.
74-
if j.Status.Active == 0 && j.Status.Failed > 0 {
75-
reason := "DeadlineExceeded"
76-
message := "Job was active longer than specified deadline"
77-
if len(j.Status.Conditions) > 0 {
78-
reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
79-
}
80-
return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
78+
if j.Status.Succeeded > 0 {
79+
return true, nil
80+
}
81+
82+
// Since we have filled in "activeDeadlineSeconds",
83+
// the Job will 'Active == 0' if and only if it exceeds the deadline.
84+
// Failed jobs will be recreated in the next run.
85+
if j.Status.Active == 0 && j.Status.Failed > 0 {
86+
reason := "DeadlineExceeded"
87+
message := "Job was active longer than specified deadline"
88+
if len(j.Status.Conditions) > 0 {
89+
reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
8190
}
82-
return false, nil
83-
}, ctx.Done())
91+
return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
92+
}
93+
return false, nil
8494
}

0 commit comments

Comments
 (0)