Skip to content

Commit b4000c8

Browse files
xds/clusterimpl: Convert existing unit tests to e2e style (1/N) (#8549)
1 parent bb71072 commit b4000c8

File tree

2 files changed

+296
-393
lines changed

2 files changed

+296
-393
lines changed

internal/xds/balancer/clusterimpl/balancer_test.go

Lines changed: 0 additions & 341 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"encoding/json"
2424
"errors"
2525
"fmt"
26-
"sort"
2726
"strings"
2827
"sync"
2928
"testing"
@@ -64,7 +63,6 @@ const (
6463

6564
var (
6665
testBackendEndpoints = []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}}
67-
cmpOpts = cmp.Options{cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), sortDataSlice}
6866
toleranceCmpOpt = cmp.Options{cmpopts.EquateApprox(0, 1e-5), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{})}
6967
)
7068

@@ -279,345 +277,6 @@ func init() {
279277
NewRandomWRR = testutils.NewTestWRR
280278
}
281279

282-
var sortDataSlice = cmp.Transformer("SortDataSlice", func(in []*loadData) []*loadData {
283-
out := append([]*loadData(nil), in...) // Copy input to avoid mutating it
284-
sort.Slice(out,
285-
func(i, j int) bool {
286-
if out[i].cluster < out[j].cluster {
287-
return true
288-
}
289-
if out[i].cluster == out[j].cluster {
290-
return out[i].service < out[j].service
291-
}
292-
return false
293-
},
294-
)
295-
return out
296-
})
297-
298-
func verifyLoadStoreData(wantStoreData, gotStoreData *loadData) error {
299-
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpOpts); diff != "" {
300-
return fmt.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
301-
}
302-
return nil
303-
}
304-
305-
// TestDropByCategory verifies that the balancer correctly drops the picks, and
306-
// that the drops are reported.
307-
func (s) TestDropByCategory(t *testing.T) {
308-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
309-
defer cancel()
310-
311-
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
312-
xdsC := fakeclient.NewClient()
313-
314-
builder := balancer.Get(Name)
315-
cc := testutils.NewBalancerClientConn(t)
316-
b := builder.Build(cc, balancer.BuildOptions{})
317-
defer b.Close()
318-
319-
const (
320-
dropReason = "test-dropping-category"
321-
dropNumerator = 1
322-
dropDenominator = 2
323-
)
324-
testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
325-
URI: "trafficdirector.googleapis.com:443",
326-
ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
327-
})
328-
if err != nil {
329-
t.Fatalf("Failed to create LRS server config for testing: %v", err)
330-
}
331-
if err := b.UpdateClientConnState(balancer.ClientConnState{
332-
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
333-
BalancerConfig: &LBConfig{
334-
Cluster: testClusterName,
335-
EDSServiceName: testServiceName,
336-
LoadReportingServer: testLRSServerConfig,
337-
DropCategories: []DropConfig{{
338-
Category: dropReason,
339-
RequestsPerMillion: million * dropNumerator / dropDenominator,
340-
}},
341-
ChildPolicy: &internalserviceconfig.BalancerConfig{
342-
Name: roundrobin.Name,
343-
},
344-
},
345-
}); err != nil {
346-
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
347-
}
348-
349-
got, err := xdsC.WaitForReportLoad(ctx)
350-
if err != nil {
351-
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
352-
}
353-
if got.Server != testLRSServerConfig {
354-
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
355-
}
356-
357-
sc1 := <-cc.NewSubConnCh
358-
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
359-
// This should get the connecting picker.
360-
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
361-
t.Fatal(err.Error())
362-
}
363-
364-
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
365-
// Test pick with one backend.
366-
367-
testClusterLoadReporter := &testLoadReporter{cluster: testClusterName, service: testServiceName, drops: make(map[string]uint64), localityRPCCount: make(map[clients.Locality]*rpcCountData)}
368-
369-
const rpcCount = 24
370-
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
371-
// Override the loadStore in the picker with testClusterLoadReporter.
372-
picker := p.(*picker)
373-
originalLoadStore := picker.loadStore
374-
picker.loadStore = testClusterLoadReporter
375-
defer func() { picker.loadStore = originalLoadStore }()
376-
377-
for i := 0; i < rpcCount; i++ {
378-
gotSCSt, err := p.Pick(balancer.PickInfo{})
379-
// Even RPCs are dropped.
380-
if i%2 == 0 {
381-
if err == nil || !strings.Contains(err.Error(), "dropped") {
382-
return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
383-
}
384-
continue
385-
}
386-
if err != nil || gotSCSt.SubConn != sc1 {
387-
return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
388-
}
389-
if gotSCSt.Done == nil {
390-
continue
391-
}
392-
// Fail 1/4th of the requests that are not dropped.
393-
if i%8 == 1 {
394-
gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("test error")})
395-
} else {
396-
gotSCSt.Done(balancer.DoneInfo{})
397-
}
398-
}
399-
return nil
400-
}); err != nil {
401-
t.Fatal(err.Error())
402-
}
403-
404-
// Dump load data from the store and compare with expected counts.
405-
const dropCount = rpcCount * dropNumerator / dropDenominator
406-
wantStatsData0 := &loadData{
407-
cluster: testClusterName,
408-
service: testServiceName,
409-
totalDrops: dropCount,
410-
drops: map[string]uint64{dropReason: dropCount},
411-
localityStats: map[clients.Locality]localityData{
412-
{}: {requestStats: requestData{
413-
succeeded: (rpcCount - dropCount) * 3 / 4,
414-
errored: (rpcCount - dropCount) / 4,
415-
issued: rpcCount - dropCount,
416-
}},
417-
},
418-
}
419-
420-
gotStatsData0 := testClusterLoadReporter.stats()
421-
if err := verifyLoadStoreData(wantStatsData0, gotStatsData0); err != nil {
422-
t.Fatal(err)
423-
}
424-
425-
// Send an update with new drop configs.
426-
const (
427-
dropReason2 = "test-dropping-category-2"
428-
dropNumerator2 = 1
429-
dropDenominator2 = 4
430-
)
431-
if err := b.UpdateClientConnState(balancer.ClientConnState{
432-
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
433-
BalancerConfig: &LBConfig{
434-
Cluster: testClusterName,
435-
EDSServiceName: testServiceName,
436-
LoadReportingServer: testLRSServerConfig,
437-
DropCategories: []DropConfig{{
438-
Category: dropReason2,
439-
RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
440-
}},
441-
ChildPolicy: &internalserviceconfig.BalancerConfig{
442-
Name: roundrobin.Name,
443-
},
444-
},
445-
}); err != nil {
446-
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
447-
}
448-
449-
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
450-
// Override the loadStore in the picker with testClusterLoadReporter.
451-
picker := p.(*picker)
452-
originalLoadStore := picker.loadStore
453-
picker.loadStore = testClusterLoadReporter
454-
defer func() { picker.loadStore = originalLoadStore }()
455-
for i := 0; i < rpcCount; i++ {
456-
gotSCSt, err := p.Pick(balancer.PickInfo{})
457-
// Even RPCs are dropped.
458-
if i%4 == 0 {
459-
if err == nil || !strings.Contains(err.Error(), "dropped") {
460-
return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
461-
}
462-
continue
463-
}
464-
if err != nil || gotSCSt.SubConn != sc1 {
465-
return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
466-
}
467-
if gotSCSt.Done != nil {
468-
gotSCSt.Done(balancer.DoneInfo{})
469-
}
470-
}
471-
return nil
472-
}); err != nil {
473-
t.Fatal(err.Error())
474-
}
475-
476-
const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
477-
wantStatsData1 := &loadData{
478-
cluster: testClusterName,
479-
service: testServiceName,
480-
totalDrops: dropCount2,
481-
drops: map[string]uint64{dropReason2: dropCount2},
482-
localityStats: map[clients.Locality]localityData{
483-
{}: {requestStats: requestData{
484-
succeeded: rpcCount - dropCount2,
485-
issued: rpcCount - dropCount2,
486-
}},
487-
},
488-
}
489-
490-
gotStatsData1 := testClusterLoadReporter.stats()
491-
if err := verifyLoadStoreData(wantStatsData1, gotStatsData1); err != nil {
492-
t.Fatal(err)
493-
}
494-
}
495-
496-
// TestDropCircuitBreaking verifies that the balancer correctly drops the picks
497-
// due to circuit breaking, and that the drops are reported.
498-
func (s) TestDropCircuitBreaking(t *testing.T) {
499-
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
500-
xdsC := fakeclient.NewClient()
501-
502-
builder := balancer.Get(Name)
503-
cc := testutils.NewBalancerClientConn(t)
504-
b := builder.Build(cc, balancer.BuildOptions{})
505-
defer b.Close()
506-
507-
var maxRequest uint32 = 50
508-
testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
509-
URI: "trafficdirector.googleapis.com:443",
510-
ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
511-
})
512-
if err != nil {
513-
t.Fatalf("Failed to create LRS server config for testing: %v", err)
514-
}
515-
if err := b.UpdateClientConnState(balancer.ClientConnState{
516-
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
517-
BalancerConfig: &LBConfig{
518-
Cluster: testClusterName,
519-
EDSServiceName: testServiceName,
520-
LoadReportingServer: testLRSServerConfig,
521-
MaxConcurrentRequests: &maxRequest,
522-
ChildPolicy: &internalserviceconfig.BalancerConfig{
523-
Name: roundrobin.Name,
524-
},
525-
},
526-
}); err != nil {
527-
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
528-
}
529-
530-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
531-
defer cancel()
532-
533-
got, err := xdsC.WaitForReportLoad(ctx)
534-
if err != nil {
535-
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
536-
}
537-
if got.Server != testLRSServerConfig {
538-
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
539-
}
540-
541-
sc1 := <-cc.NewSubConnCh
542-
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
543-
// This should get the connecting picker.
544-
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
545-
t.Fatal(err.Error())
546-
}
547-
548-
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
549-
// Test pick with one backend.
550-
testClusterLoadReporter := &testLoadReporter{cluster: testClusterName, service: testServiceName, drops: make(map[string]uint64), localityRPCCount: make(map[clients.Locality]*rpcCountData)}
551-
const rpcCount = 100
552-
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
553-
dones := []func(){}
554-
// Override the loadStore in the picker with testClusterLoadReporter.
555-
picker := p.(*picker)
556-
originalLoadStore := picker.loadStore
557-
picker.loadStore = testClusterLoadReporter
558-
defer func() { picker.loadStore = originalLoadStore }()
559-
560-
for i := 0; i < rpcCount; i++ {
561-
gotSCSt, err := p.Pick(balancer.PickInfo{})
562-
if i < 50 && err != nil {
563-
return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err)
564-
} else if i > 50 && err == nil {
565-
return fmt.Errorf("The second 50%% picks should be drops, got error <nil>")
566-
}
567-
dones = append(dones, func() {
568-
if gotSCSt.Done != nil {
569-
gotSCSt.Done(balancer.DoneInfo{})
570-
}
571-
})
572-
}
573-
for _, done := range dones {
574-
done()
575-
}
576-
577-
dones = []func(){}
578-
// Pick without drops.
579-
for i := 0; i < 50; i++ {
580-
gotSCSt, err := p.Pick(balancer.PickInfo{})
581-
if err != nil {
582-
t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
583-
}
584-
dones = append(dones, func() {
585-
if gotSCSt.Done != nil {
586-
// Fail these requests to test error counts in the load
587-
// report.
588-
gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("test error")})
589-
}
590-
})
591-
}
592-
for _, done := range dones {
593-
done()
594-
}
595-
596-
return nil
597-
}); err != nil {
598-
t.Fatal(err.Error())
599-
}
600-
601-
// Dump load data from the store and compare with expected counts.
602-
wantStatsData0 := &loadData{
603-
cluster: testClusterName,
604-
service: testServiceName,
605-
totalDrops: uint64(maxRequest),
606-
localityStats: map[clients.Locality]localityData{
607-
{}: {requestStats: requestData{
608-
succeeded: uint64(rpcCount - maxRequest),
609-
errored: 50,
610-
issued: uint64(rpcCount - maxRequest + 50),
611-
}},
612-
},
613-
}
614-
615-
gotStatsData0 := testClusterLoadReporter.stats()
616-
if err := verifyLoadStoreData(wantStatsData0, gotStatsData0); err != nil {
617-
t.Fatal(err)
618-
}
619-
}
620-
621280
// TestPickerUpdateAfterClose covers the case where a child policy sends a
622281
// picker update after the cluster_impl policy is closed. Because picker updates
623282
// are handled in the run() goroutine, which exits before Close() returns, we

0 commit comments

Comments
 (0)