Skip to content
This repository was archived by the owner on Nov 19, 2020. It is now read-only.

Commit ab6956b

Browse files
committed
*: allow watches on individual resources
1 parent cee3715 commit ab6956b

File tree

4 files changed

+179
-12
lines changed

4 files changed

+179
-12
lines changed

resource.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,25 @@ func resourceWatchURL(endpoint, namespace string, r Resource, options ...Option)
260260
return "", fmt.Errorf("unregistered type %T", r)
261261
}
262262

263+
// Hack to let watch work on individual resources
264+
name := ""
265+
if meta := r.GetMetadata(); meta != nil && meta.Name != nil {
266+
name = *meta.Name
267+
if meta.Namespace != nil {
268+
// Ensure that namespaces aren't different.
269+
ns := *meta.Namespace
270+
if namespace != "" && ns != namespace {
271+
return "", fmt.Errorf("different namespace provided on resource than to watch call")
272+
}
273+
namespace = ns
274+
}
275+
}
276+
263277
if !t.namespaced && namespace != "" {
264278
return "", fmt.Errorf("type not namespaced")
265279
}
266280

267-
url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, "", options...)
281+
url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, name, options...)
268282
if strings.Contains(url, "?") {
269283
url = url + "&watch=true"
270284
} else {

resource_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,113 @@ func TestResourceURL(t *testing.T) {
161161
})
162162
}
163163
}
164+
165+
func TestResourceWatchURL(t *testing.T) {
166+
tests := []struct {
167+
name string
168+
endpoint string
169+
namespace string
170+
resource Resource
171+
options []Option
172+
want string
173+
wantErr bool
174+
}{
175+
{
176+
name: "watch_pods",
177+
namespace: "my-namespace",
178+
endpoint: "https://k8s.example.com/foo/",
179+
resource: &Pod{},
180+
want: "https://k8s.example.com/foo/api/v1/namespaces/my-namespace/pods?watch=true",
181+
},
182+
{
183+
name: "watch_all_pods",
184+
endpoint: "https://k8s.example.com/foo/",
185+
resource: &Pod{},
186+
want: "https://k8s.example.com/foo/api/v1/pods?watch=true",
187+
},
188+
{
189+
name: "watch_deployments",
190+
namespace: "my-namespace",
191+
endpoint: "https://k8s.example.com/foo/",
192+
resource: &Deployment{},
193+
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?watch=true",
194+
},
195+
{
196+
name: "watch_with_options",
197+
namespace: "my-namespace",
198+
endpoint: "https://k8s.example.com/foo/",
199+
resource: &Deployment{},
200+
options: []Option{
201+
Timeout(time.Minute),
202+
},
203+
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?timeoutSeconds=60&watch=true",
204+
},
205+
{
206+
name: "watch_non_namespaced",
207+
endpoint: "https://k8s.example.com/foo/",
208+
resource: &ClusterRole{},
209+
want: "https://k8s.example.com/foo/apis/rbac.authorization.k8s.io/v1/clusterroles?watch=true",
210+
},
211+
{
212+
name: "watch_non_namespaced_with_namespace",
213+
namespace: "my-namespace",
214+
endpoint: "https://k8s.example.com/foo/",
215+
resource: &ClusterRole{},
216+
wantErr: true, // can't provide a namespace for a non-namespaced resource
217+
},
218+
{
219+
name: "watch_deployment",
220+
endpoint: "https://k8s.example.com/foo/",
221+
resource: &Deployment{
222+
Metadata: &metav1.ObjectMeta{
223+
Namespace: String("my-namespace"),
224+
Name: String("my-deployment"),
225+
},
226+
},
227+
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true",
228+
},
229+
{
230+
name: "watch_deployment_ns_in_call",
231+
endpoint: "https://k8s.example.com/foo/",
232+
namespace: "my-namespace",
233+
resource: &Deployment{
234+
Metadata: &metav1.ObjectMeta{
235+
Name: String("my-deployment"),
236+
},
237+
},
238+
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true",
239+
},
240+
{
241+
name: "watch_deployment_mismatched_ns",
242+
endpoint: "https://k8s.example.com/foo/",
243+
namespace: "my-other-namespace",
244+
resource: &Deployment{
245+
Metadata: &metav1.ObjectMeta{
246+
Namespace: String("my-namespace"),
247+
Name: String("my-deployment"),
248+
},
249+
},
250+
wantErr: true,
251+
},
252+
}
253+
for _, test := range tests {
254+
t.Run(test.name, func(t *testing.T) {
255+
got, err := resourceWatchURL(
256+
test.endpoint,
257+
test.namespace,
258+
test.resource,
259+
test.options...,
260+
)
261+
if err != nil {
262+
if !test.wantErr {
263+
t.Fatalf("resourceWatchURL: %v", err)
264+
}
265+
return
266+
}
267+
if got != test.want {
268+
t.Errorf("want: %q", test.want)
269+
t.Errorf("got : %q", got)
270+
}
271+
})
272+
}
273+
}

