From 7f43d9eb184a48c6a11a8083a9e2e7e5a9d1a81c Mon Sep 17 00:00:00 2001 From: marek-szews Date: Mon, 13 Oct 2025 13:39:30 +0000 Subject: [PATCH 1/7] Implementation of A68 random_subsetting LB policy. Signed-off-by: marek-szews --- balancer/randomsubsetting/randomsubsetting.go | 201 ++++++++++++++++ .../randomsubsetting/randomsubsetting_test.go | 222 ++++++++++++++++++ 2 files changed, 423 insertions(+) create mode 100644 balancer/randomsubsetting/randomsubsetting.go create mode 100644 balancer/randomsubsetting/randomsubsetting_test.go diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go new file mode 100644 index 000000000000..9b3a9ed54948 --- /dev/null +++ b/balancer/randomsubsetting/randomsubsetting.go @@ -0,0 +1,201 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package randomsubsetting defines a random subsetting balancer. +// +// To install random subsetting balancer, import this package as: +// +// import _ "google.golang.org/grpc/balancer/randomsubsetting" +package randomsubsetting + +import ( + "encoding/json" + "errors" + "fmt" + "sort" + "time" + + "github.com/cespare/xxhash/v2" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/balancer/gracefulswitch" + internalgrpclog "google.golang.org/grpc/internal/grpclog" + iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" +) + +const ( + // Name is the name of the random subsetting load balancer. + Name = "random_subsetting" +) + +var ( + logger = grpclog.Component(Name) +) + +func prefixLogger(p *subsettingBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[random-subsetting-lb %p] ", p)) +} + +func init() { + balancer.Register(bb{}) +} + +type bb struct{} + +func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + b := &subsettingBalancer{ + cc: cc, + hashf: xxhash.NewWithSeed(uint64(time.Now().UnixNano())), + } + // Create a logger with a prefix specific to this balancer instance. + b.logger = prefixLogger(b) + + b.logger.Infof("Created") + b.child = gracefulswitch.NewBalancer(cc, bOpts) + return b +} + +// LBConfig is the config for the outlier detection balancer. +type LBConfig struct { + serviceconfig.LoadBalancingConfig `json:"-"` + + SubsetSize uint64 `json:"subset_size,omitempty"` + + ChildPolicy *iserviceconfig.BalancerConfig `json:"child_policy,omitempty"` +} + +func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + lbCfg := &LBConfig{ + // Default top layer values. + SubsetSize: 10, + } + + if err := json.Unmarshal(s, lbCfg); err != nil { // Validates child config if present as well. + return nil, fmt.Errorf("subsetting: unable to unmarshal LBconfig: %s, error: %v", string(s), err) + } + + // if someonw needs subsetSize == 1, he should use pick_first instead + if lbCfg.SubsetSize < 2 { + return nil, errors.New("subsetting: subsetSize must be >= 2") + } + + return lbCfg, nil +} + +func (bb) Name() string { + return Name +} + +type subsettingBalancer struct { + cc balancer.ClientConn + logger *internalgrpclog.PrefixLogger + cfg *LBConfig + hashf *xxhash.Digest + child *gracefulswitch.Balancer +} + +func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + lbCfg, ok := s.BalancerConfig.(*LBConfig) + if !ok { + b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) + return balancer.ErrBadResolverState + } + + // Reject whole config if child policy doesn't exist, don't persist it for + // later. + bb := balancer.Get(lbCfg.ChildPolicy.Name) + if bb == nil { + return fmt.Errorf("subsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name) + } + + if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { + err := b.child.SwitchTo(bb) + if err != nil { + return fmt.Errorf("subsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) + } + } + b.cfg = lbCfg + + err := b.child.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: b.prepareChildResolverState(s.ResolverState), + BalancerConfig: b.cfg.ChildPolicy.Config, + }) + + return err +} + +type AddressWithHash struct { + hash uint64 + addr resolver.Address +} + +// implements the subsetting algorithm, as described in A68: https://github.com/grpc/proposal/pull/423 +func (b *subsettingBalancer) prepareChildResolverState(s resolver.State) resolver.State { + addresses := s.Addresses + backendCount := len(addresses) + if backendCount <= int(b.cfg.SubsetSize) { + return s + } + + addressesSet := make([]AddressWithHash, backendCount) + // calculate hash for each endpoint + for i, endpoint := range addresses { + + b.hashf.Write([]byte(s.Addresses[0].String())) + addressesSet[i] = AddressWithHash{ + hash: b.hashf.Sum64(), + addr: endpoint, + } + } + // sort addresses by hash + sort.Slice(addressesSet, func(i, j int) bool { + return addressesSet[i].hash < addressesSet[j].hash + }) + + b.logger.Infof("resulting subset: %v", addressesSet[:b.cfg.SubsetSize]) + + // Convert back to resolver.addresses + addressesSubset := make([]resolver.Address, b.cfg.SubsetSize) + for _, eh := range addressesSet[:b.cfg.SubsetSize] { + addressesSubset = append(addressesSubset, eh.addr) + } + + return resolver.State{ + Addresses: addressesSubset, + ServiceConfig: s.ServiceConfig, + Attributes: s.Attributes, + } +} + +func (b *subsettingBalancer) ResolverError(err error) { + b.child.ResolverError(err) +} + +func (b *subsettingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + b.child.UpdateSubConnState(sc, state) +} + +func (b *subsettingBalancer) Close() { + b.child.Close() +} + +func (b *subsettingBalancer) ExitIdle() { + b.child.ExitIdle() +} diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go new file mode 100644 index 000000000000..c75ebd1e3efb --- /dev/null +++ b/balancer/randomsubsetting/randomsubsetting_test.go @@ -0,0 +1,222 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package e2e_test contains e2e test cases for the Subsetting LB Policy. +package randomsubsetting_test + +import ( + "context" + "fmt" + "math" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/balancer/randomsubsetting" +) + +var defaultTestTimeout = 5 * time.Second + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func setupBackends(t *testing.T, backendsCount int) ([]resolver.Address, func()) { + t.Helper() + + backends := make([]*stubserver.StubServer, backendsCount) + addresses := make([]resolver.Address, backendsCount) + for i := 0; i < backendsCount; i++ { + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + t.Logf("Started good TestService backend at: %q", backend.Address) + backends[i] = backend + addresses[i] = resolver.Address{ + Addr: backend.Address, + } + } + + cancel := func() { + for _, backend := range backends { + backend.Stop() + } + } + return addresses, cancel +} + +func setupClients(t *testing.T, clientsCount int, subsetSize int, addresses []resolver.Address) ([]testgrpc.TestServiceClient, func()) { + t.Helper() + + clients := make([]testgrpc.TestServiceClient, clientsCount) + ccs := make([]*grpc.ClientConn, clientsCount) + var err error + + for i := 0; i < clientsCount; i++ { + mr := manual.NewBuilderWithScheme("subsetting-e2e") + jsonConfig := fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "random_subsetting": { + "subset_size": %d, + "child_policy": [{"round_robin": {}}] + } + } + ] + }`, i, subsetSize) + + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(string(jsonConfig)) + mr.InitialState(resolver.State{ + Addresses: addresses, + ServiceConfig: sc, + }) + + ccs[i], err = grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + clients[i] = testgrpc.NewTestServiceClient(ccs[i]) + } + + cancel := func() { + for _, cc := range ccs { + cc.Close() + } + } + return clients, cancel +} + +func checkRoundRobinRPCs(t *testing.T, ctx context.Context, clients []testgrpc.TestServiceClient, subsetSize int, maxDiff int) { + clientsPerBackend := map[string]map[int]struct{}{} + + for clientIdx, client := range clients { + // make sure that every client send exactly 1 request to each server in its subset + for i := 0; i < subsetSize; i++ { + var peer peer.Peer + _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)) + if err != nil { + t.Fatalf("failed to call server: %v", err) + } + if peer.Addr != nil { + if m, ok := clientsPerBackend[peer.Addr.String()]; !ok { + clientsPerBackend[peer.Addr.String()] = map[int]struct{}{clientIdx: {}} + } else if _, ok := m[clientIdx]; !ok { + m[clientIdx] = struct{}{} + } else { + // The backend receives a second request from the same client. This could happen if the client have 1 backend in READY + // state while the other are CONNECTING. In this case round_robbin will pick the same address twice. + // We are going to retry after short timeout. + time.Sleep(10 * time.Microsecond) + i-- + } + } else { + t.Fatalf("peer.Addr == nil, peer: %v", peer) + } + } + } + + minClientsPerBackend := math.MaxInt + maxClientsPerBackend := 0 + for _, v := range clientsPerBackend { + if len(v) < minClientsPerBackend { + minClientsPerBackend = len(v) + } + if len(v) > maxClientsPerBackend { + maxClientsPerBackend = len(v) + } + } + + if maxClientsPerBackend > minClientsPerBackend+maxDiff { + t.Fatalf("the difference between min and max clients per backend should be <= %d, clientsPerBackend: %v", maxDiff, clientsPerBackend) + } +} + +func (s) TestSubsettingE2E(t *testing.T) { + tests := []struct { + name string + subsetSize int + clients int + backends int + maxDiff int + }{ + { + name: "backends could be evenly distributed between clients", + backends: 12, + clients: 8, + subsetSize: 3, + maxDiff: 0, + }, + { + name: "backends could NOT be evenly distributed between clients", + backends: 37, + clients: 22, + subsetSize: 5, + maxDiff: 2, + }, + { + name: "Nbackends %% subsetSize == 0, but there are not enough clients to fill the last round", + backends: 20, + clients: 7, + subsetSize: 5, + maxDiff: 1, + }, + { + name: "last round is completely filled, but there are some excluded backends on every round", + backends: 21, + clients: 8, + subsetSize: 5, + maxDiff: 1, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + addresses, stopBackends := setupBackends(t, test.backends) + defer stopBackends() + + clients, stopClients := setupClients(t, test.clients, test.subsetSize, addresses) + defer stopClients() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + checkRoundRobinRPCs(t, ctx, clients, test.subsetSize, test.maxDiff) + }) + } +} From e8ff4a1ad76c72e0bd3c94c9a4a770f1d6f06b1c Mon Sep 17 00:00:00 2001 From: marek-szews Date: Tue, 14 Oct 2025 17:28:25 +0000 Subject: [PATCH 2/7] Change a coments --- balancer/randomsubsetting/randomsubsetting_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go index c75ebd1e3efb..77be56dd48be 100644 --- a/balancer/randomsubsetting/randomsubsetting_test.go +++ b/balancer/randomsubsetting/randomsubsetting_test.go @@ -1,6 +1,6 @@ /* * - * Copyright 2019 gRPC authors. + * Copyright 2025 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -100,7 +100,7 @@ func setupClients(t *testing.T, clientsCount int, subsetSize int, addresses []re } } ] - }`, i, subsetSize) + }`, subsetSize) sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(string(jsonConfig)) mr.InitialState(resolver.State{ From 3c3f093aa941fc36790fd91efcca9a4ea6e8a21f Mon Sep 17 00:00:00 2001 From: marek-szews Date: Fri, 31 Oct 2025 12:41:43 +0000 Subject: [PATCH 3/7] rework for A68 --- balancer/randomsubsetting/randomsubsetting.go | 105 +++++++++--------- .../randomsubsetting/randomsubsetting_test.go | 89 +++++++++++++-- 2 files changed, 134 insertions(+), 60 deletions(-) diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go index 9b3a9ed54948..627fc3892a33 100644 --- a/balancer/randomsubsetting/randomsubsetting.go +++ b/balancer/randomsubsetting/randomsubsetting.go @@ -24,10 +24,10 @@ package randomsubsetting import ( + "cmp" "encoding/json" - "errors" "fmt" - "sort" + "slices" "time" "github.com/cespare/xxhash/v2" @@ -72,28 +72,38 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba return b } -// LBConfig is the config for the outlier detection balancer. +// LBConfig is the config for the random subsetting balancer. type LBConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` - SubsetSize uint64 `json:"subset_size,omitempty"` - + SubsetSize uint64 `json:"subset_size,omitempty"` ChildPolicy *iserviceconfig.BalancerConfig `json:"child_policy,omitempty"` } func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { lbCfg := &LBConfig{ - // Default top layer values. - SubsetSize: 10, + SubsetSize: 2, // default value + ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, } if err := json.Unmarshal(s, lbCfg); err != nil { // Validates child config if present as well. - return nil, fmt.Errorf("subsetting: unable to unmarshal LBconfig: %s, error: %v", string(s), err) + return nil, fmt.Errorf("randomsubsetting: unable to unmarshal LBConfig: %s, error: %v", string(s), err) } - // if someonw needs subsetSize == 1, he should use pick_first instead + // if someone needs SubsetSize == 1, he should use pick_first instead if lbCfg.SubsetSize < 2 { - return nil, errors.New("subsetting: subsetSize must be >= 2") + return nil, fmt.Errorf("randomsubsetting: SubsetSize must be >= 2") + } + + if lbCfg.ChildPolicy == nil { + return nil, fmt.Errorf("randomsubsetting: child policy field must be set") + } + + // Reject whole config if child policy doesn't exist, don't persist it for + // later. + bb := balancer.Get(lbCfg.ChildPolicy.Name) + if bb == nil { + return nil, fmt.Errorf("randomsubsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name) } return lbCfg, nil @@ -114,73 +124,68 @@ type subsettingBalancer struct { func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error { lbCfg, ok := s.BalancerConfig.(*LBConfig) if !ok { - b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) + b.logger.Errorf("randomsubsetting: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) return balancer.ErrBadResolverState } - // Reject whole config if child policy doesn't exist, don't persist it for - // later. - bb := balancer.Get(lbCfg.ChildPolicy.Name) - if bb == nil { - return fmt.Errorf("subsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name) - } - if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { - err := b.child.SwitchTo(bb) - if err != nil { - return fmt.Errorf("subsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) + + if err := b.child.SwitchTo(balancer.Get(lbCfg.ChildPolicy.Name)); err != nil { + return fmt.Errorf("randomsubsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) } } b.cfg = lbCfg - err := b.child.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: b.prepareChildResolverState(s.ResolverState), + return b.child.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: b.prepareChildResolverState(s), BalancerConfig: b.cfg.ChildPolicy.Config, }) - - return err } -type AddressWithHash struct { +type endpointWithHash struct { hash uint64 - addr resolver.Address + ep resolver.Endpoint } -// implements the subsetting algorithm, as described in A68: https://github.com/grpc/proposal/pull/423 -func (b *subsettingBalancer) prepareChildResolverState(s resolver.State) resolver.State { - addresses := s.Addresses - backendCount := len(addresses) - if backendCount <= int(b.cfg.SubsetSize) { - return s +// implements the subsetting algorithm, +// as described in A68: https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md +func (b *subsettingBalancer) prepareChildResolverState(s balancer.ClientConnState) resolver.State { + subsetSize := b.cfg.SubsetSize + endPoints := s.ResolverState.Endpoints + backendCount := len(endPoints) + if backendCount <= int(subsetSize) || subsetSize < 2 { + return s.ResolverState } - addressesSet := make([]AddressWithHash, backendCount) // calculate hash for each endpoint - for i, endpoint := range addresses { - - b.hashf.Write([]byte(s.Addresses[0].String())) - addressesSet[i] = AddressWithHash{ + endpointSet := make([]endpointWithHash, backendCount) + for i, endpoint := range endPoints { + b.hashf.Write([]byte(endpoint.Addresses[0].String())) + endpointSet[i] = endpointWithHash{ hash: b.hashf.Sum64(), - addr: endpoint, + ep: endpoint, } } - // sort addresses by hash - sort.Slice(addressesSet, func(i, j int) bool { - return addressesSet[i].hash < addressesSet[j].hash + + // sort endpoint by hash + slices.SortFunc(endpointSet, func(a, b endpointWithHash) int { + return cmp.Compare(a.hash, b.hash) }) - b.logger.Infof("resulting subset: %v", addressesSet[:b.cfg.SubsetSize]) + if b.logger.V(2) { + b.logger.Infof("randomsubsetting: resulting subset: %v", endpointSet[:subsetSize]) + } - // Convert back to resolver.addresses - addressesSubset := make([]resolver.Address, b.cfg.SubsetSize) - for _, eh := range addressesSet[:b.cfg.SubsetSize] { - addressesSubset = append(addressesSubset, eh.addr) + // Convert back to resolver.Endpoints + endpointSubset := make([]resolver.Endpoint, subsetSize) + for i, endpoint := range endpointSet[:subsetSize] { + endpointSubset[i] = endpoint.ep } return resolver.State{ - Addresses: addressesSubset, - ServiceConfig: s.ServiceConfig, - Attributes: s.Attributes, + Endpoints: endpointSubset, + ServiceConfig: s.ResolverState.ServiceConfig, + Attributes: s.ResolverState.Attributes, } } diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go index 77be56dd48be..b2f42d2a5180 100644 --- a/balancer/randomsubsetting/randomsubsetting_test.go +++ b/balancer/randomsubsetting/randomsubsetting_test.go @@ -17,15 +17,18 @@ */ // Package e2e_test contains e2e test cases for the Subsetting LB Policy. -package randomsubsetting_test +package randomsubsetting import ( "context" + "encoding/json" "fmt" "math" + "strings" "testing" "time" + "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" @@ -36,13 +39,12 @@ import ( "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" + iserviceconfig "google.golang.org/grpc/internal/serviceconfig" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" - - _ "google.golang.org/grpc/balancer/randomsubsetting" ) -var defaultTestTimeout = 5 * time.Second +var defaultTestTimeout = 120 * time.Second type s struct { grpctest.Tester @@ -52,6 +54,66 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } +func (s) TestParseConfig(t *testing.T) { + parser := bb{} + tests := []struct { + name string + input string + wantCfg serviceconfig.LoadBalancingConfig + wantErr string + }{ + { + name: "happy-case-default", + input: `{}`, + wantCfg: &LBConfig{ + SubsetSize: 2, + ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, + }, + }, + { + name: "happy-case-subset_size-set", + input: `{ "subset_size": 3 }`, + wantCfg: &LBConfig{ + SubsetSize: 3, + ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, + }, + }, + { + name: "subset_size-less-than-2", + input: `{ "subset_size": 1, + "child_policy": [{"round_robin": {}}]}`, + wantErr: "randomsubsetting: SubsetSize must be >= 2", + }, + { + name: "invalid-json", + input: "{{invalidjson{{", + wantErr: "invalid character", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input)) + // Substring match makes this very tightly coupled to the + // internalserviceconfig.BalancerConfig error strings. However, it + // is important to distinguish the different types of error messages + // possible as the parser has a few defined buckets of ways it can + // error out. + if (gotErr != nil) != (test.wantErr != "") { + t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr) + } + if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) { + t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr) + } + if test.wantErr != "" { + return + } + if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" { + t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff) + } + }) + } +} + func setupBackends(t *testing.T, backendsCount int) ([]resolver.Address, func()) { t.Helper() @@ -123,7 +185,7 @@ func setupClients(t *testing.T, clientsCount int, subsetSize int, addresses []re return clients, cancel } -func checkRoundRobinRPCs(t *testing.T, ctx context.Context, clients []testgrpc.TestServiceClient, subsetSize int, maxDiff int) { +func checkRoundRobinRPCs(ctx context.Context, t *testing.T, clients []testgrpc.TestServiceClient, subsetSize int, maxDiff int) { clientsPerBackend := map[string]map[int]struct{}{} for clientIdx, client := range clients { @@ -176,33 +238,40 @@ func (s) TestSubsettingE2E(t *testing.T) { backends int maxDiff int }{ + { + name: "backends could be evenly distributed between small number of clients", + backends: 3, + clients: 2, + subsetSize: 2, + maxDiff: 1, + }, { name: "backends could be evenly distributed between clients", backends: 12, clients: 8, subsetSize: 3, - maxDiff: 0, + maxDiff: 3, }, { name: "backends could NOT be evenly distributed between clients", backends: 37, clients: 22, subsetSize: 5, - maxDiff: 2, + maxDiff: 15, }, { name: "Nbackends %% subsetSize == 0, but there are not enough clients to fill the last round", backends: 20, clients: 7, subsetSize: 5, - maxDiff: 1, + maxDiff: 20, }, { name: "last round is completely filled, but there are some excluded backends on every round", backends: 21, clients: 8, subsetSize: 5, - maxDiff: 1, + maxDiff: 3, }, } for _, test := range tests { @@ -216,7 +285,7 @@ func (s) TestSubsettingE2E(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - checkRoundRobinRPCs(t, ctx, clients, test.subsetSize, test.maxDiff) + checkRoundRobinRPCs(ctx, t, clients, test.subsetSize, test.maxDiff) }) } } From c4f8448cdc4148e2d7f002baee51bf634cba178a Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 13 Nov 2025 21:00:18 +0000 Subject: [PATCH 4/7] embed the gracefulswitch.Balancer in subsettingBalancer --- balancer/randomsubsetting/randomsubsetting.go | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go index 627fc3892a33..bf331fc30489 100644 --- a/balancer/randomsubsetting/randomsubsetting.go +++ b/balancer/randomsubsetting/randomsubsetting.go @@ -61,14 +61,13 @@ type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &subsettingBalancer{ - cc: cc, - hashf: xxhash.NewWithSeed(uint64(time.Now().UnixNano())), + Balancer: gracefulswitch.NewBalancer(cc, bOpts), + hashf: xxhash.NewWithSeed(uint64(time.Now().UnixNano())), } // Create a logger with a prefix specific to this balancer instance. b.logger = prefixLogger(b) b.logger.Infof("Created") - b.child = gracefulswitch.NewBalancer(cc, bOpts) return b } @@ -114,11 +113,11 @@ func (bb) Name() string { } type subsettingBalancer struct { - cc balancer.ClientConn + *gracefulswitch.Balancer + logger *internalgrpclog.PrefixLogger cfg *LBConfig hashf *xxhash.Digest - child *gracefulswitch.Balancer } func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -130,13 +129,13 @@ func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) e if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { - if err := b.child.SwitchTo(balancer.Get(lbCfg.ChildPolicy.Name)); err != nil { + if err := b.Balancer.SwitchTo(balancer.Get(lbCfg.ChildPolicy.Name)); err != nil { return fmt.Errorf("randomsubsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) } } b.cfg = lbCfg - return b.child.UpdateClientConnState(balancer.ClientConnState{ + return b.Balancer.UpdateClientConnState(balancer.ClientConnState{ ResolverState: b.prepareChildResolverState(s), BalancerConfig: b.cfg.ChildPolicy.Config, }) @@ -188,19 +187,3 @@ func (b *subsettingBalancer) prepareChildResolverState(s balancer.ClientConnStat Attributes: s.ResolverState.Attributes, } } - -func (b *subsettingBalancer) ResolverError(err error) { - b.child.ResolverError(err) -} - -func (b *subsettingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { - b.child.UpdateSubConnState(sc, state) -} - -func (b *subsettingBalancer) Close() { - b.child.Close() -} - -func (b *subsettingBalancer) ExitIdle() { - b.child.ExitIdle() -} From f5772182c853b1521f1281deb90903c494daa6cd Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 13 Nov 2025 21:07:34 +0000 Subject: [PATCH 5/7] use camelCase instead of snake_case in json annotations for LB config fields --- balancer/randomsubsetting/randomsubsetting.go | 4 ++-- balancer/randomsubsetting/randomsubsetting_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go index bf331fc30489..c893a5c653ab 100644 --- a/balancer/randomsubsetting/randomsubsetting.go +++ b/balancer/randomsubsetting/randomsubsetting.go @@ -75,8 +75,8 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba type LBConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` - SubsetSize uint64 `json:"subset_size,omitempty"` - ChildPolicy *iserviceconfig.BalancerConfig `json:"child_policy,omitempty"` + SubsetSize uint64 `json:"subsetSize,omitempty"` + ChildPolicy *iserviceconfig.BalancerConfig `json:"childPolicy,omitempty"` } func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go index b2f42d2a5180..63f1560e61ce 100644 --- a/balancer/randomsubsetting/randomsubsetting_test.go +++ b/balancer/randomsubsetting/randomsubsetting_test.go @@ -72,7 +72,7 @@ func (s) TestParseConfig(t *testing.T) { }, { name: "happy-case-subset_size-set", - input: `{ "subset_size": 3 }`, + input: `{ "subsetSize": 3 }`, wantCfg: &LBConfig{ SubsetSize: 3, ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, @@ -80,8 +80,8 @@ func (s) TestParseConfig(t *testing.T) { }, { name: "subset_size-less-than-2", - input: `{ "subset_size": 1, - "child_policy": [{"round_robin": {}}]}`, + input: `{ "subsetSize": 1, + "childPolicy": [{"round_robin": {}}]}`, wantErr: "randomsubsetting: SubsetSize must be >= 2", }, { From ddc8d0925e0aab89fde99fc6cef398bfa1394f69 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 13 Nov 2025 21:13:54 +0000 Subject: [PATCH 6/7] do not use the package name prefix in log messages, as we already use a prefix logger --- balancer/randomsubsetting/randomsubsetting.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go index c893a5c653ab..12a9baab3071 100644 --- a/balancer/randomsubsetting/randomsubsetting.go +++ b/balancer/randomsubsetting/randomsubsetting.go @@ -123,7 +123,7 @@ type subsettingBalancer struct { func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error { lbCfg, ok := s.BalancerConfig.(*LBConfig) if !ok { - b.logger.Errorf("randomsubsetting: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) + b.logger.Errorf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) return balancer.ErrBadResolverState } @@ -172,7 +172,7 @@ func (b *subsettingBalancer) prepareChildResolverState(s balancer.ClientConnStat }) if b.logger.V(2) { - b.logger.Infof("randomsubsetting: resulting subset: %v", endpointSet[:subsetSize]) + b.logger.Infof("Resulting subset: %v", endpointSet[:subsetSize]) } // Convert back to resolver.Endpoints From 74cfecd18f7a52e1f1bece343317db8e1e4eb5ed Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 13 Nov 2025 21:34:53 +0000 Subject: [PATCH 7/7] improvements to ParseConfig and its tests --- balancer/randomsubsetting/randomsubsetting.go | 53 +++++++------------ .../randomsubsetting/randomsubsetting_test.go | 44 ++++++++------- 2 files changed, 43 insertions(+), 54 deletions(-) diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go index 12a9baab3071..888b0d00c8dc 100644 --- a/balancer/randomsubsetting/randomsubsetting.go +++ b/balancer/randomsubsetting/randomsubsetting.go @@ -16,9 +16,10 @@ * */ -// Package randomsubsetting defines a random subsetting balancer. +// Package randomsubsetting implements the random_subsetting LB policy specified +// here: https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md // -// To install random subsetting balancer, import this package as: +// To install the LB policy, import this package as: // // import _ "google.golang.org/grpc/balancer/randomsubsetting" package randomsubsetting @@ -30,7 +31,7 @@ import ( "slices" "time" - "github.com/cespare/xxhash/v2" + xxhash "github.com/cespare/xxhash/v2" "google.golang.org/grpc/balancer" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/balancer/gracefulswitch" @@ -40,14 +41,10 @@ import ( "google.golang.org/grpc/serviceconfig" ) -const ( - // Name is the name of the random subsetting load balancer. - Name = "random_subsetting" -) +// Name is the name of the random subsetting load balancer. +const Name = "random_subsetting" -var ( - logger = grpclog.Component(Name) -) +var logger = grpclog.Component(Name) func prefixLogger(p *subsettingBalancer) *internalgrpclog.PrefixLogger { return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[random-subsetting-lb %p] ", p)) @@ -64,15 +61,12 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba Balancer: gracefulswitch.NewBalancer(cc, bOpts), hashf: xxhash.NewWithSeed(uint64(time.Now().UnixNano())), } - // Create a logger with a prefix specific to this balancer instance. b.logger = prefixLogger(b) - b.logger.Infof("Created") return b } -// LBConfig is the config for the random subsetting balancer. -type LBConfig struct { +type lbConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` SubsetSize uint64 `json:"subsetSize,omitempty"` @@ -80,29 +74,18 @@ type LBConfig struct { } func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - lbCfg := &LBConfig{ - SubsetSize: 2, // default value - ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, - } + lbCfg := &lbConfig{} - if err := json.Unmarshal(s, lbCfg); err != nil { // Validates child config if present as well. - return nil, fmt.Errorf("randomsubsetting: unable to unmarshal LBConfig: %s, error: %v", string(s), err) + // Ensure that the specified child policy is registered and validates its + // config, if present. + if err := json.Unmarshal(s, lbCfg); err != nil { + return nil, fmt.Errorf("randomsubsetting: unmarshaling configuration: %s, failed: %v", string(s), err) } - - // if someone needs SubsetSize == 1, he should use pick_first instead - if lbCfg.SubsetSize < 2 { - return nil, fmt.Errorf("randomsubsetting: SubsetSize must be >= 2") + if lbCfg.SubsetSize == 0 { + return nil, fmt.Errorf("randomsubsetting: SubsetSize must be greater than 0") } - if lbCfg.ChildPolicy == nil { - return nil, fmt.Errorf("randomsubsetting: child policy field must be set") - } - - // Reject whole config if child policy doesn't exist, don't persist it for - // later. - bb := balancer.Get(lbCfg.ChildPolicy.Name) - if bb == nil { - return nil, fmt.Errorf("randomsubsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name) + return nil, fmt.Errorf("randomsubsetting: ChildPolicy must be specified") } return lbCfg, nil @@ -116,12 +99,12 @@ type subsettingBalancer struct { *gracefulswitch.Balancer logger *internalgrpclog.PrefixLogger - cfg *LBConfig + cfg *lbConfig hashf *xxhash.Digest } func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error { - lbCfg, ok := s.BalancerConfig.(*LBConfig) + lbCfg, ok := s.BalancerConfig.(*lbConfig) if !ok { b.logger.Errorf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) return balancer.ErrBadResolverState diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go index 63f1560e61ce..5cf72a904c0d 100644 --- a/balancer/randomsubsetting/randomsubsetting_test.go +++ b/balancer/randomsubsetting/randomsubsetting_test.go @@ -63,31 +63,37 @@ func (s) TestParseConfig(t *testing.T) { wantErr string }{ { - name: "happy-case-default", - input: `{}`, - wantCfg: &LBConfig{ - SubsetSize: 2, - ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, - }, + name: "invalid-json", + input: "{{invalidjson{{", + wantErr: "invalid character", }, { - name: "happy-case-subset_size-set", - input: `{ "subsetSize": 3 }`, - wantCfg: &LBConfig{ - SubsetSize: 3, - ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, - }, + name: "empty_config", + input: `{}`, + wantErr: "SubsetSize must be greater than 0", }, { - name: "subset_size-less-than-2", - input: `{ "subsetSize": 1, - "childPolicy": [{"round_robin": {}}]}`, - wantErr: "randomsubsetting: SubsetSize must be >= 2", + name: "subset_size_zero", + input: `{ "subsetSize": 0 }`, + wantErr: "SubsetSize must be greater than 0", }, { - name: "invalid-json", - input: "{{invalidjson{{", - wantErr: "invalid character", + name: "child_policy_missing", + input: `{ "subsetSize": 1 }`, + wantErr: "ChildPolicy must be specified", + }, + { + name: "child_policy_not_registered", + input: `{ "subsetSize": 1 , "childPolicy": [{"unregistered_lb": {}}] }`, + wantErr: "no supported policies found", + }, + { + name: "success", + input: `{ "subsetSize": 3, "childPolicy": [{"round_robin": {}}]}`, + wantCfg: &lbConfig{ + SubsetSize: 3, + ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"}, + }, }, } for _, test := range tests {