Skip to content

Commit e0ac3ac

Browse files
authored
xdsclient: Add error type for NACKed resources (#8117)
1 parent 65c6718 commit e0ac3ac

File tree

12 files changed

+56
-33
lines changed

12 files changed

+56
-33
lines changed

xds/internal/balancer/cdsbalancer/cdsbalancer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ func (s) TestResolverError(t *testing.T) {
923923
}
924924

925925
// Push a resource-not-found-error this time around.
926-
resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
926+
resolverErr = xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
927927
r.ReportError(resolverErr)
928928

929929
// Wait for the CDS resource to be not requested anymore, or the connection

xds/internal/xdsclient/channel.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, res
287287
perResourceErrors[name] = err
288288
// Add place holder in the map so we know this resource name was in
289289
// the response.
290-
ret[name] = ads.DataAndErrTuple{Err: err}
290+
ret[name] = ads.DataAndErrTuple{Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, err.Error())}
291291
}
292292

293293
if len(topLevelErrors) == 0 && len(perResourceErrors) == 0 {
@@ -299,7 +299,7 @@ func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, res
299299
errRet := combineErrors(rType.TypeName(), topLevelErrors, perResourceErrors)
300300
md.ErrState = &xdsresource.UpdateErrorMetadata{
301301
Version: resp.Version,
302-
Err: errRet,
302+
Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, errRet.Error()),
303303
Timestamp: timestamp,
304304
}
305305
return ret, md, errRet

xds/internal/xdsclient/clientimpl_watchers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type wrappingWatcher struct {
3434
}
3535

3636
func (w *wrappingWatcher) OnError(err error, done xdsresource.OnDoneFunc) {
37-
w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %v", w.nodeID, err), done)
37+
w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %w", w.nodeID, err), done)
3838
}
3939

4040
// WatchResource uses xDS to discover the resource associated with the provided

xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package xdsclient_test
2020

2121
import (
2222
"context"
23-
"errors"
2423
"fmt"
2524
"strings"
2625
"testing"
@@ -162,7 +161,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) {
162161
}
163162
gotResp = r.(*v3discoverypb.DiscoveryResponse)
164163

165-
wantNackErr := errors.New("unexpected http connection manager resource type")
164+
wantNackErr := xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type")
166165
if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil {
167166
t.Fatal(err)
168167
}
@@ -309,7 +308,7 @@ func (s) TestADS_NACK_InvalidFirstResponse(t *testing.T) {
309308
gotResp := r.(*v3discoverypb.DiscoveryResponse)
310309

311310
// Verify that the error is propagated to the watcher.
312-
var wantNackErr = errors.New("unexpected http connection manager resource type")
311+
var wantNackErr = xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type")
313312
if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil {
314313
t.Fatal(err)
315314
}

xds/internal/xdsclient/tests/authority_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,6 @@ func (cw *clusterWatcherV2) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc
370370
// resends resources which are NACKed by the xDS client, using a `Replace()`
371371
// here simplifies tests that want access to the most recently received
372372
// error.
373-
cw.resourceNotFoundCh.Replace(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response"))
373+
cw.resourceNotFoundCh.Replace(xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response"))
374374
onDone()
375375
}

xds/internal/xdsclient/tests/cds_watchers_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
8282
}
8383

8484
func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
85-
cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
85+
cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
8686
onDone()
8787
}
8888

@@ -670,7 +670,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
670670
// Verify that an empty update with the expected error is received.
671671
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
672672
defer cancel()
673-
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
673+
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
674674
if err := verifyClusterUpdate(ctx, cw.updateCh, clusterUpdateErrTuple{err: wantErr}); err != nil {
675675
t.Fatal(err)
676676
}
@@ -847,7 +847,7 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) {
847847

848848
// The first watcher should receive a resource removed error, while the
849849
// second watcher should not receive an update.
850-
if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
850+
if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
851851
t.Fatal(err)
852852
}
853853
if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil {

xds/internal/xdsclient/tests/eds_watchers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
9191
}
9292

9393
func (ew *endpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
94-
ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
94+
ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
9595
onDone()
9696
}
9797

@@ -757,7 +757,7 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
757757
// Verify that an empty update with the expected error is received.
758758
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
759759
defer cancel()
760-
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
760+
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
761761
if err := verifyEndpointsUpdate(ctx, ew.updateCh, endpointsUpdateErrTuple{err: wantErr}); err != nil {
762762
t.Fatal(err)
763763
}

xds/internal/xdsclient/tests/lds_watchers_test.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
8686
}
8787

8888
func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
89-
lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
89+
lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
9090
onDone()
9191
}
9292

@@ -111,7 +111,7 @@ func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneF
111111
}
112112

113113
func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
114-
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
114+
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
115115
onDone()
116116
}
117117

