Skip to content

Commit 37f733a

Browse files
authored
Merge pull request kubernetes#125868 from soltysh/wait_for
Add --for=create option to kubectl wait
2 parents c20aa76 + aaf1fb5 commit 37f733a

File tree

8 files changed

+629
-373
lines changed

8 files changed

+629
-373
lines changed

staging/src/k8s.io/cli-runtime/pkg/genericclioptions/builder_flags_fake.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,40 @@ import (
2121
)
2222

2323
// NewSimpleFakeResourceFinder builds a super simple ResourceFinder that just iterates over the objects you provided
24-
func NewSimpleFakeResourceFinder(infos ...*resource.Info) ResourceFinder {
25-
return &fakeResourceFinder{
24+
func NewSimpleFakeResourceFinder(infos ...*resource.Info) *FakeResourceFinder {
25+
return &FakeResourceFinder{
2626
Infos: infos,
2727
}
2828
}
2929

30-
type fakeResourceFinder struct {
30+
func (f *FakeResourceFinder) WithError(err error) *FakeResourceFinder {
31+
f.err = err
32+
return f
33+
}
34+
35+
type FakeResourceFinder struct {
3136
Infos []*resource.Info
37+
err error
3238
}
3339

3440
// Do implements the interface
35-
func (f *fakeResourceFinder) Do() resource.Visitor {
41+
func (f *FakeResourceFinder) Do() resource.Visitor {
3642
return &fakeResourceResult{
3743
Infos: f.Infos,
44+
err: f.err,
3845
}
3946
}
4047

4148
type fakeResourceResult struct {
4249
Infos []*resource.Info
50+
err error
4351
}
4452

4553
// Visit just iterates over info
4654
func (r *fakeResourceResult) Visit(fn resource.VisitorFunc) error {
55+
if r.err != nil {
56+
return r.err
57+
}
4758
for _, info := range r.Infos {
4859
err := fn(info, nil)
4960
if err != nil {
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package wait
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"io"
24+
"strings"
25+
"time"
26+
27+
apierrors "k8s.io/apimachinery/pkg/api/errors"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30+
"k8s.io/apimachinery/pkg/fields"
31+
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/util/wait"
33+
"k8s.io/apimachinery/pkg/watch"
34+
"k8s.io/cli-runtime/pkg/resource"
35+
"k8s.io/client-go/tools/cache"
36+
watchtools "k8s.io/client-go/tools/watch"
37+
"k8s.io/kubectl/pkg/util/interrupt"
38+
)
39+
40+
// ConditionalWait hold information to check an API status condition
41+
type ConditionalWait struct {
42+
conditionName string
43+
conditionStatus string
44+
// errOut is written to if an error occurs
45+
errOut io.Writer
46+
}
47+
48+
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
49+
func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
50+
return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
51+
}
52+
53+
func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
54+
conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
55+
if err != nil {
56+
return false, err
57+
}
58+
if !found {
59+
return false, nil
60+
}
61+
for _, conditionUncast := range conditions {
62+
condition := conditionUncast.(map[string]interface{})
63+
name, found, err := unstructured.NestedString(condition, "type")
64+
if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
65+
continue
66+
}
67+
status, found, err := unstructured.NestedString(condition, "status")
68+
if !found || err != nil {
69+
continue
70+
}
71+
generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
72+
if found {
73+
observedGeneration, found := getObservedGeneration(obj, condition)
74+
if found && observedGeneration < generation {
75+
return false, nil
76+
}
77+
}
78+
return strings.EqualFold(status, w.conditionStatus), nil
79+
}
80+
81+
return false, nil
82+
}
83+
84+
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
85+
if event.Type == watch.Error {
86+
// keep waiting in the event we see an error - we expect the watch to be closed by
87+
// the server
88+
err := apierrors.FromObject(event.Object)
89+
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
90+
return false, nil
91+
}
92+
if event.Type == watch.Deleted {
93+
// this will chain back out, result in another get and an return false back up the chain
94+
return false, nil
95+
}
96+
obj := event.Object.(*unstructured.Unstructured)
97+
return w.checkCondition(obj)
98+
}
99+
100+
type isCondMetFunc func(event watch.Event) (bool, error)
101+
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
102+
103+
// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
104+
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
105+
func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
106+
if len(info.Name) == 0 {
107+
return info.Object, false, fmt.Errorf("resource name must be provided")
108+
}
109+
110+
endTime := time.Now().Add(o.Timeout)
111+
timeout := time.Until(endTime)
112+
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
113+
if o.Timeout == 0 {
114+
// If timeout is zero we will fetch the object(s) once only and check
115+
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
116+
if initObjGetErr != nil {
117+
return nil, false, initObjGetErr
118+
}
119+
if gottenObj == nil {
120+
return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
121+
}
122+
conditionCheck, err := check(gottenObj)
123+
if err != nil {
124+
return gottenObj, false, err
125+
}
126+
if !conditionCheck {
127+
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
128+
}
129+
return gottenObj, true, nil
130+
}
131+
if timeout < 0 {
132+
// we're out of time
133+
return info.Object, false, errWaitTimeoutWithName
134+
}
135+
136+
mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
137+
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
138+
lw := &cache.ListWatch{
139+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
140+
options.FieldSelector = fieldSelector
141+
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
142+
},
143+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
144+
options.FieldSelector = fieldSelector
145+
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
146+
},
147+
}
148+
149+
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
150+
preconditionFunc := func(store cache.Store) (bool, error) {
151+
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
152+
if err != nil {
153+
return true, err
154+
}
155+
if !exists {
156+
return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
157+
}
158+
159+
return false, nil
160+
}
161+
162+
intrCtx, cancel := context.WithCancel(ctx)
163+
defer cancel()
164+
var result runtime.Object
165+
intr := interrupt.New(nil, cancel)
166+
err := intr.Run(func() error {
167+
ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
168+
if ev != nil {
169+
result = ev.Object
170+
}
171+
if errors.Is(err, context.DeadlineExceeded) {
172+
return errWaitTimeoutWithName
173+
}
174+
return err
175+
})
176+
if err != nil {
177+
if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
178+
return result, false, errWaitTimeoutWithName
179+
}
180+
return result, false, err
181+
}
182+
183+
return result, true, nil
184+
}
185+
186+
func extendErrWaitTimeout(err error, info *resource.Info) error {
187+
return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
188+
}
189+
190+
func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
191+
conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
192+
if found {
193+
return conditionObservedGeneration, true
194+
}
195+
statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
196+
return statusObservedGeneration, found
197+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package wait
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"k8s.io/apimachinery/pkg/runtime"
24+
"k8s.io/cli-runtime/pkg/resource"
25+
)
26+
27+
// IsCreated is a condition func for waiting for something to be created
28+
func IsCreated(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
29+
if len(info.Name) == 0 || info.Object == nil {
30+
return nil, false, fmt.Errorf("resource name must be provided")
31+
}
32+
return info.Object, true, nil
33+
}

0 commit comments

Comments
 (0)