Skip to content

Commit a4c1366

Browse files
authored
Merge branch 'master' into ignore-status-for-grpc-content-type-8486
2 parents bebc8d3 + 9ff80a7 commit a4c1366

File tree

113 files changed

+14680
-2616
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

113 files changed

+14680
-2616
lines changed

balancer/grpclb/grpc_lb_v1/load_balancer.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

balancer/pickfirst/pickfirst.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
169169
addrs = state.ResolverState.Addresses
170170
if cfg.ShuffleAddressList {
171171
addrs = append([]resolver.Address{}, addrs...)
172-
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
172+
internal.RandShuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
173173
}
174174
}
175175

balancer/pickfirst/pickfirst_ext_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package pickfirst_test
2020

2121
import (
2222
"context"
23+
"encoding/json"
2324
"errors"
2425
"fmt"
2526
"strings"
@@ -28,11 +29,14 @@ import (
2829

2930
"google.golang.org/grpc"
3031
"google.golang.org/grpc/backoff"
32+
"google.golang.org/grpc/balancer"
33+
pfbalancer "google.golang.org/grpc/balancer/pickfirst"
3134
pfinternal "google.golang.org/grpc/balancer/pickfirst/internal"
3235
"google.golang.org/grpc/codes"
3336
"google.golang.org/grpc/connectivity"
3437
"google.golang.org/grpc/credentials/insecure"
3538
"google.golang.org/grpc/internal"
39+
"google.golang.org/grpc/internal/balancer/stub"
3640
"google.golang.org/grpc/internal/channelz"
3741
"google.golang.org/grpc/internal/grpctest"
3842
"google.golang.org/grpc/internal/stubserver"
@@ -463,6 +467,85 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
463467
}
464468
}
465469

470+
// Tests the PF LB policy with shuffling enabled. It explicitly unsets the
471+
// Endpoints field in the resolver update to test the shuffling of the
472+
// Addresses.
473+
func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) {
474+
// Install a shuffler that always reverses two entries.
475+
origShuf := pfinternal.RandShuffle
476+
defer func() { pfinternal.RandShuffle = origShuf }()
477+
pfinternal.RandShuffle = func(n int, f func(int, int)) {
478+
if n != 2 {
479+
t.Errorf("Shuffle called with n=%v; want 2", n)
480+
return
481+
}
482+
f(0, 1) // reverse the two addresses
483+
}
484+
485+
pfBuilder := balancer.Get(pfbalancer.Name)
486+
shuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": true }`))
487+
if err != nil {
488+
t.Fatal(err)
489+
}
490+
noShuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": false }`))
491+
if err != nil {
492+
t.Fatal(err)
493+
}
494+
var activeCfg serviceconfig.LoadBalancingConfig
495+
496+
bf := stub.BalancerFuncs{
497+
Init: func(bd *stub.BalancerData) {
498+
bd.ChildBalancer = pfBuilder.Build(bd.ClientConn, bd.BuildOptions)
499+
},
500+
Close: func(bd *stub.BalancerData) {
501+
bd.ChildBalancer.Close()
502+
},
503+
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
504+
ccs.BalancerConfig = activeCfg
505+
ccs.ResolverState.Endpoints = nil
506+
return bd.ChildBalancer.UpdateClientConnState(ccs)
507+
},
508+
}
509+
510+
stub.Register(t.Name(), bf)
511+
svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())
512+
// Set up our backends.
513+
cc, r, backends := setupPickFirst(t, 2, grpc.WithDefaultServiceConfig(svcCfg))
514+
addrs := stubBackendsToResolverAddrs(backends)
515+
516+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
517+
defer cancel()
518+
519+
// Push an update with both addresses and shuffling disabled. We should
520+
// connect to backend 0.
521+
activeCfg = noShuffleConfig
522+
resolverState := resolver.State{Addresses: addrs}
523+
r.UpdateState(resolverState)
524+
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
525+
t.Fatal(err)
526+
}
527+
528+
// Send a config with shuffling enabled. This will reverse the addresses,
529+
// but the channel should still be connected to backend 0.
530+
activeCfg = shuffleConfig
531+
r.UpdateState(resolverState)
532+
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
533+
t.Fatal(err)
534+
}
535+
536+
// Send a resolver update with no addresses. This should push the channel
537+
// into TransientFailure.
538+
r.UpdateState(resolver.State{})
539+
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
540+
541+
// Send the same config as last time with shuffling enabled. Since we are
542+
// not connected to backend 0, we should connect to backend 1.
543+
r.UpdateState(resolverState)
544+
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
545+
t.Fatal(err)
546+
}
547+
}
548+
466549
// Test config parsing with the env var turned on and off for various scenarios.
467550
func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
468551
// Install a shuffler that always reverses two entries.

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
283283
newAddrs = state.ResolverState.Addresses
284284
if cfg.ShuffleAddressList {
285285
newAddrs = append([]resolver.Address{}, newAddrs...)
286-
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
286+
internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
287287
}
288288
}
289289

