Skip to content

Commit 8b61e8f

Browse files
authored
xdsclient: do not process updates from closed server channels (#8389)
1 parent 7238ab1 commit 8b61e8f

File tree

1 file changed

+34
-7
lines changed

1 file changed

+34
-7
lines changed

xds/internal/clients/xdsclient/authority.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,9 @@ func (a *authority) adsResourceUpdate(serverConfig *ServerConfig, rType Resource
333333
//
334334
// Only executed in the context of a serializer callback.
335335
func (a *authority) handleADSResourceUpdate(serverConfig *ServerConfig, rType ResourceType, updates map[string]dataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
336-
a.handleRevertingToPrimaryOnUpdate(serverConfig)
336+
if !a.handleRevertingToPrimaryOnUpdate(serverConfig) {
337+
return
338+
}
337339

338340
// We build a list of callback funcs to invoke, and invoke them at the end
339341
// of this method instead of inline (when handling the update for a
@@ -551,23 +553,47 @@ func (a *authority) handleADSResourceDoesNotExist(rType ResourceType, resourceNa
551553
// lower priority servers are closed and the active server is reverted to the
552554
// highest priority server that sent the update.
553555
//
556+
// The return value indicates whether subsequent processing of the resource
557+
// update should continue or not.
558+
//
554559
// This method is only executed in the context of a serializer callback.
555-
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) {
556-
if a.activeXDSChannel != nil && isServerConfigEqual(serverConfig, a.activeXDSChannel.serverConfig) {
560+
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) bool {
561+
if a.activeXDSChannel == nil {
562+
// This can happen only when all watches on this authority have been
563+
// removed, and the xdsChannels have been closed. This update should
564+
// have been received prior to closing of the channel, and therefore
565+
// must be ignored.
566+
return false
567+
}
568+
569+
if isServerConfigEqual(serverConfig, a.activeXDSChannel.serverConfig) {
557570
// If the resource update is from the current active server, nothing
558571
// needs to be done from fallback point of view.
559-
return
572+
return true
560573
}
561574

562575
if a.logger.V(2) {
563576
a.logger.Infof("Received update from non-active server %q", serverConfig)
564577
}
565578

566579
// If the resource update is not from the current active server, it means
567-
// that we have received an update from a higher priority server and we need
568-
// to revert back to it. This method guarantees that when an update is
569-
// received from a server, all lower priority servers are closed.
580+
// that we have received an update either from:
581+
// - a server that has a higher priority than the current active server and
582+
// therefore we need to revert back to it and close all lower priority
583+
// servers, or,
584+
// - a server that has a lower priority than the current active server. This
585+
// can happen when the server close and the response race against each
586+
// other. We can safely ignore this update, since we have already reverted
587+
// to the higher priority server, and closed all lower priority servers.
570588
serverIdx := a.serverIndexForConfig(serverConfig)
589+
activeServerIdx := a.serverIndexForConfig(a.activeXDSChannel.serverConfig)
590+
if activeServerIdx < serverIdx {
591+
return false
592+
}
593+
594+
// At this point, we are guaranteed that we have received a response from a
595+
// higher priority server compared to the current active server. So, we
596+
// revert to the higher priorty server and close all lower priority ones.
571597
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx]
572598

573599
// Close all lower priority channels.
@@ -605,6 +631,7 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig)
605631
}
606632
cfg.channel = nil
607633
}
634+
return true
608635
}
609636

610637
// watchResource registers a new watcher for the specified resource type and

0 commit comments

Comments
 (0)