watch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,22 @@ func parseUnknown(b []byte) (*runtime.Unknown, error) {
160160
// fmt.Println(eventType, *cm.Metadata.Name)
161161
// }
162162
//
163+
// To watch an individual resource, provide a resource with pre-populated
164+
// metadata:
165+
//
166+
// // Watch "my-configmap" in "my-namespace"
167+
// configMap := corev1.ConfigMap{
168+
// Metadata: &metav1.ObjectMeta{
169+
// Namespace: String("my-namespace"),
170+
// Name: String("my-configmap"),
171+
// },
172+
// }
173+
// watcher, err := client.Watch(ctx, "", &configMap)
174+
// if err != nil {
175+
// // handle error
176+
// }
177+
// defer watcher.Close() // Always close the returned watcher.
178+
//
163179
func (c *Client) Watch(ctx context.Context, namespace string, r Resource, options ...Option) (*Watcher, error) {
164180
url, err := resourceWatchURL(c.Endpoint, namespace, r, options...)
165181
if err != nil {

watch_test.go

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,21 @@ func init() {
2424
k8s.Register("", "v1", "configmaps", true, &configMapJSON{})
2525
}
2626

27-
func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func() k8s.Resource, update func(cm k8s.Resource)) {
28-
w, err := client.Watch(context.TODO(), namespace, newCM())
27+
func testWatch(t *testing.T, client *k8s.Client, namespace string, r k8s.Resource, newCM func() k8s.Resource, update func(cm k8s.Resource)) {
28+
cm := newCM()
29+
30+
if r.GetMetadata() != nil {
31+
// Individual watch must created beforehand
32+
if err := client.Create(context.TODO(), cm); err != nil {
33+
t.Errorf("create configmap: %v", err)
34+
return
35+
}
36+
}
37+
w, err := client.Watch(context.TODO(), namespace, r)
2938
if err != nil {
30-
t.Errorf("watch configmaps: %v", err)
39+
t.Fatalf("watch configmaps: %v", err)
3140
}
3241
defer w.Close()
33-
34-
cm := newCM()
3542
want := func(eventType string) {
3643
got := newCM()
3744
eT, err := w.Next(got)
@@ -51,11 +58,13 @@ func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func()
5158
}
5259
}
5360

54-
if err := client.Create(context.TODO(), cm); err != nil {
55-
t.Errorf("create configmap: %v", err)
56-
return
61+
if r.GetMetadata() == nil {
62+
if err := client.Create(context.TODO(), cm); err != nil {
63+
t.Errorf("create configmap: %v", err)
64+
return
65+
}
66+
want(k8s.EventAdded)
5767
}
58-
want(k8s.EventAdded)
5968

6069
update(cm)
6170

@@ -86,7 +95,7 @@ func TestWatchConfigMapJSON(t *testing.T) {
8695
updateCM := func(cm k8s.Resource) {
8796
(cm.(*configMapJSON)).Data = map[string]string{"hello": "world"}
8897
}
89-
testWatch(t, client, namespace, newCM, updateCM)
98+
testWatch(t, client, namespace, &configMapJSON{}, newCM, updateCM)
9099
})
91100
}
92101

@@ -104,6 +113,24 @@ func TestWatchConfigMapProto(t *testing.T) {
104113
updateCM := func(cm k8s.Resource) {
105114
(cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"}
106115
}
107-
testWatch(t, client, namespace, newCM, updateCM)
116+
testWatch(t, client, namespace, &corev1.ConfigMap{}, newCM, updateCM)
117+
})
118+
}
119+
120+
func TestWatchIndividualConfigMap(t *testing.T) {
121+
withNamespace(t, func(client *k8s.Client, namespace string) {
122+
newCM := func() k8s.Resource {
123+
return &corev1.ConfigMap{
124+
Metadata: &metav1.ObjectMeta{
125+
Name: k8s.String("my-configmap"),
126+
Namespace: &namespace,
127+
},
128+
}
129+
}
130+
131+
updateCM := func(cm k8s.Resource) {
132+
(cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"}
133+
}
134+
testWatch(t, client, namespace, newCM(), newCM, updateCM)
108135
})
109136
}

0 commit comments

Comments
 (0)