@@ -351,6 +351,13 @@ func (b *pickfirstBalancer) ExitIdle() {
351351
b.mu.Lock()
352352
defer b.mu.Unlock()
353353
if b.state == connectivity.Idle {
354+
// Move the balancer into CONNECTING state immediately. This is done to
355+
// avoid staying in IDLE if a resolver update arrives before the first
356+
// SubConn reports CONNECTING.
357+
b.updateBalancerState(balancer.State{
358+
ConnectivityState: connectivity.Connecting,
359+
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
360+
})
354361
b.startFirstPassLocked()
355362
}
356363
}
@@ -374,13 +381,14 @@ func (b *pickfirstBalancer) closeSubConnsLocked() {
374381

375382
// deDupAddresses ensures that each address appears only once in the slice.
376383
func deDupAddresses(addrs []resolver.Address) []resolver.Address {
377-
seenAddrs := resolver.NewAddressMapV2[*scData]()
384+
seenAddrs := resolver.NewAddressMapV2[bool]()
378385
retAddrs := []resolver.Address{}
379386

380387
for _, addr := range addrs {
381388
if _, ok := seenAddrs.Get(addr); ok {
382389
continue
383390
}
391+
seenAddrs.Set(addr, true)
384392
retAddrs = append(retAddrs, addr)
385393
}
386394
return retAddrs
@@ -604,7 +612,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
604612
if !b.addressList.seekTo(sd.addr) {
605613
// This should not fail as we should have only one SubConn after
606614
// entering READY. The SubConn should be present in the addressList.
607-
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
615+
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
608616
return
609617
}
610618
if !b.healthCheckingEnabled {

balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,6 +1152,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
11521152
ResolverState: resolver.State{
11531153
Endpoints: []resolver.Endpoint{
11541154
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}},
1155+
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // duplicate, should be ignored.
11551156
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1111"}}},
11561157
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
11571158
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
@@ -1504,6 +1505,102 @@ func (s) TestPickFirstLeaf_AddressUpdateWithMetadata(t *testing.T) {
15041505
}
15051506
}
15061507

1508+
// Tests the scenario where a connection is established and then breaks, leading
1509+
// to a reconnection attempt. While the reconnection is in progress, a resolver
1510+
// update with a new address is received. The test verifies that the balancer
1511+
// creates a new SubConn for the new address and that the ClientConn eventually
1512+
// becomes READY.
1513+
func (s) TestPickFirstLeaf_Reconnection(t *testing.T) {
1514+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1515+
defer cancel()
1516+
cc := testutils.NewBalancerClientConn(t)
1517+
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
1518+
defer bal.Close()
1519+
ccState := balancer.ClientConnState{
1520+
ResolverState: resolver.State{
1521+
Endpoints: []resolver.Endpoint{
1522+
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
1523+
},
1524+
},
1525+
}
1526+
if err := bal.UpdateClientConnState(ccState); err != nil {
1527+
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
1528+
}
1529+
1530+
select {
1531+
case state := <-cc.NewStateCh:
1532+
if got, want := state, connectivity.Connecting; got != want {
1533+
t.Fatalf("Received unexpected ClientConn sate: got %v, want %v", got, want)
1534+
}
1535+
case <-ctx.Done():
1536+
t.Fatal("Context timed out waiting for ClientConn state update.")
1537+
}
1538+
1539+
sc1 := <-cc.NewSubConnCh
1540+
select {
1541+
case <-sc1.ConnectCh:
1542+
case <-ctx.Done():
1543+
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
1544+
}
1545+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
1546+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
1547+
1548+
if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
1549+
t.Fatalf("Context timed out waiting for ClientConn to become READY.")
1550+
}
1551+
1552+
// Simulate a connection breakage; this should result in the channel
1553+
// transitioning to IDLE.
1554+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
1555+
if err := cc.WaitForConnectivityState(ctx, connectivity.Idle); err != nil {
1556+
t.Fatalf("Context timed out waiting for ClientConn to enter IDLE.")
1557+
}
1558+
1559+
// Calling the idle picker should result in the SubConn being re-connected.
1560+
picker := <-cc.NewPickerCh
1561+
if _, err := picker.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
1562+
t.Fatalf("picker.Pick() returned error: %v, want %v", err, balancer.ErrNoSubConnAvailable)
1563+
}
1564+
1565+
select {
1566+
case <-sc1.ConnectCh:
1567+
case <-ctx.Done():
1568+
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
1569+
}
1570+
1571+
// Send a resolver update, removing the existing SubConn. Since the balancer
1572+
// is connecting, it should create a new SubConn for the new backend
1573+
// address.
1574+
ccState = balancer.ClientConnState{
1575+
ResolverState: resolver.State{
1576+
Endpoints: []resolver.Endpoint{
1577+
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
1578+
},
1579+
},
1580+
}
1581+
if err := bal.UpdateClientConnState(ccState); err != nil {
1582+
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
1583+
}
1584+
1585+
var sc2 *testutils.TestSubConn
1586+
select {
1587+
case sc2 = <-cc.NewSubConnCh:
1588+
case <-ctx.Done():
1589+
t.Fatal("Context timed out waiting for new SubConn to be created.")
1590+
}
1591+
1592+
select {
1593+
case <-sc2.ConnectCh:
1594+
case <-ctx.Done():
1595+
t.Fatal("Context timed out waiting for Connect() to be called on sc2.")
1596+
}
1597+
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
1598+
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
1599+
if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
1600+
t.Fatalf("Context timed out waiting for ClientConn to become READY.")
1601+
}
1602+
}
1603+
15071604
// healthListenerCapturingCCWrapper is used to capture the health listener so
15081605
// that health updates can be mocked for testing.
15091606
type healthListenerCapturingCCWrapper struct {

balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
201201
ResolverState: resolver.State{
202202
Endpoints: []resolver.Endpoint{
203203
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
204+
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}, // duplicate, should be ignored.
204205
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
206+
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}, // duplicate, should be ignored.
205207
},
206208
},
207209
}
@@ -213,14 +215,35 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
213215
// once.
214216
tfErr := fmt.Errorf("test err: connection refused")
215217
sc1 := <-cc.NewSubConnCh
218+
select {
219+
case <-sc1.ConnectCh:
220+
case <-ctx.Done():
221+
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
222+
}
216223
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
217224
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr})
218225

