Skip to content

Commit 6eec9d6

Browse files
committed
kubectl wait: split condition functions into separate files
1 parent f35d8a4 commit 6eec9d6

File tree

4 files changed

+461
-361
lines changed

4 files changed

+461
-361
lines changed
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package wait
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"io"
24+
"strings"
25+
"time"
26+
27+
apierrors "k8s.io/apimachinery/pkg/api/errors"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30+
"k8s.io/apimachinery/pkg/fields"
31+
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/util/wait"
33+
"k8s.io/apimachinery/pkg/watch"
34+
"k8s.io/cli-runtime/pkg/resource"
35+
"k8s.io/client-go/tools/cache"
36+
watchtools "k8s.io/client-go/tools/watch"
37+
"k8s.io/kubectl/pkg/util/interrupt"
38+
)
39+
40+
// ConditionalWait hold information to check an API status condition
41+
type ConditionalWait struct {
42+
conditionName string
43+
conditionStatus string
44+
// errOut is written to if an error occurs
45+
errOut io.Writer
46+
}
47+
48+
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
49+
func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
50+
return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
51+
}
52+
53+
func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
54+
conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
55+
if err != nil {
56+
return false, err
57+
}
58+
if !found {
59+
return false, nil
60+
}
61+
for _, conditionUncast := range conditions {
62+
condition := conditionUncast.(map[string]interface{})
63+
name, found, err := unstructured.NestedString(condition, "type")
64+
if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
65+
continue
66+
}
67+
status, found, err := unstructured.NestedString(condition, "status")
68+
if !found || err != nil {
69+
continue
70+
}
71+
generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
72+
if found {
73+
observedGeneration, found := getObservedGeneration(obj, condition)
74+
if found && observedGeneration < generation {
75+
return false, nil
76+
}
77+
}
78+
return strings.EqualFold(status, w.conditionStatus), nil
79+
}
80+
81+
return false, nil
82+
}
83+
84+
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
85+
if event.Type == watch.Error {
86+
// keep waiting in the event we see an error - we expect the watch to be closed by
87+
// the server
88+
err := apierrors.FromObject(event.Object)
89+
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
90+
return false, nil
91+
}
92+
if event.Type == watch.Deleted {
93+
// this will chain back out, result in another get and an return false back up the chain
94+
return false, nil
95+
}
96+
obj := event.Object.(*unstructured.Unstructured)
97+
return w.checkCondition(obj)
98+
}
99+
100+
type isCondMetFunc func(event watch.Event) (bool, error)
101+
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
102+
103+
// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
104+
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
105+
func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
106+
if len(info.Name) == 0 {
107+
return info.Object, false, fmt.Errorf("resource name must be provided")
108+
}
109+
110+
endTime := time.Now().Add(o.Timeout)
111+
timeout := time.Until(endTime)
112+
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
113+
if o.Timeout == 0 {
114+
// If timeout is zero we will fetch the object(s) once only and check
115+
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
116+
if initObjGetErr != nil {
117+
return nil, false, initObjGetErr
118+
}
119+
if gottenObj == nil {
120+
return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
121+
}
122+
conditionCheck, err := check(gottenObj)
123+
if err != nil {
124+
return gottenObj, false, err
125+
}
126+
if !conditionCheck {
127+
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
128+
}
129+
return gottenObj, true, nil
130+
}
131+
if timeout < 0 {
132+
// we're out of time
133+
return info.Object, false, errWaitTimeoutWithName
134+
}
135+
136+
mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
137+
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
138+
lw := &cache.ListWatch{
139+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
140+
options.FieldSelector = fieldSelector
141+
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
142+
},
143+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
144+
options.FieldSelector = fieldSelector
145+
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
146+
},
147+
}
148+
149+
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
150+
preconditionFunc := func(store cache.Store) (bool, error) {
151+
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
152+
if err != nil {
153+
return true, err
154+
}
155+
if !exists {
156+
return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
157+
}
158+
159+
return false, nil
160+
}
161+
162+
intrCtx, cancel := context.WithCancel(ctx)
163+
defer cancel()
164+
var result runtime.Object
165+
intr := interrupt.New(nil, cancel)
166+
err := intr.Run(func() error {
167+
ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
168+
if ev != nil {
169+
result = ev.Object
170+
}
171+
if errors.Is(err, context.DeadlineExceeded) {
172+
return errWaitTimeoutWithName
173+
}
174+
return err
175+
})
176+
if err != nil {
177+
if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
178+
return result, false, errWaitTimeoutWithName
179+
}
180+
return result, false, err
181+
}
182+
183+
return result, true, nil
184+
}
185+
186+
func extendErrWaitTimeout(err error, info *resource.Info) error {
187+
return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
188+
}
189+
190+
func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
191+
conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
192+
if found {
193+
return conditionObservedGeneration, true
194+
}
195+
statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
196+
return statusObservedGeneration, found
197+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package wait
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"io"
24+
"time"
25+
26+
apierrors "k8s.io/apimachinery/pkg/api/errors"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"k8s.io/apimachinery/pkg/fields"
30+
"k8s.io/apimachinery/pkg/runtime"
31+
"k8s.io/apimachinery/pkg/util/wait"
32+
"k8s.io/apimachinery/pkg/watch"
33+
"k8s.io/cli-runtime/pkg/resource"
34+
"k8s.io/client-go/tools/cache"
35+
watchtools "k8s.io/client-go/tools/watch"
36+
"k8s.io/kubectl/pkg/util/interrupt"
37+
)
38+
39+
// IsDeleted is a condition func for waiting for something to be deleted
40+
func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
41+
if len(info.Name) == 0 {
42+
return info.Object, false, fmt.Errorf("resource name must be provided")
43+
}
44+
45+
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(ctx, info.Name, metav1.GetOptions{})
46+
if apierrors.IsNotFound(initObjGetErr) {
47+
return info.Object, true, nil
48+
}
49+
if initObjGetErr != nil {
50+
// TODO this could do something slightly fancier if we wish
51+
return info.Object, false, initObjGetErr
52+
}
53+
resourceLocation := ResourceLocation{
54+
GroupResource: info.Mapping.Resource.GroupResource(),
55+
Namespace: gottenObj.GetNamespace(),
56+
Name: gottenObj.GetName(),
57+
}
58+
if uid, ok := o.UIDMap[resourceLocation]; ok {
59+
if gottenObj.GetUID() != uid {
60+
return gottenObj, true, nil
61+
}
62+
}
63+
64+
endTime := time.Now().Add(o.Timeout)
65+
timeout := time.Until(endTime)
66+
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
67+
if o.Timeout == 0 {
68+
// If timeout is zero check if the object exists once only
69+
if gottenObj == nil {
70+
return nil, true, nil
71+
}
72+
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
73+
}
74+
if timeout < 0 {
75+
// we're out of time
76+
return info.Object, false, errWaitTimeoutWithName
77+
}
78+
79+
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
80+
lw := &cache.ListWatch{
81+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
82+
options.FieldSelector = fieldSelector
83+
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(ctx, options)
84+
},
85+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
86+
options.FieldSelector = fieldSelector
87+
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(ctx, options)
88+
},
89+
}
90+
91+
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
92+
preconditionFunc := func(store cache.Store) (bool, error) {
93+
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
94+
if err != nil {
95+
return true, err
96+
}
97+
if !exists {
98+
// since we're looking for it to disappear we just return here if it no longer exists
99+
return true, nil
100+
}
101+
102+
return false, nil
103+
}
104+
105+
intrCtx, cancel := context.WithCancel(ctx)
106+
defer cancel()
107+
intr := interrupt.New(nil, cancel)
108+
err := intr.Run(func() error {
109+
_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
110+
if errors.Is(err, context.DeadlineExceeded) {
111+
return errWaitTimeoutWithName
112+
}
113+
return err
114+
})
115+
if err != nil {
116+
if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
117+
return gottenObj, false, errWaitTimeoutWithName
118+
}
119+
return gottenObj, false, err
120+
}
121+
122+
return gottenObj, true, nil
123+
}
124+
125+
// Wait has helper methods for handling watches, including error handling.
126+
type Wait struct {
127+
errOut io.Writer
128+
}
129+
130+
// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
131+
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
132+
switch event.Type {
133+
case watch.Error:
134+
// keep waiting in the event we see an error - we expect the watch to be closed by
135+
// the server if the error is unrecoverable.
136+
err := apierrors.FromObject(event.Object)
137+
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
138+
return false, nil
139+
case watch.Deleted:
140+
return true, nil
141+
default:
142+
return false, nil
143+
}
144+
}

0 commit comments

Comments
 (0)