Skip to content

Commit 1059e84

Browse files
authored
xdsclient: improve fallback test involving three servers (#8604)
The existing fallback test that involves three servers is flaky. The reason for the flake is that some of the resources have the same name in different servers. The listener resource is expected to have the same name across the different management servers, but we generally expect the other resources to have different names. See the following from the gRFC: - In https://github.com/grpc/proposal/blob/master/A71-xds-fallback.md#reservations-about-using-the-fallback-server-data, we have the following: ``` We have no guarantee that a combination of resources from different xDS servers form a valid cohesive configuration, so we cannot make this determination on a per-resource basis. We need any given gRPC channel or server listener to only use the resources from a single server. ``` - In https://github.com/grpc/proposal/blob/master/A71-xds-fallback.md#config-tears, we have the following: ``` Config tears happen when the client winds up using some combination of resources from the primary and fallback servers at the same time, even though that combination of resources was never validated to work together. In theory, this can cause correctness issues where we might send traffic to the wrong location or the wrong way, or it can cause RPCs to fail. Note that this can happen only when the primary and fallback server use the same resource names. ``` This PR ensures that all the different management servers have different resource names for all resources except the listener. I also ran the test on forge 100K times with no failures. This PR also improves a couple of logs that I found useful when debugging the failures. RELEASE NOTES: none
1 parent 8ae3c07 commit 1059e84

File tree

2 files changed

+54
-38
lines changed

2 files changed

+54
-38
lines changed

internal/xds/clients/xdsclient/authority.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -570,10 +570,6 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig)
570570
return true
571571
}
572572

573-
if a.logger.V(2) {
574-
a.logger.Infof("Received update from non-active server %q", serverConfig)
575-
}
576-
577573
// If the resource update is not from the current active server, it means
578574
// that we have received an update either from:
579575
// - a server that has a higher priority than the current active server and
@@ -586,8 +582,14 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig)
586582
serverIdx := a.serverIndexForConfig(serverConfig)
587583
activeServerIdx := a.serverIndexForConfig(a.activeXDSChannel.serverConfig)
588584
if activeServerIdx < serverIdx {
585+
if a.logger.V(2) {
586+
a.logger.Infof("Ignoring update from lower priority server [%d] %q", serverIdx, serverConfig)
587+
}
589588
return false
590589
}
590+
if a.logger.V(2) {
591+
a.logger.Infof("Received update from higher priority server [%d] %q", serverIdx, serverConfig)
592+
}
591593

592594
// At this point, we are guaranteed that we have received a response from a
593595
// higher priority server compared to the current active server. So, we
@@ -622,7 +624,7 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig)
622624
// Release the reference to the channel.
623625
if cfg.cleanup != nil {
624626
if a.logger.V(2) {
625-
a.logger.Infof("Closing lower priority server %q", cfg.serverConfig)
627+
a.logger.Infof("Closing lower priority server [%d] %q", i, cfg.serverConfig)
626628
}
627629
cfg.cleanup()
628630
cfg.cleanup = nil

internal/xds/xdsclient/tests/fallback/fallback_test.go

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -742,14 +742,13 @@ func (s) TestFallback_OnStartup_RPCSuccess(t *testing.T) {
742742
}
743743
}
744744

745-
// TestXDSFallback_ThreeServerPromotion verifies that when the primary
746-
// management server is unavailable, the system attempts to connect to the
747-
// first fallback server, and if that is also down, to the second fallback
748-
// server. It also ensures that the system switches back to the first fallback
749-
// server once it becomes available again, and eventually returns to the
750-
// primary server when it comes back online, closing connections to the
751-
// fallback servers accordingly.
752-
func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
745+
// Test verifies that when the primary management server is unavailable, the
746+
// system attempts to connect to the first fallback server, and if that is also
747+
// down, to the second fallback server. It also ensures that the system switches
748+
// back to the first fallback server once it becomes available again, and
749+
// eventually returns to the primary server when it comes back online, closing
750+
// connections to the fallback servers accordingly.
751+
func (s) TestFallback_ThreeServerPromotion(t *testing.T) {
753752
ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTestTimeout)
754753
defer cancel()
755754

@@ -770,34 +769,49 @@ func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
770769

771770
// Start three test service backends.
772771
backend1 := stubserver.StartTestService(t, nil)
772+
backend1Port := testutils.ParsePort(t, backend1.Address)
773773
defer backend1.Stop()
774774
backend2 := stubserver.StartTestService(t, nil)
775+
backend2Port := testutils.ParsePort(t, backend2.Address)
775776
defer backend2.Stop()
776777
backend3 := stubserver.StartTestService(t, nil)
778+
backend3Port := testutils.ParsePort(t, backend3.Address)
777779
defer backend3.Stop()
778780