226+
// Move the subconn back to IDLE; it should not be re-connected until the
227+
// first pass is complete.
228+
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
229+
defer shortCancel()
230+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
231+
select {
232+
case <-sc1.ConnectCh:
233+
t.Fatal("Connect() unexpectedly called on sc1.")
234+
case <-shortCtx.Done():
235+
}
236+
219237
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
220238
t.Fatalf("cc.WaitForPickerWithErr(%v) returned error: %v", balancer.ErrNoSubConnAvailable, err)
221239
}
222240

223241
sc2 := <-cc.NewSubConnCh
242+
select {
243+
case <-sc2.ConnectCh:
244+
case <-ctx.Done():
245+
t.Fatal("Context timed out waiting for Connect() to be called on sc2.")
246+
}
224247
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
225248
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr})
226249

@@ -230,6 +253,29 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
230253

231254
// Subsequent TRANSIENT_FAILUREs should be reported only after seeing "# of SubConns"
232255
// TRANSIENT_FAILUREs.
256+
257+
// Both the subconns should be connected in parallel.
258+
select {
259+
case <-sc1.ConnectCh:
260+
case <-ctx.Done():
261+
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
262+
}
263+
264+
shortCtx, shortCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
265+
defer shortCancel()
266+
select {
267+
case <-sc2.ConnectCh:
268+
t.Fatal("Connect() called on sc2 before it completed backing-off.")
269+
case <-shortCtx.Done():
270+
}
271+
272+
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
273+
select {
274+
case <-sc2.ConnectCh:
275+
case <-ctx.Done():
276+
t.Fatal("Context timed out waiting for Connect() to be called on sc2.")
277+
}
278+
233279
newTfErr := fmt.Errorf("test err: unreachable")
234280
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: newTfErr})
235281
select {

balancer_wrapper.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,13 +450,14 @@ func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(
450450
if acbw.ccb.cc.dopts.disableHealthCheck {
451451
return noOpRegisterHealthListenerFn
452452
}
453+
cfg := acbw.ac.cc.healthCheckConfig()
454+
if cfg == nil {
455+
return noOpRegisterHealthListenerFn
456+
}
453457
regHealthLisFn := internal.RegisterClientHealthCheckListener
454458
if regHealthLisFn == nil {
455459
// The health package is not imported.
456-
return noOpRegisterHealthListenerFn
457-
}
458-
cfg := acbw.ac.cc.healthCheckConfig()
459-
if cfg == nil {
460+
channelz.Error(logger, acbw.ac.channelz, "Health check is requested but health package is not imported.")
460461
return noOpRegisterHealthListenerFn
461462
}
462463
return func(ctx context.Context, listener func(balancer.SubConnState)) func() {

0 commit comments

Comments
 (0)