Skip to content

Commit cc9292a

Browse files
committed
lib/resourcebuilder: Replace wait-for with single-shot "is it alive now?"
We've had 'if updated' guards around waitFor*Completion since the library landed in 2d334c2 (lib: add resource builder that allows Do on any lib.Manifest, 2018-08-20, #10). But, only waiting when 'updated' is true is a weak block, because if/when we fail to complete, Task.Run will back-off and call builder.Apply again. That new Apply will see the already-updated object, set 'updated' false, and not wait. So whether we block or not is orthogonal to 'updated'; nobody cares about whether the most recent update happened in this builder.Apply, this sync cycle, or a previous cycle. We don't even care all that much about whether the Deployment, DaemonSet, CustomResourceDefinition, or Job succeeded. Most feedback is going to come from the ClusterOperator, so with this commit we continue past the resource wait-for unless the resource is really hurting, in which case we fail immediately (inside builder.Apply, Task.Run will still hit us a few times) to bubble that up. In situations where we don't see anything too terrible going on, we'll continue on past and later block on ClusterOperator not being ready. The "unknown state" Deployment logging has changed a bit. I'd initially dropped it, but Jack suggested keeping it to make identifying broken-Deployment-controller and similar situations easier [1]. Previously it was logged when we weren't happy with updatedReplicas and unavailableReplicas, nothing obviously bad was happening, and we were not Progressing=True. We no longer check updatedReplicas or unavailableReplicas, so now it's just "nothing obviously bad is happening, but that may just be because the Deployment controller isn't giving us any of the oconditions we look at to judge badness". It's possible that we should also check for "when we do have those conditions, the values are either True or False, not some unexpected key". But I'm leaving that alone for now. There's no object status for CRDs or DaemonSets that marks "we are really hurting". The v1.18.0 Kubernetes CRD and DaemonSet controllers do not set any conditions in their operand status (although the API for those conditions exists [2,3]). With this commit, we have very minimal wait logic for either. Sufficiently unhealthy DaemonSet should be reported on via their associated ClusterOperator, and sufficiently unhealthy CRD should be reported on when we fail to push any custom resources consuming them (Task.Run retries will give the API server time to ready itself after accepting a CRD update before the CVO fails its sync cycle). We still need the public WaitForJobCompletion, because fetchUpdatePayloadToDir uses it to wait on the release download. Also expand "iff" -> "if and only if" while I'm touching that line, at Jack's suggestion [4]. [1]: #400 (comment) [2]: https://github.com/kubernetes/api/blob/v0.18.0/apps/v1/types.go#L586-L590 [3]: https://github.com/kubernetes/apiextensions-apiserver/blob/v0.18.0/pkg/apis/apiextensions/types.go#L319-L320 [4]: #400 (comment)
1 parent 30582e8 commit cc9292a

File tree

3 files changed

+97
-181
lines changed

3 files changed

+97
-181
lines changed

lib/resourcebuilder/apiext.go

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,13 @@ import (
44
"context"
55
"fmt"
66

7-
"k8s.io/klog"
8-
97
"github.com/openshift/cluster-version-operator/lib"
108
"github.com/openshift/cluster-version-operator/lib/resourceapply"
119
"github.com/openshift/cluster-version-operator/lib/resourceread"
1210
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
1311
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
1412
apiextclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
1513
apiextclientv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
16-
"k8s.io/apimachinery/pkg/api/errors"
17-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18-
"k8s.io/apimachinery/pkg/util/wait"
1914
"k8s.io/client-go/rest"
2015
)
2116

@@ -46,57 +41,24 @@ func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
4641
func (b *crdBuilder) Do(ctx context.Context) error {
4742
crd := resourceread.ReadCustomResourceDefinitionOrDie(b.raw)
4843

49-
var updated bool
50-
var err error
51-
var name string
52-
5344
switch typedCRD := crd.(type) {
5445
case *apiextv1beta1.CustomResourceDefinition:
5546
if b.modifier != nil {
5647
b.modifier(typedCRD)
5748
}
58-
_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD)
59-
if err != nil {
49+
if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD); err != nil {
6050
return err
6151
}
62-
name = typedCRD.Name
6352
case *apiextv1.CustomResourceDefinition:
6453
if b.modifier != nil {
6554
b.modifier(typedCRD)
6655
}
67-
_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD)
68-
if err != nil {
56+
if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD); err != nil {
6957
return err
7058
}
71-
name = typedCRD.Name
7259
default:
7360
return fmt.Errorf("unrecognized CustomResourceDefinition version: %T", crd)
7461
}
7562

