Skip to content

Commit 2e64704

Browse files
authored
Merge pull request kubernetes#75096 from tnozicka/fix-kubectl-run-watches
Fix kubectl run watches
2 parents 56c7463 + d9e511e commit 2e64704

File tree

2 files changed

+34
-12
lines changed

2 files changed

+34
-12
lines changed

pkg/kubectl/cmd/run/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ go_library(
2323
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
2424
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
2525
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
26+
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
2627
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2728
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
2829
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
@@ -31,6 +32,7 @@ go_library(
3132
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
3233
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
3334
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
35+
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
3436
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
3537
"//vendor/github.com/docker/distribution/reference:go_default_library",
3638
"//vendor/github.com/spf13/cobra:go_default_library",

pkg/kubectl/cmd/run/run.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/apimachinery/pkg/api/errors"
3030
"k8s.io/apimachinery/pkg/api/meta"
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/apimachinery/pkg/fields"
3233
"k8s.io/apimachinery/pkg/runtime"
3334
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3435
"k8s.io/apimachinery/pkg/watch"
@@ -37,6 +38,7 @@ import (
3738
"k8s.io/client-go/dynamic"
3839
"k8s.io/client-go/kubernetes"
3940
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
41+
"k8s.io/client-go/tools/cache"
4042
watchtools "k8s.io/client-go/tools/watch"
4143
"k8s.io/kubernetes/pkg/kubectl"
4244
"k8s.io/kubernetes/pkg/kubectl/cmd/attach"
@@ -490,18 +492,41 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R
490492

491493
// waitForPod watches the given pod until the exitCondition is true
492494
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) {
493-
w, err := podClient.Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name}))
494-
if err != nil {
495-
return nil, err
496-
}
497-
498495
// TODO: expose the timeout
499496
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second)
500497
defer cancel()
498+
499+
preconditionFunc := func(store cache.Store) (bool, error) {
500+
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: ns, Name: name})
501+
if err != nil {
502+
return true, err
503+
}
504+
if !exists {
505+
// We need to make sure we see the object in the cache before we start waiting for events
506+
// or we would be waiting for the timeout if such object didn't exist.
507+
// (e.g. it was deleted before we started informers so they wouldn't even see the delete event)
508+
return true, errors.NewNotFound(corev1.Resource("pods"), name)
509+
}
510+
511+
return false, nil
512+
}
513+
514+
fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
515+
lw := &cache.ListWatch{
516+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
517+
options.FieldSelector = fieldSelector
518+
return podClient.Pods(ns).List(options)
519+
},
520+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
521+
options.FieldSelector = fieldSelector
522+
return podClient.Pods(ns).Watch(options)
523+
},
524+
}
525+
501526
intr := interrupt.New(nil, cancel)
502527
var result *corev1.Pod
503-
err = intr.Run(func() error {
504-
ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
528+
err := intr.Run(func() error {
529+
ev, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, preconditionFunc, func(ev watch.Event) (bool, error) {
505530
return exitCondition(ev)
506531
})
507532
if ev != nil {
@@ -510,11 +535,6 @@ func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitConditio
510535
return err
511536
})
512537

513-
// Fix generic not found error.
514-
if err != nil && errors.IsNotFound(err) {
515-
err = errors.NewNotFound(corev1.Resource("pods"), name)
516-
}
517-
518538
return result, err
519539
}
520540

0 commit comments

Comments
 (0)