Skip to content

Commit 9df9e3a

Browse files
committed
create functional zero-change watch
1 parent f785651 commit 9df9e3a

File tree

3 files changed

+139
-28
lines changed

3 files changed

+139
-28
lines changed

pkg/manifestclient/io.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package manifestclient
2+
3+
import (
4+
"io"
5+
"sync/atomic"
6+
"time"
7+
)
8+
9+
func newDelayedNothingReader(timeout time.Duration) *delayedNothingReaderCloser {
10+
return &delayedNothingReaderCloser{timeout: timeout}
11+
}
12+
13+
type delayedNothingReaderCloser struct {
14+
timeout time.Duration
15+
closed atomic.Bool
16+
}
17+
18+
func (d *delayedNothingReaderCloser) Read(p []byte) (n int, err error) {
19+
if d.closed.Load() {
20+
return 0, io.EOF
21+
}
22+
select {
23+
case <-time.After(d.timeout):
24+
d.Close()
25+
}
26+
if d.closed.Load() {
27+
return 0, io.EOF
28+
}
29+
return 0, nil
30+
}
31+
32+
func (d *delayedNothingReaderCloser) Close() error {
33+
d.closed.Store(true)
34+
return nil
35+
}
36+
37+
var _ io.ReadCloser = &delayedNothingReaderCloser{}

pkg/manifestclient/roundtripper.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"net/http"
1010
"os"
1111
"path/filepath"
12+
"strconv"
13+
"time"
1214

1315
apierrors "k8s.io/apimachinery/pkg/api/errors"
1416
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -123,6 +125,24 @@ func (mrt *manifestRoundTripper) RoundTrip(req *http.Request) (*http.Response, e
123125
// TODO handle label and field selectors
124126
returnBody, returnErr = mrt.list(requestInfo)
125127

128+
case "watch":
129+
// our watches do nothing. We keep the connection alive (I think), but nothing else.
130+
timeoutSecondsString := req.URL.Query().Get("timeoutSeconds")
131+
timeoutDuration := 10 * time.Minute
132+
if len(timeoutSecondsString) > 0 {
133+
currSeconds, err := strconv.ParseInt(timeoutSecondsString, 10, 32)
134+
if err != nil {
135+
returnErr = err
136+
break
137+
}
138+
timeoutDuration = time.Duration(currSeconds) * time.Second
139+
}
140+
resp := &http.Response{}
141+
resp.StatusCode = http.StatusOK
142+
resp.Status = http.StatusText(resp.StatusCode)
143+
resp.Body = newDelayedNothingReader(timeoutDuration)
144+
return resp, nil
145+
126146
default:
127147
return nil, fmt.Errorf("verb %v is not supported by this implementation", requestInfo.Verb)
128148
}

pkg/manifestclienttest/client_test.go

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net/http"
77
"reflect"
88
"testing"
9+
"time"
910

1011
"github.com/davecgh/go-spew/spew"
1112
configv1 "github.com/openshift/api/config/v1"
@@ -22,29 +23,6 @@ import (
2223
var mustGather01 embed.FS
2324

2425
func TestSimpleChecks(t *testing.T) {
25-
mustGatherRoundTripper, err := manifestclient.NewRoundTripper("testdata/must-gather-01")
26-
if err != nil {
27-
t.Fatal(err)
28-
}
29-
testRoundTripper, err := manifestclient.NewTestingRoundTripper(mustGather01, "testdata/must-gather-01")
30-
if err != nil {
31-
t.Fatal(err)
32-
}
33-
34-
roundTrippers := []struct {
35-
name string
36-
roundTripper http.RoundTripper
37-
}{
38-
{
39-
name: "directory read",
40-
roundTripper: mustGatherRoundTripper,
41-
},
42-
{
43-
name: "embed read",
44-
roundTripper: testRoundTripper,
45-
},
46-
}
47-
4826
tests := []struct {
4927
name string
5028
testFn func(*testing.T, *http.Client)
@@ -178,15 +156,91 @@ func TestSimpleChecks(t *testing.T) {
178156
},
179157
},
180158
}
181-
for _, roundTripperTest := range roundTrippers {
182-
httpClient := &http.Client{
183-
Transport: roundTripperTest.roundTripper,
184-
}
185159

160+
for _, roundTripperTest := range defaultRoundTrippers(t) {
161+
t.Run(roundTripperTest.name, func(t *testing.T) {
162+
for _, test := range tests {
163+
t.Run(test.name, func(t *testing.T) {
164+
test.testFn(t, roundTripperTest.getClient())
165+
})
166+
}
167+
})
168+
}
169+
}
170+
171+
func defaultRoundTrippers(t *testing.T) []*testRoundTrippers {
172+
t.Helper()
173+
174+
mustGatherRoundTripper, err := manifestclient.NewRoundTripper("testdata/must-gather-01")
175+
if err != nil {
176+
t.Fatal(err)
177+
}
178+
testRoundTripper, err := manifestclient.NewTestingRoundTripper(mustGather01, "testdata/must-gather-01")
179+
if err != nil {
180+
t.Fatal(err)
181+
}
182+
183+
return []*testRoundTrippers{
184+
{
185+
name: "directory read",
186+
roundTripper: mustGatherRoundTripper,
187+
},
188+
{
189+
name: "embed read",
190+
roundTripper: testRoundTripper,
191+
},
192+
}
193+
}
194+
195+
type testRoundTrippers struct {
196+
name string
197+
roundTripper http.RoundTripper
198+
}
199+
200+
func (r *testRoundTrippers) getClient() *http.Client {
201+
return &http.Client{
202+
Transport: r.roundTripper,
203+
}
204+
}
205+
206+
func TestWatchChecks(t *testing.T) {
207+
tests := []struct {
208+
name string
209+
testFn func(*testing.T, *http.Client)
210+
}{
211+
{
212+
name: "WATCH-from-individual-file-success-server-close",
213+
testFn: func(t *testing.T, httpClient *http.Client) {
214+
timeout := int64(4)
215+
configClient, err := configclient.NewForConfigAndClient(&rest.Config{}, httpClient)
216+
if err != nil {
217+
t.Fatal(err)
218+
}
219+
watcher, err := configClient.ConfigV1().FeatureGates().Watch(context.TODO(), metav1.ListOptions{
220+
TimeoutSeconds: &timeout,
221+
})
222+
if err != nil {
223+
t.Fatal(err)
224+
}
225+
select {
226+
case <-watcher.ResultChan():
227+
t.Fatal("closed early!")
228+
case <-time.After(500 * time.Millisecond):
229+
}
230+
231+
select {
232+
case <-watcher.ResultChan():
233+
case <-time.After(5 * time.Second):
234+
t.Fatal("closed late!")
235+
}
236+
},
237+
},
238+
}
239+
for _, roundTripperTest := range defaultRoundTrippers(t) {
186240
t.Run(roundTripperTest.name, func(t *testing.T) {
187241
for _, test := range tests {
188242
t.Run(test.name, func(t *testing.T) {
189-
test.testFn(t, httpClient)
243+
test.testFn(t, roundTripperTest.getClient())
190244
})
191245
}
192246
})

0 commit comments

Comments
 (0)