@@ -135,10 +135,6 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener {
135135
}
136136
}
137137

138-
// xdsClient is expected to produce an error containing this string when an
139-
// update is received containing a listener created using `badListenerResource`.
140-
const wantListenerNACKErr = "no RouteSpecifier"
141-
142138
// verifyNoListenerUpdate verifies that no listener update is received on the
143139
// provided update channel, and returns an error if an update is received.
144140
//
@@ -195,6 +191,21 @@ func verifyListenerError(ctx context.Context, updateCh *testutils.Channel, wantE
195191
return nil
196192
}
197193

194+
func verifyErrorType(ctx context.Context, updateCh *testutils.Channel, wantErrType xdsresource.ErrorType, wantNodeID string) error {
195+
u, err := updateCh.Receive(ctx)
196+
if err != nil {
197+
return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err)
198+
}
199+
gotErr := u.(listenerUpdateErrTuple).err
200+
if got, want := xdsresource.ErrType(gotErr), wantErrType; got != want {
201+
return fmt.Errorf("update received with error %v of type: %v, want %v", gotErr, got, want)
202+
}
203+
if !strings.Contains(gotErr.Error(), wantNodeID) {
204+
return fmt.Errorf("update received with error: %v, want error with node ID: %q", gotErr, wantNodeID)
205+
}
206+
return nil
207+
}
208+
198209
// TestLDSWatch covers the case where a single watcher exists for a single
199210
// listener resource. The test verifies the following scenarios:
200211
// 1. An update from the management server containing the resource being
@@ -726,7 +737,7 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
726737
// Verify that an empty update with the expected error is received.
727738
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
728739
defer cancel()
729-
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
740+
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
730741
if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantErr}); err != nil {
731742
t.Fatal(err)
732743
}
@@ -900,7 +911,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
900911
// The first watcher should receive a resource removed error, while the
901912
// second watcher should not see an update.
902913
if err := verifyListenerUpdate(ctx, lw1.updateCh, listenerUpdateErrTuple{
903-
err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, ""),
914+
err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, ""),
904915
}); err != nil {
905916
t.Fatal(err)
906917
}
@@ -1002,7 +1013,7 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) {
10021013
}
10031014

10041015
// The existing watcher should receive a resource removed error.
1005-
updateError := listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}
1016+
updateError := listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")}
10061017
if err := verifyListenerUpdate(ctx, lw1.updateCh, updateError); err != nil {
10071018
t.Fatal(err)
10081019
}
@@ -1061,15 +1072,15 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
10611072
}
10621073

10631074
// Verify that the expected error is propagated to the existing watcher.
1064-
if err := verifyListenerError(ctx, lw.updateCh, wantListenerNACKErr, nodeID); err != nil {
1075+
if err := verifyErrorType(ctx, lw.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
10651076
t.Fatal(err)
10661077
}
10671078

10681079
// Verify that the expected error is propagated to the new watcher as well.
10691080
lw2 := newListenerWatcher()
10701081
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
10711082
defer ldsCancel2()
1072-
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
1083+
if err := verifyErrorType(ctx, lw2.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
10731084
t.Fatal(err)
10741085
}
10751086
}
@@ -1141,7 +1152,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
11411152
}
11421153

11431154
// Verify that the expected error is propagated to the existing watcher.
1144-
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
1155+
if err := verifyErrorType(ctx, lw1.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
11451156
t.Fatal(err)
11461157
}
11471158

@@ -1154,7 +1165,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
11541165
t.Fatal(err)
11551166
}
11561167
// Verify that the expected error is propagated to the existing watcher.
1157-
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
1168+
if err := verifyErrorType(ctx, lw2.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
11581169
t.Fatal(err)
11591170
}
11601171
}
@@ -1232,7 +1243,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
12321243
// Verify that the expected error is propagated to the watcher which
12331244
// requested for the bad resource.
12341245
// Verify that the expected error is propagated to the existing watcher.
1235-
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
1246+
if err := verifyErrorType(ctx, lw1.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
12361247
t.Fatal(err)
12371248
}
12381249

xds/internal/xdsclient/tests/misc_watchers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFu
9090
}
9191

9292
func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
93-
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
93+
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
9494
onDone()
9595
}
9696

xds/internal/xdsclient/tests/rds_watchers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc)
8181
}
8282

8383
func (rw *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
84-
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
84+
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
8585
onDone()
8686
}
8787

@@ -759,7 +759,7 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
759759
// Verify that an empty update with the expected error is received.
760760
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
761761
defer cancel()
762-
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
762+
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
763763
if err := verifyRouteConfigUpdate(ctx, rw.updateCh, routeConfigUpdateErrTuple{err: wantErr}); err != nil {
764764
t.Fatal(err)
765765
}

0 commit comments

Comments
 (0)