Skip to content

Commit e30317e

Browse files
zhoushuguang周曙光
andauthored
feat: consistent hash balancer support (#5246)
Co-authored-by: 周曙光 <[email protected]>
1 parent 568f9ce commit e30317e

File tree

7 files changed

+343
-4
lines changed

7 files changed

+343
-4
lines changed

zrpc/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package zrpc
22

33
import (
4+
"context"
5+
"fmt"
46
"time"
57

68
"github.com/zeromicro/go-zero/core/conf"
79
"github.com/zeromicro/go-zero/core/logx"
810
"github.com/zeromicro/go-zero/zrpc/internal"
911
"github.com/zeromicro/go-zero/zrpc/internal/auth"
12+
"github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash"
13+
"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
1014
"github.com/zeromicro/go-zero/zrpc/internal/clientinterceptors"
1115
"google.golang.org/grpc"
1216
"google.golang.org/grpc/keepalive"
@@ -67,6 +71,9 @@ func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
6771
})))
6872
}
6973

74+
svcCfg := makeLBServiceConfig(c.BalancerName)
75+
opts = append(opts, WithDialOption(grpc.WithDefaultServiceConfig(svcCfg)))
76+
7077
opts = append(opts, options...)
7178

7279
target, err := c.BuildTarget()
@@ -111,7 +118,20 @@ func SetClientSlowThreshold(threshold time.Duration) {
111118
clientinterceptors.SetSlowThreshold(threshold)
112119
}
113120

121+
// SetHashKey sets the hash key into context.
122+
func SetHashKey(ctx context.Context, key string) context.Context {
123+
return consistenthash.SetHashKey(ctx, key)
124+
}
125+
114126
// WithCallTimeout return a call option with given timeout to make a method call.
115127
func WithCallTimeout(timeout time.Duration) grpc.CallOption {
116128
return clientinterceptors.WithCallTimeout(timeout)
117129
}
130+
131+
func makeLBServiceConfig(balancerName string) string {
132+
if len(balancerName) == 0 {
133+
balancerName = p2c.Name
134+
}
135+
136+
return fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, balancerName)
137+
}

zrpc/client_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/zeromicro/go-zero/core/discov"
1313
"github.com/zeromicro/go-zero/core/logx"
1414
"github.com/zeromicro/go-zero/internal/mock"
15+
"github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash"
16+
"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
1517
"google.golang.org/grpc"
1618
"google.golang.org/grpc/codes"
1719
"google.golang.org/grpc/credentials/insecure"
@@ -245,3 +247,42 @@ func TestNewClientWithTarget(t *testing.T) {
245247

246248
assert.NotNil(t, err)
247249
}
250+
251+
func TestMakeLBServiceConfig(t *testing.T) {
252+
tests := []struct {
253+
name string
254+
input string
255+
expected string
256+
}{
257+
{
258+
name: "empty name uses default p2c",
259+
input: "",
260+
expected: fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name),
261+
},
262+
{
263+
name: "custom balancer name",
264+
input: "consistent_hash",
265+
expected: `{"loadBalancingPolicy":"consistent_hash"}`,
266+
},
267+
}
268+
269+
for _, tt := range tests {
270+
t.Run(tt.name, func(t *testing.T) {
271+
got := makeLBServiceConfig(tt.input)
272+
if got != tt.expected {
273+
t.Errorf("expected %q, got %q", tt.expected, got)
274+
}
275+
})
276+
}
277+
}
278+
279+
func TestSetHashKey(t *testing.T) {
280+
ctx := context.Background()
281+
key := "abc123"
282+
283+
ctx = SetHashKey(ctx, key)
284+
got := consistenthash.GetHashKey(ctx)
285+
assert.Equal(t, key, got)
286+
287+
assert.Empty(t, consistenthash.GetHashKey(context.Background()))
288+
}

zrpc/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type (
3131
Timeout int64 `json:",default=2000"`
3232
KeepaliveTime time.Duration `json:",optional"`
3333
Middlewares ClientMiddlewaresConf
34+
BalancerName string `json:",default=p2c_ewma"`
3435
}
3536

3637
// A RpcServerConf is a rpc server config.

