Skip to content

Commit b0bc6dc

Browse files
authored
xdsclient: revert #8369: delay resource cache deletion (#8527)
The change being reverted here (#8369) is a prime suspect for a race that can show up with the following sequence of events: - create a new gRPC channel with the `xds:///` scheme - make an RPC - close the channel - repeat (possibly from multiple goroutines) The observable behavior from the race is that the xDS client thinks that a Listener resource is removed by the control plane when it clearly is not. This results in the user's gRPC channel moving to TRANSIENT_FAILURE and subsequent RPC failures. The reason the above mentioned PR is not being rolled back using `git revert` is because the xds directory structure has changed significantly since the time the PR was originally merged. Manually performing the revert seemed much easier. RELEASE NOTES: * xdsclient: Revert a change that introduces a race with xDS resource processing, leading to RPC failures
1 parent 01ae4f4 commit b0bc6dc

File tree

6 files changed

+21
-265
lines changed

6 files changed

+21
-265
lines changed

internal/xds/clients/xdsclient/ads_stream.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ type adsStreamEventHandler interface {
7171
onStreamError(error) // Called when the ADS stream breaks.
7272
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
7373
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
74-
onRequest(typeURL string) // Called when a request is about to be sent on the ADS stream.
7574
}
7675

7776
// state corresponding to a resource type.
@@ -445,11 +444,6 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
445444
}
446445
}
447446

448-
// Call the event handler to remove unsubscribed cache entries. It is to
449-
// ensure the cache entries are deleted even if discovery request fails. In
450-
// case of failure when the stream restarts, nonce is reset anyways.
451-
s.eventHandler.onRequest(url)
452-
453447
msg, err := proto.Marshal(req)
454448
if err != nil {
455449
s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err)

internal/xds/clients/xdsclient/authority.go

Lines changed: 21 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,6 @@ func (a *authority) fallbackToServer(xc *xdsChannelWithConfig) bool {
294294
// Subscribe to all existing resources from the new management server.
295295
for typ, resources := range a.resources {
296296
for name, state := range resources {
297-
if len(state.watchers) == 0 {
298-
continue
299-
}
300297
if a.logger.V(2) {
301298
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName, name)
302299
}
@@ -686,17 +683,6 @@ func (a *authority) watchResource(rType ResourceType, resourceName string, watch
686683
}
687684
resources[resourceName] = state
688685
xdsChannel.channel.subscribe(rType, resourceName)
689-
} else if len(state.watchers) == 0 {
690-
if a.logger.V(2) {
691-
a.logger.Infof("Re-watch for type %q, resource name %q before unsubscription", rType.TypeName, resourceName)
692-
}
693-
// Add the active channel to the resource's channel configs if not
694-
// already present.
695-
state.xdsChannelConfigs[xdsChannel] = true
696-
// Ensure the resource is subscribed on the active channel. We do this
697-
// even if resource is present in cache as re-watches might occur
698-
// after unsubscribes or channel changes.
699-
xdsChannel.channel.subscribe(rType, resourceName)
700686
}
701687
// Always add the new watcher to the set of watchers.
702688
state.watchers[watcher] = true
@@ -774,16 +760,32 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
774760
}
775761

