Skip to content

Commit 856a208

Browse files
committed
improve code readability
- replace two similar functions `appendUniqueNode` and `appendIfNotExists` with a generic function. - simplify the implementation of the `get` method in `clusterNodes` - minimize line breaks in function definitions. - keep the member name `_generation` of `clusterNodes` consistent with other types. Signed-off-by: Xiaolong Chen <[email protected]>
1 parent 7bc12bb commit 856a208

File tree

7 files changed

+37
-91
lines changed

7 files changed

+37
-91
lines changed

geo_commands.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ func (c cmdable) GeoRadiusStore(
6161
}
6262

6363
// GeoRadiusByMember is a read-only GEORADIUSBYMEMBER_RO command.
64-
func (c cmdable) GeoRadiusByMember(
65-
ctx context.Context, key, member string, query *GeoRadiusQuery,
66-
) *GeoLocationCmd {
64+
func (c cmdable) GeoRadiusByMember(ctx context.Context, key, member string, query *GeoRadiusQuery) *GeoLocationCmd {
6765
cmd := NewGeoLocationCmd(ctx, query, "georadiusbymember_ro", key, member)
6866
if query.Store != "" || query.StoreDist != "" {
6967
cmd.SetErr(errors.New("GeoRadiusByMember does not support Store or StoreDist"))
@@ -74,9 +72,7 @@ func (c cmdable) GeoRadiusByMember(
7472
}
7573

7674
// GeoRadiusByMemberStore is a writing GEORADIUSBYMEMBER command.
77-
func (c cmdable) GeoRadiusByMemberStore(
78-
ctx context.Context, key, member string, query *GeoRadiusQuery,
79-
) *IntCmd {
75+
func (c cmdable) GeoRadiusByMemberStore(ctx context.Context, key, member string, query *GeoRadiusQuery) *IntCmd {
8076
args := geoLocationArgs(query, "georadiusbymember", key, member)
8177
cmd := NewIntCmd(ctx, args...)
8278
if query.Store == "" && query.StoreDist == "" {
@@ -96,9 +92,7 @@ func (c cmdable) GeoSearch(ctx context.Context, key string, q *GeoSearchQuery) *
9692
return cmd
9793
}
9894

99-
func (c cmdable) GeoSearchLocation(
100-
ctx context.Context, key string, q *GeoSearchLocationQuery,
101-
) *GeoSearchLocationCmd {
95+
func (c cmdable) GeoSearchLocation(ctx context.Context, key string, q *GeoSearchLocationQuery) *GeoSearchLocationCmd {
10296
args := make([]interface{}, 0, 16)
10397
args = append(args, "geosearch", key)
10498
args = geoSearchLocationArgs(q, args)
@@ -119,9 +113,7 @@ func (c cmdable) GeoSearchStore(ctx context.Context, key, store string, q *GeoSe
119113
return cmd
120114
}
121115

122-
func (c cmdable) GeoDist(
123-
ctx context.Context, key string, member1, member2, unit string,
124-
) *FloatCmd {
116+
func (c cmdable) GeoDist(ctx context.Context, key string, member1, member2, unit string) *FloatCmd {
125117
if unit == "" {
126118
unit = "km"
127119
}

internal/pool/conn.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ func (cn *Conn) RemoteAddr() net.Addr {
6969
return nil
7070
}
7171

72-
func (cn *Conn) WithReader(
73-
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
74-
) error {
72+
func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
7573
if timeout >= 0 {
7674
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
7775
return err
@@ -80,9 +78,7 @@ func (cn *Conn) WithReader(
8078
return fn(cn.rd)
8179
}
8280

83-
func (cn *Conn) WithWriter(
84-
ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
85-
) error {
81+
func (cn *Conn) WithWriter(ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error) error {
8682
if timeout >= 0 {
8783
if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil {
8884
return err

osscluster.go

Lines changed: 22 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,7 @@ type clusterNode struct {
342342
failing uint32 // atomic
343343
loaded uint32 // atomic
344344

345-
// last time the latency measurement was performed for the node, stored in nanoseconds
346-
// from epoch
345+
// last time the latency measurement was performed for the node, stored in nanoseconds from epoch
347346
lastLatencyMeasurement int64 // atomic
348347
}
349348

@@ -480,13 +479,12 @@ type clusterNodes struct {
480479
closed bool
481480
onNewNode []func(rdb *Client)
482481

483-
_generation uint32 // atomic
482+
generation uint32 // atomic
484483
}
485484

486485
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
487486
return &clusterNodes{
488-
opt: opt,
489-
487+
opt: opt,
490488
addrs: opt.Addrs,
491489
nodes: make(map[string]*clusterNode),
492490
}
@@ -546,12 +544,11 @@ func (c *clusterNodes) Addrs() ([]string, error) {
546544
}
547545

548546
func (c *clusterNodes) NextGeneration() uint32 {
549-
return atomic.AddUint32(&c._generation, 1)
547+
return atomic.AddUint32(&c.generation, 1)
550548
}
551549

552550
// GC removes unused nodes.
553551
func (c *clusterNodes) GC(generation uint32) {
554-
//nolint:prealloc
555552
var collected []*clusterNode
556553

557554
c.mu.Lock()
@@ -604,23 +601,20 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
604601
fn(node.Client)
605602
}
606603

607-
c.addrs = appendIfNotExists(c.addrs, addr)
604+
c.addrs = appendIfNotExist(c.addrs, addr)
608605
c.nodes[addr] = node
609606

610607
return node, nil
611608
}
612609

613610
func (c *clusterNodes) get(addr string) (*clusterNode, error) {
614-
var node *clusterNode
615-
var err error
616611
c.mu.RLock()
612+
defer c.mu.RUnlock()
613+
617614
if c.closed {
618-
err = pool.ErrClosed
619-
} else {
620-
node = c.nodes[addr]
615+
return nil, pool.ErrClosed
621616
}
622-
c.mu.RUnlock()
623-
return node, err
617+
return c.nodes[addr], nil
624618
}
625619

626620
func (c *clusterNodes) All() ([]*clusterNode, error) {
@@ -651,8 +645,9 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
651645
//------------------------------------------------------------------------------
652646

653647
type clusterSlot struct {
654-
start, end int
655-
nodes []*clusterNode
648+
start int
649+
end int
650+
nodes []*clusterNode
656651
}
657652

658653
type clusterSlotSlice []*clusterSlot
@@ -680,9 +675,7 @@ type clusterState struct {
680675
createdAt time.Time
681676
}
682677

683-
func newClusterState(
684-
nodes *clusterNodes, slots []ClusterSlot, origin string,
685-
) (*clusterState, error) {
678+
func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) {
686679
c := clusterState{
687680
nodes: nodes,
688681

@@ -712,9 +705,9 @@ func newClusterState(
712705
nodes = append(nodes, node)
713706

714707
if i == 0 {
715-
c.Masters = appendUniqueNode(c.Masters, node)
708+
c.Masters = appendIfNotExist(c.Masters, node)
716709
} else {
717-
c.Slaves = appendUniqueNode(c.Slaves, node)
710+
c.Slaves = appendIfNotExist(c.Slaves, node)
718711
}
719712
}
720713

@@ -1273,7 +1266,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
12731266
continue
12741267
}
12751268

1276-
return newClusterState(c.nodes, slots, node.Client.opt.Addr)
1269+
return newClusterState(c.nodes, slots, addr)
12771270
}
12781271

12791272
/*
@@ -1932,11 +1925,7 @@ func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
19321925
return hashtag.Slot(firstKey)
19331926
}
19341927

1935-
func (c *ClusterClient) cmdNode(
1936-
ctx context.Context,
1937-
cmdName string,
1938-
slot int,
1939-
) (*clusterNode, error) {
1928+
func (c *ClusterClient) cmdNode(ctx context.Context, cmdName string, slot int) (*clusterNode, error) {
19401929
state, err := c.state.Get(ctx)
19411930
if err != nil {
19421931
return nil, err
@@ -2005,26 +1994,13 @@ func (c *ClusterClient) context(ctx context.Context) context.Context {
20051994
return context.Background()
20061995
}
20071996

2008-
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
2009-
for _, n := range nodes {
2010-
if n == node {
2011-
return nodes
2012-
}
2013-
}
2014-
return append(nodes, node)
2015-
}
2016-
2017-
func appendIfNotExists(ss []string, es ...string) []string {
2018-
loop:
2019-
for _, e := range es {
2020-
for _, s := range ss {
2021-
if s == e {
2022-
continue loop
2023-
}
1997+
func appendIfNotExist[T comparable](vals []T, newVal T) []T {
1998+
for _, v := range vals {
1999+
if v == newVal {
2000+
return vals
20242001
}
2025-
ss = append(ss, e)
20262002
}
2027-
return ss
2003+
return append(vals, newVal)
20282004
}
20292005

20302006
//------------------------------------------------------------------------------

redis.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
383383

384384
// for redis-server versions that do not support the HELLO command,
385385
// RESP2 will continue to be used.
386-
if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
386+
if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
387387
// Authentication successful with HELLO command
388388
} else if !isRedisError(err) {
389389
// When the server responds with the RESP protocol and the result is not a normal
@@ -460,9 +460,7 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
460460
}
461461
}
462462

463-
func (c *baseClient) withConn(
464-
ctx context.Context, fn func(context.Context, *pool.Conn) error,
465-
) error {
463+
func (c *baseClient) withConn(ctx context.Context, fn func(context.Context, *pool.Conn) error) error {
466464
cn, err := c.getConn(ctx)
467465
if err != nil {
468466
return err
@@ -610,9 +608,7 @@ func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error
610608

611609
type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
612610

613-
func (c *baseClient) generalProcessPipeline(
614-
ctx context.Context, cmds []Cmder, p pipelineProcessor,
615-
) error {
611+
func (c *baseClient) generalProcessPipeline(ctx context.Context, cmds []Cmder, p pipelineProcessor) error {
616612
var lastErr error
617613
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
618614
if attempt > 0 {
@@ -636,9 +632,7 @@ func (c *baseClient) generalProcessPipeline(
636632
return lastErr
637633
}
638634

639-
func (c *baseClient) pipelineProcessCmds(
640-
ctx context.Context, cn *pool.Conn, cmds []Cmder,
641-
) (bool, error) {
635+
func (c *baseClient) pipelineProcessCmds(ctx context.Context, cn *pool.Conn, cmds []Cmder) (bool, error) {
642636
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
643637
return writeCmds(wr, cmds)
644638
}); err != nil {
@@ -668,9 +662,7 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
668662
return cmds[0].Err()
669663
}
670664

671-
func (c *baseClient) txPipelineProcessCmds(
672-
ctx context.Context, cn *pool.Conn, cmds []Cmder,
673-
) (bool, error) {
665+
func (c *baseClient) txPipelineProcessCmds(ctx context.Context, cn *pool.Conn, cmds []Cmder) (bool, error) {
674666
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
675667
return writeCmds(wr, cmds)
676668
}); err != nil {

ring.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -667,10 +667,7 @@ func (c *Ring) OnNewNode(fn func(rdb *Client)) {
667667

668668
// ForEachShard concurrently calls the fn on each live shard in the ring.
669669
// It returns the first error if any.
670-
func (c *Ring) ForEachShard(
671-
ctx context.Context,
672-
fn func(ctx context.Context, client *Client) error,
673-
) error {
670+
func (c *Ring) ForEachShard(ctx context.Context, fn func(ctx context.Context, client *Client) error) error {
674671
// note: `c.List()` return a shadow copy of `[]*ringShard`.
675672
shards := c.sharding.List()
676673
var wg sync.WaitGroup
@@ -779,9 +776,7 @@ func (c *Ring) TxPipeline() Pipeliner {
779776
return &pipe
780777
}
781778

782-
func (c *Ring) generalProcessPipeline(
783-
ctx context.Context, cmds []Cmder, tx bool,
784-
) error {
779+
func (c *Ring) generalProcessPipeline(ctx context.Context, cmds []Cmder, tx bool) error {
785780
if tx {
786781
// Trim multi .. exec.
787782
cmds = cmds[1 : len(cmds)-1]

sentinel.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
447447
return rdb
448448
}
449449

450-
func masterReplicaDialer(
451-
failover *sentinelFailover,
452-
) func(ctx context.Context, network, addr string) (net.Conn, error) {
450+
func masterReplicaDialer(failover *sentinelFailover) func(ctx context.Context, network, addr string) (net.Conn, error) {
453451
return func(ctx context.Context, network, _ string) (net.Conn, error) {
454452
var addr string
455453
var err error

stream_commands.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -382,10 +382,7 @@ func xClaimArgs(a *XClaimArgs) []interface{} {
382382
// XTRIM key MAXLEN/MINID ~ threshold LIMIT limit.
383383
//
384384
// The redis-server version is lower than 6.2, please set limit to 0.
385-
func (c cmdable) xTrim(
386-
ctx context.Context, key, strategy string,
387-
approx bool, threshold interface{}, limit int64,
388-
) *IntCmd {
385+
func (c cmdable) xTrim(ctx context.Context, key, strategy string, approx bool, threshold interface{}, limit int64) *IntCmd {
389386
args := make([]interface{}, 0, 7)
390387
args = append(args, "xtrim", key, strategy)
391388
if approx {

0 commit comments

Comments
 (0)