76-
if updated {
77-
return waitForCustomResourceDefinitionCompletion(ctx, b.clientV1beta1, name)
78-
}
7963
return nil
8064
}
81-
82-
func waitForCustomResourceDefinitionCompletion(ctx context.Context, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd string) error {
83-
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
84-
c, err := client.CustomResourceDefinitions().Get(ctx, crd, metav1.GetOptions{})
85-
if errors.IsNotFound(err) {
86-
// exit early to recreate the crd.
87-
return false, err
88-
}
89-
if err != nil {
90-
klog.Errorf("error getting CustomResourceDefinition %s: %v", crd, err)
91-
return false, nil
92-
}
93-
94-
for _, condition := range c.Status.Conditions {
95-
if condition.Type == apiextv1beta1.Established && condition.Status == apiextv1beta1.ConditionTrue {
96-
return true, nil
97-
}
98-
}
99-
klog.V(4).Infof("CustomResourceDefinition %s is not ready. conditions: %v", c.Name, c.Status.Conditions)
100-
return false, nil
101-
}, ctx.Done())
102-
}

lib/resourcebuilder/apps.go

Lines changed: 61 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
corev1 "k8s.io/api/core/v1"
1313
"k8s.io/apimachinery/pkg/api/errors"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15-
"k8s.io/apimachinery/pkg/util/wait"
1615
appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
1716
"k8s.io/client-go/rest"
1817
"k8s.io/klog"
@@ -113,103 +112,72 @@ func (b *deploymentBuilder) Do(ctx context.Context) error {
113112
}
114113
}
115114

