Skip to content

Commit c28b3b1

Browse files
authored
Merge pull request kubernetes#73937 from smarterclayton/report_errors
Report a watch error instead of eating it when we can't decode
2 parents c5114ee + 89620d5 commit c28b3b1

File tree

5 files changed

+186
-34
lines changed

5 files changed

+186
-34
lines changed

staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,3 +617,46 @@ func ReasonForError(err error) metav1.StatusReason {
617617
}
618618
return metav1.StatusReasonUnknown
619619
}
620+
621+
// ErrorReporter converts generic errors into runtime.Object errors without
622+
// requiring the caller to take a dependency on meta/v1 (where Status lives).
623+
// This prevents circular dependencies in core watch code.
624+
type ErrorReporter struct {
625+
code int
626+
verb string
627+
reason string
628+
}
629+
630+
// NewClientErrorReporter will respond with valid v1.Status objects that report
631+
// unexpected server responses. Primarily used by watch to report errors when
632+
// we attempt to decode a response from the server and it is not in the form
633+
// we expect. Because watch is a dependency of the core api, we can't return
634+
// meta/v1.Status in that package and so much inject this interface to convert a
635+
// generic error as appropriate. The reason is passed as a unique status cause
636+
// on the returned status, otherwise the generic "ClientError" is returned.
637+
func NewClientErrorReporter(code int, verb string, reason string) *ErrorReporter {
638+
return &ErrorReporter{
639+
code: code,
640+
verb: verb,
641+
reason: reason,
642+
}
643+
}
644+
645+
// AsObject returns a valid error runtime.Object (a v1.Status) for the given
646+
// error, using the code and verb of the reporter type. The error is set to
647+
// indicate that this was an unexpected server response.
648+
func (r *ErrorReporter) AsObject(err error) runtime.Object {
649+
status := NewGenericServerResponse(r.code, r.verb, schema.GroupResource{}, "", err.Error(), 0, true)
650+
if status.ErrStatus.Details == nil {
651+
status.ErrStatus.Details = &metav1.StatusDetails{}
652+
}
653+
reason := r.reason
654+
if len(reason) == 0 {
655+
reason = "ClientError"
656+
}
657+
status.ErrStatus.Details.Causes = append(status.ErrStatus.Details.Causes, metav1.StatusCause{
658+
Type: metav1.CauseType(reason),
659+
Message: err.Error(),
660+
})
661+
return &status.ErrStatus
662+
}

staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ limitations under the License.
1717
package watch
1818

1919
import (
20+
"fmt"
2021
"io"
2122
"sync"
2223

24+
"k8s.io/klog"
25+
2326
"k8s.io/apimachinery/pkg/runtime"
2427
"k8s.io/apimachinery/pkg/util/net"
2528
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26-
"k8s.io/klog"
2729
)
2830

2931
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
@@ -39,19 +41,28 @@ type Decoder interface {
3941
Close()
4042
}
4143

44+
// Reporter hides the details of how an error is turned into a runtime.Object for
45+
// reporting on a watch stream since this package may not import a higher level report.
46+
type Reporter interface {
47+
// AsObject must convert err into a valid runtime.Object for the watch stream.
48+
AsObject(err error) runtime.Object
49+
}
50+
4251
// StreamWatcher turns any stream for which you can write a Decoder interface
4352
// into a watch.Interface.
4453
type StreamWatcher struct {
4554
sync.Mutex
46-
source Decoder
47-
result chan Event
48-
stopped bool
55+
source Decoder
56+
reporter Reporter
57+
result chan Event
58+
stopped bool
4959
}
5060