776762
// There are no more watchers for this resource. Unsubscribe this
777-
// resource from all channels where it was subscribed to but do not
778-
// delete the state associated with it in case the resource is
779-
// re-requested later before un-subscription request is completed by
780-
// the management server.
763+
// resource from all channels where it was subscribed to and delete
764+
// the state associated with it.
781765
if a.logger.V(2) {
782766
a.logger.Infof("Removing last watch for resource name %q", resourceName)
783767
}
784768
for xcc := range state.xdsChannelConfigs {
785769
xcc.channel.unsubscribe(rType, resourceName)
786770
}
771+
delete(resources, resourceName)
772+
773+
// If there are no more watchers for this resource type, delete the
774+
// resource type from the top-level map.
775+
if len(resources) == 0 {
776+
if a.logger.V(2) {
777+
a.logger.Infof("Removing last watch for resource type %q", rType.TypeName)
778+
}
779+
delete(a.resources, rType)
780+
}
781+
// If there are no more watchers for any resource type, release the
782+
// reference to the xdsChannels.
783+
if len(a.resources) == 0 {
784+
if a.logger.V(2) {
785+
a.logger.Infof("Removing last watch for for any resource type, releasing reference to the xdsChannel")
786+
}
787+
a.closeXDSChannels()
788+
}
787789
}, func() { close(done) })
788790
<-done
789791
})
@@ -835,7 +837,7 @@ func (a *authority) closeXDSChannels() {
835837
func (a *authority) watcherExistsForUncachedResource() bool {
836838
for _, resourceStates := range a.resources {
837839
for _, state := range resourceStates {
838-
if len(state.watchers) > 0 && state.md.Status == xdsresource.ServiceStatusRequested {
840+
if state.md.Status == xdsresource.ServiceStatusRequested {
839841
return true
840842
}
841843
}
@@ -867,9 +869,6 @@ func (a *authority) resourceConfig() []*v3statuspb.ClientConfig_GenericXdsConfig
867869
for rType, resourceStates := range a.resources {
868870
typeURL := rType.TypeURL
869871
for name, state := range resourceStates {
870-
if len(state.watchers) == 0 {
871-
continue
872-
}
873872
var raw *anypb.Any
874873
if state.cache != nil {
875874
raw = &anypb.Any{TypeUrl: typeURL, Value: state.cache.Bytes()}
@@ -903,43 +902,6 @@ func (a *authority) close() {
903902
}
904903
}
905904

906-
// removeUnsubscribedCacheEntries iterates through all resources of the given type and
907-
// removes the state for resources that have no active watchers. This is called
908-
// after sending a discovery request to ensure that resources that were
909-
// unsubscribed (and thus have no watchers) are eventually removed from the
910-
// authority's cache.
911-
func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) {
912-
a.xdsClientSerializer.TrySchedule(func(context.Context) {
913-
resources := a.resources[rType]
914-
if resources == nil {
915-
return
916-
}
917-
918-
for name, state := range resources {
919-
if len(state.watchers) == 0 {
920-
if a.logger.V(2) {
921-
a.logger.Infof("Removing resource state for %q of type %q as it has no watchers", name, rType.TypeName)
922-
}
923-
delete(resources, name)
924-
}
925-
}
926-
927-
if len(resources) == 0 {
928-
if a.logger.V(2) {
929-
a.logger.Infof("Removing resource type %q from cache as it has no more resources", rType.TypeName)
930-
}
931-
delete(a.resources, rType)
932-
}
933-
934-
if len(a.resources) == 0 {
935-
if a.logger.V(2) {
936-
a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannels")
937-
}
938-
a.closeXDSChannels()
939-
}
940-
})
941-
}
942-
943905
func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
944906
switch serviceStatus {
945907
case xdsresource.ServiceStatusUnknown:

internal/xds/clients/xdsclient/channel.go

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,6 @@ type xdsChannelEventHandler interface {
5959
// adsResourceDoesNotExist is called when the xdsChannel determines that a
6060
// requested ADS resource does not exist.
6161
adsResourceDoesNotExist(ResourceType, string)
62-
63-
// adsResourceRemoveUnsubscribedCacheEntries is called when the xdsChannel
64-
// needs to remove unsubscribed cache entries.
65-
adsResourceRemoveUnsubscribedCacheEntries(ResourceType)
6662
}
6763

6864
// xdsChannelOpts holds the options for creating a new xdsChannel.
@@ -140,32 +136,8 @@ type xdsChannel struct {
140136
}
141137

142138
func (xc *xdsChannel) close() {
143-
if xc.closed.HasFired() {
144-
return
145-
}
146139
xc.closed.Fire()
147-
148-
// Get the resource types that this specific ADS stream was handling
149-
// before stopping it.
150-
//
151-
// TODO: Revisit if we can avoid acquiring the lock of ads (another type).
152-
xc.ads.mu.Lock()
153-
typesHandledByStream := make([]ResourceType, 0, len(xc.ads.resourceTypeState))
154-
for typ := range xc.ads.resourceTypeState {
155-
typesHandledByStream = append(typesHandledByStream, typ)
156-
}
157-
xc.ads.mu.Unlock()
158-
159140
xc.ads.Stop()
160-
161-
// Schedule removeUnsubscribedCacheEntries for the types this stream was handling,
162-
// on all authorities that were interested in this channel.
163-
if _, ok := xc.eventHandler.(*channelState); ok {
164-
for _, typ := range typesHandledByStream {
165-
xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(typ)
166-
}
167-
}
168-
169141
xc.transport.Close()
170142
xc.logger.Infof("Shutdown")
171143
}
@@ -256,26 +228,6 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error)
256228
return names, err
257229
}
258230

259-
// onRequest invoked when a request is about to be sent on the ADS stream. It
260-
// removes the cache entries for the resource type that are no longer subscribed to.
261-
func (xc *xdsChannel) onRequest(typeURL string) {
262-
if xc.closed.HasFired() {
263-
if xc.logger.V(2) {
264-
xc.logger.Infof("Received an update from the ADS stream on closed ADS stream")
265-
}
266-
return
267-
}
268-
269-
// Lookup the resource parser based on the resource type.
270-
rType, ok := xc.clientConfig.ResourceTypes[typeURL]
271-
if !ok {
272-
logger.Warningf("Resource type URL %q unknown in response from server", typeURL)
273-
return
274-
}
275-
276-
xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(rType)
277-
}
278-
279231
// decodeResponse decodes the resources in the given ADS response.
280232
//
281233
// The opts parameter provides configuration options for decoding the resources.

internal/xds/clients/xdsclient/channel_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,3 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re
772772
}
773773
return typ, name, nil
774774
}
775-
776-
func (*testEventHandler) adsResourceRemoveUnsubscribedCacheEntries(ResourceType) {
777-
}

internal/xds/clients/xdsclient/test/misc_watchers_test.go

Lines changed: 0 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -527,139 +527,3 @@ func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) {
527527
}
528528
}
529529
}
530-
531-
// TestUnsubscribeAndResubscribe tests the scenario where the client is busy
532-
// processing a response (simulating a pending ACK at a higher level by holding
533-
// the onDone callback from watchers). During this busy state, a resource is
534-
// unsubscribed and then immediately resubscribed which causes the
535-
// unsubscription and new subscription requests to be buffered due to flow
536-
// control.
537-
//
538-
// The test verifies the following:
539-
// - The resubscribed resource is served from the cache.
540-
// - No "resource does not exist" error is generated for the resubscribed
541-
// resource.
542-
func (s) TestRaceUnsubscribeResubscribe(t *testing.T) {
543-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
544-
defer cancel()
545-
546-
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
547-
nodeID := uuid.New().String()
548-
549-
resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
550-
si := clients.ServerIdentifier{
551-
ServerURI: mgmtServer.Address,
552-
Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
553-
}
554-
555-
configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
556-
xdsClientConfig := xdsclient.Config{
557-
Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}},
558-
Node: clients.Node{ID: nodeID},
559-
TransportBuilder: grpctransport.NewBuilder(configs),
560-
ResourceTypes: resourceTypes,
561-
// Xdstp resource names used in this test do not specify an
562-
// authority. These will end up looking up an entry with the
563-
// empty key in the authorities map. Having an entry with an
564-
// empty key and empty configuration, results in these
565-
// resources also using the top-level configuration.
566-
Authorities: map[string]xdsclient.Authority{
567-
"": {XDSServers: []xdsclient.ServerConfig{}},
568-
},
569-
}
570-
571-
// Create an xDS client with the above config.
572-
client, err := xdsclient.New(xdsClientConfig)
573-
if err != nil {
574-
t.Fatalf("Failed to create xDS client: %v", err)
575-
}
576-
defer client.Close()
577-
578-
const ldsResourceName1 = "test-listener-resource1"
579-
const ldsResourceName2 = "test-listener-resource2"
580-
const rdsName1 = "test-route-configuration-resource1"
581-
const rdsName2 = "test-route-configuration-resource2"
582-
listenerResource1 := e2e.DefaultClientListener(ldsResourceName1, rdsName1)
583-
listenerResource2 := e2e.DefaultClientListener(ldsResourceName2, rdsName2)
584-
585-
// Watch ldsResourceName1 with a regular watcher to ensure it's in cache
586-
// and ACKed.
587-
watcherInitial := newListenerWatcher()
588-
cancelInitial := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherInitial)
589-
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listenerResource1}, SkipValidation: true}); err != nil {
590-
t.Fatalf("mgmtServer.Update() for %s failed: %v", ldsResourceName1, err)
591-
}
592-
if err := verifyListenerUpdate(ctx, watcherInitial.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil {
593-
t.Fatalf("watcherR1Initial did not receive update for %s: %v", ldsResourceName1, err)
594-
}
595-
cancelInitial()
596-
597-
// Watch ldsResourceName1 and ldsResourceName2 using blocking watchers.
598-
// - Server sends {ldsResourceName1, ldsResourceName2}.
599-
// - Watchers for both resources get the update but we HOLD on to their
600-
// onDone callbacks.
601-
blockingWatcherR1 := newBLockingListenerWatcher()
602-
cancelR1 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, blockingWatcherR1)
603-
// defer cancelR1 later to create the race
604-
605-
blockingWatcherR2 := newBLockingListenerWatcher()
606-
cancelR2 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName2, blockingWatcherR2)
607-
defer cancelR2()
608-
609-
// Configure the listener resources on the management server.
610-
resources := e2e.UpdateOptions{
611-
NodeID: nodeID,
612-
Listeners: []*v3listenerpb.Listener{listenerResource1, listenerResource2},
613-
SkipValidation: true}
614-
if err := mgmtServer.Update(ctx, resources); err != nil {
615-
t.Fatalf("mgmtServer.Update() for %s and %s failed: %v", ldsResourceName1, ldsResourceName2, err)
616-
}
617-
618-
var onDoneR1, onDoneR2 func()
619-
select {
620-
case <-blockingWatcherR1.updateCh:
621-
onDoneR1 = <-blockingWatcherR1.doneNotifierCh
622-
case <-ctx.Done():
623-
t.Fatalf("Timeout waiting for update for %s on blockingWatcherR1: %v", ldsResourceName1, ctx.Err())
624-
}
625-
select {
626-
case <-blockingWatcherR2.updateCh:
627-
onDoneR2 = <-blockingWatcherR2.doneNotifierCh
628-
case <-ctx.Done():
629-
t.Fatalf("Timeout waiting for update for %s on blockingWatcherR2: %v", ldsResourceName2, ctx.Err())
630-
}
631-
632-
// At this point, ACK for {listenerResource1,listenerResource2} has been
633-
// sent by the client but s.fc.pending.Load() is true because onDoneR1 and
634-
// onDoneR2 are held.
635-
//
636-
// Unsubscribe listenerResource1. This request should be buffered by
637-
// adsStreamImpl because s.fc.pending.Load() is true.
638-
cancelR1()
639-
640-
// Resubscribe listenerResource1 with a new regular watcher, which should
641-
// be served from cache.
642-
watcherR1New := newListenerWatcher()
643-
cancelR1New := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherR1New)
644-
defer cancelR1New()
645-
646-
if err := verifyListenerUpdate(ctx, watcherR1New.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil {
647-
t.Fatalf("watcherR1New did not receive cached update for %s: %v", ldsResourceName1, err)
648-
}
649-
650-
// Release the onDone callbacks.
651-
if onDoneR1 != nil { // onDoneR1 might be nil if cancelR1() completed very fast.
652-
onDoneR1()
653-
}
654-
onDoneR2()
655-
656-
// Verify watcherR1New does not get a "resource does not exist" error.
657-
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout*10) // Slightly longer to catch delayed errors
658-
defer sCancel()
659-
if err := verifyNoListenerUpdate(sCtx, watcherR1New.resourceErrCh); err != nil {
660-
t.Fatalf("watcherR1New received unexpected resource error for %s: %v", ldsResourceName1, err)
661-
}
662-
if err := verifyNoListenerUpdate(sCtx, watcherR1New.ambientErrCh); err != nil {
663-
t.Fatalf("watcherR1New received unexpected ambient error for %s: %v", ldsResourceName1, err)
664-
}
665-
}

internal/xds/clients/xdsclient/xdsclient.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -437,19 +437,6 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s
437437
}
438438
}
439439

440-
func (cs *channelState) adsResourceRemoveUnsubscribedCacheEntries(rType ResourceType) {
441-
if cs.parent.done.HasFired() {
442-
return
443-
}
444-
445-
cs.parent.channelsMu.Lock()
446-
defer cs.parent.channelsMu.Unlock()
447-
448-
for authority := range cs.interestedAuthorities {
449-
authority.removeUnsubscribedCacheEntries(rType)
450-
}
451-
}
452-
453440
func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) {
454441
n := xdsresource.ParseName(resourceName)
455442
a := c.getAuthorityForResource(n)

0 commit comments

Comments
 (0)