116-
_, updated, err := resourceapply.ApplyDeployment(ctx, b.client, deployment)
117-
if err != nil {
115+
if _, _, err := resourceapply.ApplyDeployment(ctx, b.client, deployment); err != nil {
118116
return err
119117
}
120-
if updated && b.mode != InitializingMode {
121-
return waitForDeploymentCompletion(ctx, b.client, deployment)
118+
119+
if b.mode != InitializingMode {
120+
return checkDeploymentHealth(ctx, b.client, deployment)
122121
}
123122
return nil
124123
}
125124

126-
func waitForDeploymentCompletion(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
125+
func checkDeploymentHealth(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
127126
iden := fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
128-
var lastErr error
129-
err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
130-
d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
131-
if errors.IsNotFound(err) {
132-
// exit early to recreate the deployment.
133-
return false, err
134-
}
135-
if err != nil {
136-
// Do not return error here, as we could be updating the API Server itself, in which case we
137-
// want to continue waiting.
138-
lastErr = &payload.UpdateError{
139-
Nested: err,
140-
Reason: "WorkloadNotAvailable",
141-
Message: fmt.Sprintf("could not find the deployment %s during rollout", iden),
142-
Name: iden,
143-
}
144-
return false, nil
145-
}
146-
147-
if d.DeletionTimestamp != nil {
148-
return false, fmt.Errorf("Deployment %s is being deleted", iden)
149-
}
150-
151-
if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 {
152-
return true, nil
153-
}
127+
d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
128+
if err != nil {
129+
return err
130+
}
154131

155-
var availableCondition *appsv1.DeploymentCondition
156-
var progressingCondition *appsv1.DeploymentCondition
157-
var replicafailureCondition *appsv1.DeploymentCondition
158-
for idx, dc := range d.Status.Conditions {
159-
switch dc.Type {
160-
case appsv1.DeploymentProgressing:
161-
progressingCondition = &d.Status.Conditions[idx]
162-
case appsv1.DeploymentAvailable:
163-
availableCondition = &d.Status.Conditions[idx]
164-
case appsv1.DeploymentReplicaFailure:
165-
replicafailureCondition = &d.Status.Conditions[idx]
166-
}
167-
}
132+
if d.DeletionTimestamp != nil {
133+
return fmt.Errorf("deployment %s is being deleted", iden)
134+
}
168135

169-
if replicafailureCondition != nil && replicafailureCondition.Status == corev1.ConditionTrue {
170-
lastErr = &payload.UpdateError{
171-
Nested: fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
172-
Reason: "WorkloadNotProgressing",
173-
Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicafailureCondition.Reason, replicafailureCondition.Message),
174-
Name: iden,
175-
}
176-
return false, nil
136+
var availableCondition *appsv1.DeploymentCondition
137+
var progressingCondition *appsv1.DeploymentCondition
138+
var replicaFailureCondition *appsv1.DeploymentCondition
139+
for idx, dc := range d.Status.Conditions {
140+
switch dc.Type {
141+
case appsv1.DeploymentProgressing:
142+
progressingCondition = &d.Status.Conditions[idx]
143+
case appsv1.DeploymentAvailable:
144+
availableCondition = &d.Status.Conditions[idx]
145+
case appsv1.DeploymentReplicaFailure:
146+
replicaFailureCondition = &d.Status.Conditions[idx]
177147
}
148+
}
178149

179-
if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
180-
lastErr = &payload.UpdateError{
181-
Nested: fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
182-
Reason: "WorkloadNotAvailable",
183-
Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
184-
Name: iden,
185-
}
186-
return false, nil
150+
if replicaFailureCondition != nil && replicaFailureCondition.Status == corev1.ConditionTrue {
151+
return &payload.UpdateError{
152+
Nested: fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
153+
Reason: "WorkloadNotProgressing",
154+
Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicaFailureCondition.Reason, replicaFailureCondition.Message),
155+
Name: iden,
187156
}
157+
}
188158

189-
if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse && progressingCondition.Reason == "ProgressDeadlineExceeded" {
190-
lastErr = &payload.UpdateError{
191-
Nested: fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
192-
Reason: "WorkloadNotAvailable",
193-
Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
194-
Name: iden,
195-
}
196-
return false, nil
159+
if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
160+
return &payload.UpdateError{
161+
Nested: fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
162+
Reason: "WorkloadNotAvailable",
163+
Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
164+
Name: iden,
197165
}
166+
}
198167

199-
if progressingCondition != nil && progressingCondition.Status == corev1.ConditionTrue {
200-
klog.V(4).Infof("deployment %s is progressing", iden)
201-
return false, nil
168+
if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse {
169+
return &payload.UpdateError{
170+
Nested: fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
171+
Reason: "WorkloadNotAvailable",
172+
Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
173+
Name: iden,
202174
}
175+
}
203176

204-
klog.Errorf("deployment %s is in unknown state", iden)
205-
return false, nil
206-
}, ctx.Done())
207-
if err != nil {
208-
if err == wait.ErrWaitTimeout && lastErr != nil {
209-
return lastErr
210-
}
211-
return err
177+
if availableCondition == nil && progressingCondition == nil && replicaFailureCondition == nil {
178+
klog.Warningf("deployment %s is not setting any expected conditions, and is therefore in an unknown state", iden)
212179
}
180+
213181
return nil
214182
}
215183

@@ -264,52 +232,28 @@ func (b *daemonsetBuilder) Do(ctx context.Context) error {
264232
}
265233
}
266234

267-
_, updated, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset)
268-
if err != nil {
235+
if _, _, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset); err != nil {
269236
return err
270237
}
271-
if updated && b.mode != InitializingMode {
272-
return waitForDaemonsetRollout(ctx, b.client, daemonset)
238+
239+
if b.mode != InitializingMode {
240+
return checkDaemonSetHealth(ctx, b.client, daemonset)
273241
}
242+
274243
return nil
275244
}
276245