zrpc/config_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/assert"
7+
zconf "github.com/zeromicro/go-zero/core/conf"
78
"github.com/zeromicro/go-zero/core/discov"
89
"github.com/zeromicro/go-zero/core/service"
910
"github.com/zeromicro/go-zero/core/stores/redis"
11+
"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
1012
)
1113

1214
func TestRpcClientConf(t *testing.T) {
@@ -39,6 +41,13 @@ func TestRpcClientConf(t *testing.T) {
3941
_, err := conf.BuildTarget()
4042
assert.Error(t, err)
4143
})
44+
45+
t.Run("default balancer name", func(t *testing.T) {
46+
var conf RpcClientConf
47+
err := zconf.FillDefault(&conf)
48+
assert.NoError(t, err)
49+
assert.Equal(t, p2c.Name, conf.BalancerName)
50+
})
4251
}
4352

4453
func TestRpcServerConf(t *testing.T) {
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package consistenthash
2+
3+
import (
4+
"context"
5+
6+
"github.com/zeromicro/go-zero/core/hash"
7+
"google.golang.org/grpc/balancer"
8+
"google.golang.org/grpc/balancer/base"
9+
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/status"
11+
)
12+
13+
const (
14+
Name = "consistent_hash"
15+
16+
defaultReplicaCount = 100
17+
)
18+
19+
var emptyPickResult balancer.PickResult
20+
21+
func init() {
22+
balancer.Register(newBuilder())
23+
}
24+
25+
type (
26+
// hashKey is the key type for consistent hash in context.
27+
hashKey struct{}
28+
// pickerBuilder is a builder for picker.
29+
pickerBuilder struct{}
30+
// picker is a picker that uses consistent hash to pick a sub connection.
31+
picker struct {
32+
hashRing *hash.ConsistentHash
33+
conns map[string]balancer.SubConn
34+
}
35+
)
36+
37+
func (b *pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
38+
readySCs := info.ReadySCs
39+
if len(readySCs) == 0 {
40+
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
41+
}
42+
43+
conns := make(map[string]balancer.SubConn, len(readySCs))
44+
hashRing := hash.NewCustomConsistentHash(defaultReplicaCount, hash.Hash)
45+
for conn, connInfo := range readySCs {
46+
addr := connInfo.Address.Addr
47+
conns[addr] = conn
48+
hashRing.Add(addr)
49+
}
50+
51+
return &picker{
52+
hashRing: hashRing,
53+
conns: conns,
54+
}
55+
}
56+
57+
func newBuilder() balancer.Builder {
58+
return base.NewBalancerBuilder(Name, &pickerBuilder{}, base.Config{HealthCheck: true})
59+
}
60+
61+
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
62+
hashKey := GetHashKey(info.Ctx)
63+
if len(hashKey) == 0 {
64+
return emptyPickResult, status.Error(codes.InvalidArgument,
65+
"[consistent_hash] missing hash key in context")
66+
}
67+
68+
if addrAny, ok := p.hashRing.Get(hashKey); ok {
69+
addr, ok := addrAny.(string)
70+
if !ok {
71+
return emptyPickResult, status.Error(codes.Internal,
72+
"[consistent_hash] invalid addr type in consistent hash")
73+
}
74+
75+
subConn, ok := p.conns[addr]
76+
if !ok {
77+
return emptyPickResult, status.Errorf(codes.Internal,
78+
"[consistent_hash] no subConn for addr: %s", addr)
79+
}
80+
81+
return balancer.PickResult{SubConn: subConn}, nil
82+
}
83+
84+
return emptyPickResult, status.Errorf(codes.Unavailable,
85+
"[consistent_hash] no matching conn for hashKey: %s", hashKey)
86+
}
87+
88+
// SetHashKey sets the hash key into context.
89+
func SetHashKey(ctx context.Context, key string) context.Context {
90+
return context.WithValue(ctx, hashKey{}, key)
91+
}
92+
93+
// GetHashKey gets the hash key from context.
94+
func GetHashKey(ctx context.Context) string {
95+
v, _ := ctx.Value(hashKey{}).(string)
96+
return v
97+
}

0 commit comments

Comments
 (0)