5161
// NewStreamWatcher creates a StreamWatcher from the given decoder.
52-
func NewStreamWatcher(d Decoder) *StreamWatcher {
62+
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
5363
sw := &StreamWatcher{
54-
source: d,
64+
source: d,
65+
reporter: r,
5566
// It's easy for a consumer to add buffering via an extra
5667
// goroutine/channel, but impossible for them to remove it,
5768
// so nonbuffered is better.
@@ -102,11 +113,13 @@ func (sw *StreamWatcher) receive() {
102113
case io.ErrUnexpectedEOF:
103114
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
104115
default:
105-
msg := "Unable to decode an event from the watch stream: %v"
106116
if net.IsProbableEOF(err) {
107-
klog.V(5).Infof(msg, err)
117+
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
108118
} else {
109-
klog.Errorf(msg, err)
119+
sw.result <- Event{
120+
Type: Error,
121+
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
122+
}
110123
}
111124
}
112125
return

staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package watch_test
1818

1919
import (
20+
"fmt"
2021
"io"
2122
"reflect"
2223
"testing"
@@ -27,9 +28,13 @@ import (
2728

2829
type fakeDecoder struct {
2930
items chan Event
31+
err error
3032
}
3133

3234
func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err error) {
35+
if f.err != nil {
36+
return "", nil, f.err
37+
}
3338
item, open := <-f.items
3439
if !open {
3540
return action, nil, io.EOF
@@ -38,16 +43,27 @@ func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err erro
3843
}
3944

4045
func (f fakeDecoder) Close() {
41-
close(f.items)
46+
if f.items != nil {
47+
close(f.items)
48+
}
49+
}
50+
51+
type fakeReporter struct {
52+
err error
53+
}
54+
55+
func (f *fakeReporter) AsObject(err error) runtime.Object {
56+
f.err = err
57+
return runtime.Unstructured(nil)
4258
}
4359

4460
func TestStreamWatcher(t *testing.T) {
4561
table := []Event{
4662
{Type: Added, Object: testType("foo")},
4763
}
4864

49-
fd := fakeDecoder{make(chan Event, 5)}
50-
sw := NewStreamWatcher(fd)
65+
fd := fakeDecoder{items: make(chan Event, 5)}
66+
sw := NewStreamWatcher(fd, nil)
5167

5268
for _, item := range table {
5369
fd.items <- item
@@ -66,3 +82,26 @@ func TestStreamWatcher(t *testing.T) {
6682
t.Errorf("Unexpected failure to close")
6783
}
6884
}
85+
86+
func TestStreamWatcherError(t *testing.T) {
87+
fd := fakeDecoder{err: fmt.Errorf("test error")}
88+
fr := &fakeReporter{}
89+
sw := NewStreamWatcher(fd, fr)
90+
evt, ok := <-sw.ResultChan()
91+
if !ok {
92+
t.Fatalf("unexpected close")
93+
}
94+
if evt.Type != Error || evt.Object != runtime.Unstructured(nil) {
95+
t.Fatalf("unexpected object: %#v", evt)
96+
}
97+
_, ok = <-sw.ResultChan()
98+
if ok {
99+
t.Fatalf("unexpected open channel")
100+
}
101+
102+
sw.Stop()
103+
_, ok = <-sw.ResultChan()
104+
if ok {
105+
t.Fatalf("unexpected open channel")
106+
}
107+
}

staging/src/k8s.io/client-go/rest/request.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,12 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
595595
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
596596
}
597597
wrapperDecoder := wrapperDecoderFn(resp.Body)
598-
return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
598+
return watch.NewStreamWatcher(
599+
restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
600+
// use 500 to indicate that the cause of the error is unknown - other error codes
601+
// are more specific to HTTP interactions, and set a reason
602+
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
603+
), nil
599604
}
600605

601606
// updateURLMetrics is a convenience function for pushing metrics.

staging/src/k8s.io/client-go/rest/request_test.go

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import (
3737

3838
"k8s.io/klog"
3939

40-
"k8s.io/api/core/v1"
40+
v1 "k8s.io/api/core/v1"
4141
apiequality "k8s.io/apimachinery/pkg/api/equality"
4242
apierrors "k8s.io/apimachinery/pkg/api/errors"
4343
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -879,9 +879,17 @@ func TestTransformUnstructuredError(t *testing.T) {
879879
}
880880
}
881881

882+
type errorReader struct {
883+
err error
884+
}
885+
886+
func (r errorReader) Read(data []byte) (int, error) { return 0, r.err }
887+
func (r errorReader) Close() error { return nil }
888+
882889
func TestRequestWatch(t *testing.T) {
883890
testCases := []struct {
884891
Request *Request
892+
Expect []watch.Event
885893
Err bool
886894
ErrFn func(error) bool
887895
Empty bool
@@ -903,6 +911,40 @@ func TestRequestWatch(t *testing.T) {
903911
},
904912
Err: true,
905913
},
914+
{
915+
Request: &Request{
916+
content: defaultContentConfig(),
917+
serializers: defaultSerializers(t),
918+
client: clientFunc(func(req *http.Request) (*http.Response, error) {
919+
resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}}
920+
return resp, nil
921+
}),
922+
baseURL: &url.URL{},
923+
},
924+
Expect: []watch.Event{
925+
{
926+
Type: watch.Error,
927+
Object: &metav1.Status{
928+
Status: "Failure",
929+
Code: 500,
930+
Reason: "InternalError",
931+
Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`,
932+
Details: &metav1.StatusDetails{
933+
Causes: []metav1.StatusCause{
934+
{
935+
Type: "UnexpectedServerResponse",
936+
Message: "unable to decode an event from the watch stream: test error",
937+
},
938+
{
939+
Type: "ClientWatchDecoding",
940+
Message: "unable to decode an event from the watch stream: test error",
941+
},
942+
},
943+
},
944+
},
945+
},
946+
},
947+
},
906948
{
907949
Request: &Request{
908950
content: defaultContentConfig(),
@@ -999,27 +1041,37 @@ func TestRequestWatch(t *testing.T) {
9991041
},
10001042
}
10011043
for i, testCase := range testCases {
1002-
t.Logf("testcase %v", testCase.Request)
1003-
testCase.Request.backoffMgr = &NoBackoff{}
1004-
watch, err := testCase.Request.Watch()
1005-
hasErr := err != nil
1006-
if hasErr != testCase.Err {
1007-
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
1008-
continue
1009-
}
1010-
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
1011-
t.Errorf("%d: error not valid: %v", i, err)
1012-
}
1013-
if hasErr && watch != nil {
1014-
t.Errorf("%d: watch should be nil when error is returned", i)
1015-
continue
1016-
}
1017-
if testCase.Empty {
1018-
_, ok := <-watch.ResultChan()
1019-
if ok {
1020-
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
1044+
t.Run("", func(t *testing.T) {
1045+
testCase.Request.backoffMgr = &NoBackoff{}
1046+
watch, err := testCase.Request.Watch()
1047+
hasErr := err != nil
1048+
if hasErr != testCase.Err {
1049+
t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
10211050
}
1022-
}
1051+
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
1052+
t.Errorf("%d: error not valid: %v", i, err)
1053+
}
1054+
if hasErr && watch != nil {
1055+
t.Fatalf("%d: watch should be nil when error is returned", i)
1056+
}
1057+
if testCase.Empty {
1058+
_, ok := <-watch.ResultChan()
1059+
if ok {
1060+
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
1061+
}
1062+
}
1063+
if testCase.Expect != nil {
1064+
for i, evt := range testCase.Expect {
1065+
out, ok := <-watch.ResultChan()
1066+
if !ok {
1067+
t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect))
1068+
}
1069+
if !reflect.DeepEqual(evt, out) {
1070+
t.Fatalf("Event %d does not match: %s", i, diff.ObjectReflectDiff(evt, out))
1071+
}
1072+
}
1073+
}
1074+
})
10231075
}
10241076
}
10251077

0 commit comments

Comments
 (0)