Skip to content

Commit 8345485

Browse files
committed
Merge branch 'master' into v9
Signed-off-by: monkey <[email protected]>
2 parents 916da5b + 704212e commit 8345485

21 files changed

+414
-71
lines changed

.github/dependabot.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
version: 2
2+
updates:
3+
- package-ecosystem: gomod
4+
directory: /
5+
schedule:
6+
interval: weekly
7+
- package-ecosystem: github-actions
8+
directory: /
9+
schedule:
10+
interval: weekly

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,3 +166,11 @@ Lastly, run:
166166
```
167167
go test
168168
```
169+
170+
## Contributors
171+
172+
Thanks to all the people who already contributed!
173+
174+
<a href="https://github.com/go-redis/redis/graphs/contributors">
175+
<img src="https://contributors-img.web.app/image?repo=go-redis/redis" />
176+
</a>

cluster.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type ClusterOptions struct {
6868
ReadTimeout time.Duration
6969
WriteTimeout time.Duration
7070

71+
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
72+
PoolFIFO bool
73+
7174
// PoolSize applies per cluster node and not for the whole cluster.
7275
PoolSize int
7376
MinIdleConns int
@@ -146,6 +149,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
146149
ReadTimeout: opt.ReadTimeout,
147150
WriteTimeout: opt.WriteTimeout,
148151

152+
PoolFIFO: opt.PoolFIFO,
149153
PoolSize: opt.PoolSize,
150154
MinIdleConns: opt.MinIdleConns,
151155
MaxConnAge: opt.MaxConnAge,
@@ -591,8 +595,16 @@ func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
591595
if len(nodes) == 0 {
592596
return c.nodes.Random()
593597
}
594-
n := rand.Intn(len(nodes))
595-
return nodes[n], nil
598+
if len(nodes) == 1 {
599+
return nodes[0], nil
600+
}
601+
randomNodes := rand.Perm(len(nodes))
602+
for _, idx := range randomNodes {
603+
if node := nodes[idx]; !node.Failing() {
604+
return node, nil
605+
}
606+
}
607+
return nodes[randomNodes[0]], nil
596608
}
597609

598610
func (c *clusterState) slotNodes(slot int) []*clusterNode {

cluster_commands.go

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -8,55 +8,63 @@ import (
88

99
func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
1010
cmd := NewIntCmd(ctx, "dbsize")
11-
var size int64
12-
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
13-
n, err := master.DBSize(ctx).Result()
11+
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
12+
var size int64
13+
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
14+
n, err := master.DBSize(ctx).Result()
15+
if err != nil {
16+
return err
17+
}
18+
atomic.AddInt64(&size, n)
19+
return nil
20+
})
1421
if err != nil {
15-
return err
22+
cmd.SetErr(err)
23+
} else {
24+
cmd.val = size
1625
}
17-
atomic.AddInt64(&size, n)
1826
return nil
1927
})
20-
if err != nil {
21-
cmd.SetErr(err)
22-
return cmd
23-
}
24-
cmd.val = size
2528
return cmd
2629
}
2730

2831
func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
2932
cmd := NewStringCmd(ctx, "script", "load", script)
30-
mu := &sync.Mutex{}
31-
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
32-
val, err := shard.ScriptLoad(ctx, script).Result()
33+
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
34+
mu := &sync.Mutex{}
35+
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
36+
val, err := shard.ScriptLoad(ctx, script).Result()
37+
if err != nil {
38+
return err
39+
}
40+
41+
mu.Lock()
42+
if cmd.Val() == "" {
43+
cmd.val = val
44+
}
45+
mu.Unlock()
46+
47+
return nil
48+
})
3349
if err != nil {
34-
return err
35-
}
36-
37-
mu.Lock()
38-
if cmd.Val() == "" {
39-
cmd.val = val
50+
cmd.SetErr(err)
4051
}
41-
mu.Unlock()
42-
4352
return nil
4453
})
45-
if err != nil {
46-
cmd.SetErr(err)
47-
}
48-
4954
return cmd
5055
}
5156

5257
func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
5358
cmd := NewStatusCmd(ctx, "script", "flush")
54-
_ = c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
55-
shard.ScriptFlush(ctx)
56-
59+
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
60+
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
61+
return shard.ScriptFlush(ctx).Err()
62+
})
63+
if err != nil {
64+
cmd.SetErr(err)
65+
}
5766
return nil
5867
})
59-
6068
return cmd
6169
}
6270