779781
nodeID := uuid.New().String()
780-
const serviceName = "my-service-fallback-xds"
782+
const (
783+
serviceName = "my-service-fallback-xds"
784+
primaryRouteConfigName = "primary-route-" + serviceName
785+
secondaryRouteConfigName = "secondary-route-" + serviceName
786+
tertiaryRouteConfigName = "tertiary-route-" + serviceName
787+
primaryClusterName = "primary-cluster-" + serviceName
788+
secondaryClusterName = "secondary-cluster-" + serviceName
789+
tertiaryClusterName = "tertiary-cluster-" + serviceName
790+
primaryEndpointsName = "primary-endpoints-" + serviceName
791+
secondaryEndpointsName = "secondary-endpoints-" + serviceName
792+
tertiaryEndpointsName = "tertiary-endpoints-" + serviceName
793+
)
781794

782795
// Configure partial resources on the primary and secondary
783796
// management servers.
784797
primaryManagementServer.Update(ctx, e2e.UpdateOptions{
785798
NodeID: nodeID,
786-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, "route-p")},
799+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, primaryRouteConfigName)},
787800
})
788801
secondaryManagementServer.Update(ctx, e2e.UpdateOptions{
789802
NodeID: nodeID,
790-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, "route-s1")},
803+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, secondaryRouteConfigName)},
791804
})
792805

793806
// Configure full resources on tertiary management server.
794-
tertiaryManagementServer.Update(ctx, e2e.DefaultClientResources(e2e.ResourceParams{
795-
DialTarget: serviceName,
796-
NodeID: nodeID,
797-
Host: "localhost",
798-
Port: testutils.ParsePort(t, backend3.Address),
799-
SecLevel: e2e.SecurityLevelNone,
800-
}))
807+
updateOpts := e2e.UpdateOptions{
808+
NodeID: nodeID,
809+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, tertiaryRouteConfigName)},
810+
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(tertiaryRouteConfigName, serviceName, tertiaryClusterName)},
811+
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(tertiaryClusterName, tertiaryEndpointsName, e2e.SecurityLevelNone)},
812+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(tertiaryEndpointsName, "localhost", []uint32{backend3Port})},
813+
}
814+
tertiaryManagementServer.Update(ctx, updateOpts)
801815

802816
// Create bootstrap configuration for all three management servers.
803817
bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
@@ -874,15 +888,14 @@ func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
874888

875889
// Secondary1 becomes available, RPCs go to backend2.
876890
secondaryLis.Restart()
877-
secondaryManagementServer.Update(ctx, e2e.UpdateOptions{
891+
updateOpts = e2e.UpdateOptions{
878892
NodeID: nodeID,
879-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, "route-s1")},
880-
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig("route-s1", serviceName, "cluster-s1")},
881-
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster("cluster-s1", "endpoints-s1", e2e.SecurityLevelNone)},
882-
Endpoints: []*v3endpointpb.ClusterLoadAssignment{
883-
e2e.DefaultEndpoint("endpoints-s1", "localhost", []uint32{testutils.ParsePort(t, backend2.Address)}),
884-
},
885-
})
893+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, secondaryRouteConfigName)},
894+
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(secondaryRouteConfigName, serviceName, secondaryClusterName)},
895+
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(secondaryClusterName, secondaryEndpointsName, e2e.SecurityLevelNone)},
896+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(secondaryEndpointsName, "localhost", []uint32{backend2Port})},
897+
}
898+
secondaryManagementServer.Update(ctx, updateOpts)
886899

887900
secondaryConn, err := secondaryWrappedLis.NewConnCh.Receive(ctx)
888901
if err != nil {
@@ -897,13 +910,14 @@ func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
897910

898911
// Primary becomes available, RPCs go to backend1.
899912
primaryLis.Restart()
900-
primaryManagementServer.Update(ctx, e2e.DefaultClientResources(e2e.ResourceParams{
901-
DialTarget: serviceName,
902-
NodeID: nodeID,
903-
Host: "localhost",
904-
Port: testutils.ParsePort(t, backend1.Address),
905-
SecLevel: e2e.SecurityLevelNone,
906-
}))
913+
updateOpts = e2e.UpdateOptions{
914+
NodeID: nodeID,
915+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, primaryRouteConfigName)},
916+
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(primaryRouteConfigName, serviceName, primaryClusterName)},
917+
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(primaryClusterName, primaryEndpointsName, e2e.SecurityLevelNone)},
918+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(primaryEndpointsName, "localhost", []uint32{backend1Port})},
919+
}
920+
primaryManagementServer.Update(ctx, updateOpts)
907921

908922
if _, err := primaryWrappedLis.NewConnCh.Receive(ctx); err != nil {
909923
t.Fatalf("Timeout when waiting for new connection to primary: %v", err)

0 commit comments

Comments (0)