277-
func waitForDaemonsetRollout(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
246+
func checkDaemonSetHealth(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
278247
iden := fmt.Sprintf("%s/%s", daemonset.Namespace, daemonset.Name)
279-
var lastErr error
280-
err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
281-
d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
282-
if errors.IsNotFound(err) {
283-
// exit early to recreate the daemonset.
284-
return false, err
285-
}
286-
if err != nil {
287-
// Do not return error here, as we could be updating the API Server itself, in which case we
288-
// want to continue waiting.
289-
lastErr = &payload.UpdateError{
290-
Nested: err,
291-
Reason: "WorkloadNotAvailable",
292-
Message: fmt.Sprintf("could not find the daemonset %s during rollout", iden),
293-
Name: iden,
294-
}
295-
return false, nil
296-
}
297-
298-
if d.DeletionTimestamp != nil {
299-
return false, fmt.Errorf("Daemonset %s is being deleted", daemonset.Name)
300-
}
301-
302-
if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedNumberScheduled == d.Status.DesiredNumberScheduled && d.Status.NumberUnavailable == 0 {
303-
return true, nil
304-
}
305-
klog.V(4).Infof("daemonset %s is progressing", iden)
306-
return false, nil
307-
}, ctx.Done())
248+
d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
308249
if err != nil {
309-
if err == wait.ErrWaitTimeout && lastErr != nil {
310-
return lastErr
311-
}
312250
return err
313251
}
252+
253+
if d.DeletionTimestamp != nil {
254+
return fmt.Errorf("daemonset %s is being deleted", iden)
255+
}
256+
257+
// Kubernetes DaemonSet controller doesn't set status conditions yet (v1.18.0), so nothing more to check.
314258
return nil
315259
}

lib/resourcebuilder/batch.go

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
"k8s.io/klog"
8-
97
"github.com/openshift/cluster-version-operator/lib"
108
"github.com/openshift/cluster-version-operator/lib/resourceapply"
119
"github.com/openshift/cluster-version-operator/lib/resourceread"
@@ -14,6 +12,7 @@ import (
1412
"k8s.io/apimachinery/pkg/util/wait"
1513
batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
1614
"k8s.io/client-go/rest"
15+
"k8s.io/klog"
1716
)
1817

1918
type jobBuilder struct {
@@ -45,40 +44,51 @@ func (b *jobBuilder) Do(ctx context.Context) error {
4544
if b.modifier != nil {
4645
b.modifier(job)
4746
}
48-
_, updated, err := resourceapply.ApplyJob(ctx, b.client, job)
49-
if err != nil {
47+
if _, _, err := resourceapply.ApplyJob(ctx, b.client, job); err != nil {
5048
return err
5149
}
52-
if updated && b.mode != InitializingMode {
53-
return WaitForJobCompletion(ctx, b.client, job)
50+
if b.mode != InitializingMode {
51+
_, err := checkJobHealth(ctx, b.client, job)
52+
return err
5453
}
5554
return nil
5655
}
5756

5857
// WaitForJobCompletion waits for job to complete.
5958
func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
6059
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
61-
j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
62-
if err != nil {
63-
klog.Errorf("error getting Job %s: %v", job.Name, err)
60+
if done, err := checkJobHealth(ctx, client, job); err != nil {
61+
klog.Error(err)
62+
return false, nil
63+
} else if !done {
64+
klog.V(4).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Namespace, job.ObjectMeta.Name)
6465
return false, nil
6566
}
67+
return true, nil
68+
}, ctx.Done())
69+
}
6670

67-
if j.Status.Succeeded > 0 {
68-
return true, nil
69-
}
71+
// checkJobHealth returns an error if the job status is bad enough to block further manifest application.
72+
func checkJobHealth(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) (bool, error) {
73+
j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
74+
if err != nil {
75+
return false, fmt.Errorf("error getting Job %s: %v", job.Name, err)
76+
}
7077

71-
// Since we have filled in "activeDeadlineSeconds",
72-
// the Job will 'Active == 0' iff it exceeds the deadline.
73-
// Failed jobs will be recreated in the next run.
74-
if j.Status.Active == 0 && j.Status.Failed > 0 {
75-
reason := "DeadlineExceeded"
76-
message := "Job was active longer than specified deadline"
77-
if len(j.Status.Conditions) > 0 {
78-
reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
79-
}
80-
return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
78+
if j.Status.Succeeded > 0 {
79+
return true, nil
80+
}
81+
82+
// Since we have filled in "activeDeadlineSeconds",
83+
// the Job will 'Active == 0' if and only if it exceeds the deadline.
84+
// Failed jobs will be recreated in the next run.
85+
if j.Status.Active == 0 && j.Status.Failed > 0 {
86+
reason := "DeadlineExceeded"
87+
message := "Job was active longer than specified deadline"
88+
if len(j.Status.Conditions) > 0 {
89+
reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
8190
}
82-
return false, nil
83-
}, ctx.Done())
91+
return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
92+
}
93+
return false, nil
8494
}

0 commit comments

Comments
 (0)