@@ -74,26 +82,28 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo
7482
result[i] = true
7583
}
7684

77-
mu := &sync.Mutex{}
78-
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
79-
val, err := shard.ScriptExists(ctx, hashes...).Result()
85+
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
86+
mu := &sync.Mutex{}
87+
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
88+
val, err := shard.ScriptExists(ctx, hashes...).Result()
89+
if err != nil {
90+
return err
91+
}
92+
93+
mu.Lock()
94+
for i, v := range val {
95+
result[i] = result[i] && v
96+
}
97+
mu.Unlock()
98+
99+
return nil
100+
})
80101
if err != nil {
81-
return err
82-
}
83-
84-
mu.Lock()
85-
for i, v := range val {
86-
result[i] = result[i] && v
102+
cmd.SetErr(err)
103+
} else {
104+
cmd.val = result
87105
}
88-
mu.Unlock()
89-
90106
return nil
91107
})
92-
if err != nil {
93-
cmd.SetErr(err)
94-
}
95-
96-
cmd.val = result
97-
98108
return cmd
99109
}

commands.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ type Cmdable interface {
190190
LSet(ctx context.Context, key string, index int64, value interface{}) *StatusCmd
191191
LTrim(ctx context.Context, key string, start, stop int64) *StatusCmd
192192
RPop(ctx context.Context, key string) *StringCmd
193+
RPopCount(ctx context.Context, key string, count int) *StringSliceCmd
193194
RPopLPush(ctx context.Context, source, destination string) *StringCmd
194195
RPush(ctx context.Context, key string, values ...interface{}) *IntCmd
195196
RPushX(ctx context.Context, key string, values ...interface{}) *IntCmd
@@ -1452,6 +1453,12 @@ func (c cmdable) RPop(ctx context.Context, key string) *StringCmd {
14521453
return cmd
14531454
}
14541455

1456+
func (c cmdable) RPopCount(ctx context.Context, key string, count int) *StringSliceCmd {
1457+
cmd := NewStringSliceCmd(ctx, "rpop", key, count)
1458+
_ = c(ctx, cmd)
1459+
return cmd
1460+
}
1461+
14551462
func (c cmdable) RPopLPush(ctx context.Context, source, destination string) *StringCmd {
14561463
cmd := NewStringCmd(ctx, "rpoplpush", source, destination)
14571464
_ = c(ctx, cmd)

commands_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2282,6 +2282,20 @@ var _ = Describe("Commands", func() {
22822282
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))
22832283
})
22842284

2285+
It("should RPopCount", func() {
2286+
rPush := client.RPush(ctx, "list", "one", "two", "three", "four")
2287+
Expect(rPush.Err()).NotTo(HaveOccurred())
2288+
Expect(rPush.Val()).To(Equal(int64(4)))
2289+
2290+
rPopCount := client.RPopCount(ctx, "list", 2)
2291+
Expect(rPopCount.Err()).NotTo(HaveOccurred())
2292+
Expect(rPopCount.Val()).To(Equal([]string{"four", "three"}))
2293+
2294+
lRange := client.LRange(ctx, "list", 0, -1)
2295+
Expect(lRange.Err()).NotTo(HaveOccurred())
2296+
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))
2297+
})
2298+
22852299
It("should RPopLPush", func() {
22862300
rPush := client.RPush(ctx, "list", "one")
22872301
Expect(rPush.Err()).NotTo(HaveOccurred())
@@ -4113,6 +4127,45 @@ var _ = Describe("Commands", func() {
41134127
}))
41144128
})
41154129

