Skip to content

Commit c7db760

Browse files
authored
xdsclient: ensure xDS node ID in included in NACK and connectivity errors (#8103)
1 parent 42fc25a commit c7db760

File tree

2 files changed

+20
-15
lines changed

2 files changed

+20
-15
lines changed

xds/internal/xdsclient/tests/ads_stream_backoff_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"context"
2323
"errors"
2424
"fmt"
25-
"strings"
2625
"testing"
2726
"time"
2827

@@ -104,13 +103,15 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) {
104103

105104
// Override the backoff implementation to push on a channel that is read by
106105
// the test goroutine.
106+
backoffCtx, backoffCancel := context.WithCancel(ctx)
107107
streamBackoff := func(v int) time.Duration {
108108
select {
109109
case backoffCh <- struct{}{}:
110-
case <-ctx.Done():
110+
case <-backoffCtx.Done():
111111
}
112112
return 0
113113
}
114+
defer backoffCancel()
114115

115116
// Create an xDS client with bootstrap pointing to the above server.
116117
nodeID := uuid.New().String()
@@ -130,13 +131,8 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) {
130131
}
131132

132133
// Verify that the received stream error is reported to the watcher.
133-
u, err := lw.updateCh.Receive(ctx)
134-
if err != nil {
135-
t.Fatal("Timeout when waiting for an error callback on the listener watcher")
136-
}
137-
gotErr := u.(listenerUpdateErrTuple).err
138-
if !strings.Contains(gotErr.Error(), streamErr.Error()) {
139-
t.Fatalf("Received stream error: %v, wantErr: %v", gotErr, streamErr)
134+
if err := verifyListenerError(ctx, lw.updateCh, streamErr.Error(), nodeID); err != nil {
135+
t.Fatal(err)
140136
}
141137

142138
// Verify that the stream is closed.
@@ -157,6 +153,12 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) {
157153
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
158154
t.Fatal(err)
159155
}
156+
157+
// To prevent indefinite blocking during xDS client close, which is caused
158+
// by a blocking backoff channel write, cancel the backoff context early
159+
// given that the test is complete.
160+
backoffCancel()
161+
160162
}
161163

162164
// Tests the case where a stream breaks because the server goes down. Verifies

xds/internal/xdsclient/tests/lds_watchers_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want
180180
return nil
181181
}
182182

183-
func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr string) error {
183+
func verifyListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr, wantNodeID string) error {
184184
u, err := updateCh.Receive(ctx)
185185
if err != nil {
186186
return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err)
@@ -189,6 +189,9 @@ func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel
189189
if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
190190
return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr)
191191
}
192+
if !strings.Contains(gotErr.Error(), wantNodeID) {
193+
return fmt.Errorf("update received with error: %v, want error with node ID: %q", gotErr, wantNodeID)
194+
}
192195
return nil
193196
}
194197

@@ -1058,15 +1061,15 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
10581061
}
10591062

10601063
// Verify that the expected error is propagated to the existing watcher.
1061-
if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil {
1064+
if err := verifyListenerError(ctx, lw.updateCh, wantListenerNACKErr, nodeID); err != nil {
10621065
t.Fatal(err)
10631066
}
10641067

10651068
// Verify that the expected error is propagated to the new watcher as well.
10661069
lw2 := newListenerWatcher()
10671070
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
10681071
defer ldsCancel2()
1069-
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
1072+
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
10701073
t.Fatal(err)
10711074
}
10721075
}
@@ -1138,7 +1141,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
11381141
}
11391142

11401143
// Verify that the expected error is propagated to the existing watcher.
1141-
if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
1144+
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
11421145
t.Fatal(err)
11431146
}
11441147

@@ -1151,7 +1154,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
11511154
t.Fatal(err)
11521155
}
11531156
// Verify that the expected error is propagated to the existing watcher.
1154-
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
1157+
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
11551158
t.Fatal(err)
11561159
}
11571160
}
@@ -1229,7 +1232,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
12291232
// Verify that the expected error is propagated to the watcher which
12301233
// requested for the bad resource.
12311234
// Verify that the expected error is propagated to the existing watcher.
1232-
if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
1235+
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
12331236
t.Fatal(err)
12341237
}
12351238

0 commit comments

Comments
 (0)