4130+
It("should ZUnion", func() {
4131+
err := client.ZAddArgs(ctx, "zset1", redis.ZAddArgs{
4132+
Members: []redis.Z{
4133+
{Score: 1, Member: "one"},
4134+
{Score: 2, Member: "two"},
4135+
},
4136+
}).Err()
4137+
Expect(err).NotTo(HaveOccurred())
4138+
4139+
err = client.ZAddArgs(ctx, "zset2", redis.ZAddArgs{
4140+
Members: []redis.Z{
4141+
{Score: 1, Member: "one"},
4142+
{Score: 2, Member: "two"},
4143+
{Score: 3, Member: "three"},
4144+
},
4145+
}).Err()
4146+
Expect(err).NotTo(HaveOccurred())
4147+
4148+
union, err := client.ZUnion(ctx, redis.ZStore{
4149+
Keys: []string{"zset1", "zset2"},
4150+
Weights: []float64{2, 3},
4151+
Aggregate: "sum",
4152+
}).Result()
4153+
Expect(err).NotTo(HaveOccurred())
4154+
Expect(union).To(Equal([]string{"one", "three", "two"}))
4155+
4156+
unionScores, err := client.ZUnionWithScores(ctx, redis.ZStore{
4157+
Keys: []string{"zset1", "zset2"},
4158+
Weights: []float64{2, 3},
4159+
Aggregate: "sum",
4160+
}).Result()
4161+
Expect(err).NotTo(HaveOccurred())
4162+
Expect(unionScores).To(Equal([]redis.Z{
4163+
{Score: 5, Member: "one"},
4164+
{Score: 9, Member: "three"},
4165+
{Score: 10, Member: "two"},
4166+
}))
4167+
})
4168+
41164169
It("should ZUnionStore", func() {
41174170
err := client.ZAdd(ctx, "zset1", redis.Z{Score: 1, Member: "one"}).Err()
41184171
Expect(err).NotTo(HaveOccurred())
@@ -4339,6 +4392,33 @@ var _ = Describe("Commands", func() {
43394392
Expect(n).To(Equal(int64(3)))
43404393
})
43414394

4395+
// TODO XTrimMaxLenApprox/XTrimMinIDApprox There is a bug in the limit parameter.
4396+
// TODO Don't test it for now.
4397+
// TODO link: https://github.com/redis/redis/issues/9046
4398+
It("should XTrimMaxLen", func() {
4399+
n, err := client.XTrimMaxLen(ctx, "stream", 0).Result()
4400+
Expect(err).NotTo(HaveOccurred())
4401+
Expect(n).To(Equal(int64(3)))
4402+
})
4403+
4404+
It("should XTrimMaxLenApprox", func() {
4405+
n, err := client.XTrimMaxLenApprox(ctx, "stream", 0, 0).Result()
4406+
Expect(err).NotTo(HaveOccurred())
4407+
Expect(n).To(Equal(int64(3)))
4408+
})
4409+
4410+
It("should XTrimMinID", func() {
4411+
n, err := client.XTrimMinID(ctx, "stream", "4-0").Result()
4412+
Expect(err).NotTo(HaveOccurred())
4413+
Expect(n).To(Equal(int64(3)))
4414+
})
4415+
4416+
It("should XTrimMinIDApprox", func() {
4417+
n, err := client.XTrimMinIDApprox(ctx, "stream", "4-0", 0).Result()
4418+
Expect(err).NotTo(HaveOccurred())
4419+
Expect(n).To(Equal(int64(3)))
4420+
})
4421+
43424422
It("should XAdd", func() {
43434423
id, err := client.XAdd(ctx, &redis.XAddArgs{
43444424
Stream: "stream",

example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ func ExampleClient_Watch() {
417417
// Actual opperation (local in optimistic lock).
418418
n++
419419

420-
// Operation is commited only if the watched keys remain unchanged.
420+
// Operation is committed only if the watched keys remain unchanged.
421421
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
422422
pipe.Set(ctx, key, n, 0)
423423
return nil

internal/pool/conncheck.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
2+
3+
package pool
4+
5+
import (
6+
"errors"
7+
"io"
8+
"net"
9+
"syscall"
10+
)
11+
12+
var errUnexpectedRead = errors.New("unexpected read from socket")
13+
14+
func connCheck(conn net.Conn) error {
15+
sysConn, ok := conn.(syscall.Conn)
16+
if !ok {
17+
return nil
18+
}
19+
rawConn, err := sysConn.SyscallConn()
20+
if err != nil {
21+
return err
22+
}
23+
24+
var sysErr error
25+
err = rawConn.Read(func(fd uintptr) bool {
26+
var buf [1]byte
27+
n, err := syscall.Read(int(fd), buf[:])
28+
switch {
29+
case n == 0 && err == nil:
30+
sysErr = io.EOF
31+
case n > 0:
32+
sysErr = errUnexpectedRead
33+
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
34+
sysErr = nil
35+
default:
36+
sysErr = err
37+
}
38+
return true
39+
})
40+
if err != nil {
41+
return err
42+
}
43+
44+
return sysErr
45+
}

internal/pool/conncheck_dummy.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
2+
3+
package pool
4+
5+
import "net"
6+
7+
func connCheck(conn net.Conn) error {
8+
return nil
9+
}

0 commit comments

Comments
 (0)