Skip to content

Commit 659e239

Browse files
committed
Cherry-pick #3044
Signed-off-by: ItalyPaleAle <[email protected]>
1 parent 31ccb5f commit 659e239

File tree

3 files changed

+111
-129
lines changed

3 files changed

+111
-129
lines changed

internal/component/redis/redis.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,33 @@ func GetServerVersion(c RedisClient) (string, error) {
221221
return "", fmt.Errorf("could not find redis_version in redis info response")
222222
}
223223

224+
// GetConnectedSlaves returns the number of slaves connected to the Redis master.
225+
func GetConnectedSlaves(ctx context.Context, c RedisClient) (int, error) {
226+
const connectedSlavesReplicas = "connected_slaves:"
227+
228+
res, err := c.DoRead(ctx, "INFO", "replication")
229+
if err != nil {
230+
return 0, err
231+
}
232+
233+
// Response example: https://redis.io/commands/info#return-value
234+
// # Replication\r\nrole:master\r\nconnected_slaves:1\r\n
235+
s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
236+
if len(s) == 0 {
237+
return 0, nil
238+
}
239+
240+
infos := strings.Split(s, "\r\n")
241+
for _, info := range infos {
242+
if strings.HasPrefix(info, connectedSlavesReplicas) {
243+
parsedReplicas, _ := strconv.ParseInt(info[len(connectedSlavesReplicas):], 10, 32)
244+
return int(parsedReplicas), nil
245+
}
246+
}
247+
248+
return 0, nil
249+
}
250+
224251
type RedisError string
225252

226253
func (e RedisError) Error() string { return string(e) }

lock/redis/standalone.go

Lines changed: 45 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ package redis
1515

1616
import (
1717
"context"
18+
"errors"
1819
"fmt"
1920
"reflect"
20-
"strconv"
21-
"strings"
2221
"time"
2322

2423
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
@@ -27,13 +26,10 @@ import (
2726
"github.com/dapr/kit/logger"
2827
)
2928

30-
const (
31-
unlockScript = "local v = redis.call(\"get\",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call(\"del\",KEYS[1]) end"
32-
connectedSlavesReplicas = "connected_slaves:"
33-
infoReplicationDelimiter = "\r\n"
34-
)
29+
const unlockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("del",KEYS[1]) end`
3530

36-
// Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster.
31+
// Standalone Redis lock store.
32+
// Any fail-over related features are not supported, such as Sentinel and Redis Cluster.
3733
type StandaloneRedisLock struct {
3834
client rediscomponent.RedisClient
3935
clientSettings *rediscomponent.Settings
@@ -52,83 +48,46 @@ func NewStandaloneRedisLock(logger logger.Logger) lock.Store {
5248
}
5349

5450
// Init StandaloneRedisLock.
55-
func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) error {
56-
// must have `redisHost`
57-
if metadata.Properties["redisHost"] == "" {
58-
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. redisHost is empty")
59-
}
60-
// no failover
61-
if needFailover(metadata.Properties) {
62-
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. Failover is not supported")
63-
}
64-
// construct client
65-
var err error
51+
func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) (err error) {
52+
// Create the client
6653
r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, contribMetadata.LockStoreType)
6754
if err != nil {
6855
return err
6956
}
70-
// 3. connect to redis
71-
if _, err = r.client.PingResult(ctx); err != nil {
72-
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err)
73-
}
74-
// no replica
75-
replicas, err := r.getConnectedSlaves(ctx)
76-
// pass the validation if error occurs,
77-
// since some redis versions such as miniredis do not recognize the `INFO` command.
78-
if err == nil && replicas > 0 {
79-
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. Replication is not supported")
80-
}
81-
return nil
82-
}
8357

84-
func needFailover(properties map[string]string) bool {
85-
if val, ok := properties["failover"]; ok && val != "" {
86-
parsedVal, err := strconv.ParseBool(val)
87-
if err != nil {
88-
return false
89-
}
90-
return parsedVal
58+
// Ensure we have a host
59+
if r.clientSettings.Host == "" {
60+
return errors.New("metadata property redisHost is empty")
9161
}
92-
return false
93-
}
9462

95-
func (r *StandaloneRedisLock) getConnectedSlaves(ctx context.Context) (int, error) {
96-
res, err := r.client.DoRead(ctx, "INFO", "replication")
97-
if err != nil {
98-
return 0, err
63+
// We do not support failover or having replicas
64+
if r.clientSettings.Failover {
65+
return errors.New("this component does not support connecting to Redis with failover")
9966
}
10067

101-
// Response example: https://redis.io/commands/info#return-value
102-
// # Replication\r\nrole:master\r\nconnected_slaves:1\r\n
103-
s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
104-
if len(s) == 0 {
105-
return 0, nil
68+
// Ping Redis to ensure the connection is uo
69+
if _, err = r.client.PingResult(ctx); err != nil {
70+
return fmt.Errorf("error connecting to Redis: %v", err)
10671
}
10772

108-
return r.parseConnectedSlaves(s), nil
109-
}
110-
111-
func (r *StandaloneRedisLock) parseConnectedSlaves(res string) int {
112-
infos := strings.Split(res, infoReplicationDelimiter)
113-
for _, info := range infos {
114-
if strings.Contains(info, connectedSlavesReplicas) {
115-
parsedReplicas, _ := strconv.ParseUint(info[len(connectedSlavesReplicas):], 10, 32)
116-
117-
return int(parsedReplicas)
118-
}
73+
// Ensure there are no replicas
74+
// Pass the validation if error occurs, since some Redis versions such as miniredis do not recognize the `INFO` command.
75+
replicas, err := rediscomponent.GetConnectedSlaves(ctx, r.client)
76+
if err == nil && replicas > 0 {
77+
return errors.New("replication is not supported")
11978
}
120-
121-
return 0
79+
return nil
12280
}
12381

124-
// Try to acquire a redis lock.
82+
// TryLock tries to acquire a lock.
83+
// If the lock cannot be acquired, it returns immediately.
12584
func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
126-
// 1.Setting redis expiration time
85+
// Set a key if doesn't exist with an expiration time
12786
nxval, err := r.client.SetNX(ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
12887
if nxval == nil {
129-
return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil.ResourceID: %s", req.ResourceID)
88+
return &lock.TryLockResponse{}, fmt.Errorf("setNX returned a nil response")
13089
}
131-
// 2. check error
90+
13291
if err != nil {
13392
return &lock.TryLockResponse{}, err
13493
}
@@ -138,46 +97,46 @@ func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequ
13897
}, nil
13998
}
14099

141-
// Try to release a redis lock.
100+
// Unlock tries to release a lock if the lock is still valid.
142101
func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
143-
// 1. delegate to client.eval lua script
102+
// Delegate to client.eval lua script
144103
evalInt, parseErr, err := r.client.EvalInt(ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
145-
// 2. check error
146104
if evalInt == nil {
147-
return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID)
105+
res := &lock.UnlockResponse{
106+
Status: lock.InternalError,
107+
}
108+
return res, errors.New("eval unlock script returned a nil response")
148109
}
149-
// 3. parse result
150-
i := *evalInt
151-
status := lock.InternalError
110+
111+
// Parse result
152112
if parseErr != nil {
153113
return &lock.UnlockResponse{
154-
Status: status,
114+
Status: lock.InternalError,
155115
}, err
156116
}
157-
if i >= 0 {
117+
var status lock.Status
118+
switch {
119+
case *evalInt >= 0:
158120
status = lock.Success
159-
} else if i == -1 {
121+
case *evalInt == -1:
160122
status = lock.LockDoesNotExist
161-
} else if i == -2 {
123+
case *evalInt == -2:
162124
status = lock.LockBelongsToOthers
125+
default:
126+
status = lock.InternalError
163127
}
128+
164129
return &lock.UnlockResponse{
165130
Status: status,
166131
}, nil
167132
}
168133

169-
func newInternalErrorUnlockResponse() *lock.UnlockResponse {
170-
return &lock.UnlockResponse{
171-
Status: lock.InternalError,
172-
}
173-
}
174-
175134
// Close shuts down the client's redis connections.
176135
func (r *StandaloneRedisLock) Close() error {
177136
if r.client != nil {
178-
closeErr := r.client.Close()
137+
err := r.client.Close()
179138
r.client = nil
180-
return closeErr
139+
return err
181140
}
182141
return nil
183142
}

lock/redis/standalone_test.go

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ package redis
1515

1616
import (
1717
"context"
18-
"sync"
1918
"testing"
2019

2120
miniredis "github.com/alicebob/miniredis/v2"
2221
"github.com/google/uuid"
2322
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
2424

2525
"github.com/dapr/components-contrib/lock"
2626
"github.com/dapr/components-contrib/metadata"
@@ -84,9 +84,10 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) {
8484
// 0. prepare
8585
// start redis
8686
s, err := miniredis.Run()
87-
assert.NoError(t, err)
87+
require.NoError(t, err)
8888
defer s.Close()
89-
// construct component
89+
90+
// Construct component
9091
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
9192
defer comp.Close()
9293

@@ -95,59 +96,54 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) {
9596
}}
9697
cfg.Properties["redisHost"] = s.Addr()
9798
cfg.Properties["redisPassword"] = ""
98-
// init
99+
100+
// Init
99101
err = comp.InitLockStore(context.Background(), cfg)
100-
assert.NoError(t, err)
102+
require.NoError(t, err)
103+
101104
// 1. client1 trylock
102105
ownerID1 := uuid.New().String()
103106
resp, err := comp.TryLock(context.Background(), &lock.TryLockRequest{
104107
ResourceID: resourceID,
105108
LockOwner: ownerID1,
106109
ExpiryInSeconds: 10,
107110
})
108-
assert.NoError(t, err)
111+
require.NoError(t, err)
109112
assert.True(t, resp.Success)
110-
var wg sync.WaitGroup
111-
wg.Add(1)
113+
112114
// 2. Client2 tryLock fail
113-
go func() {
114-
owner2 := uuid.New().String()
115-
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
116-
ResourceID: resourceID,
117-
LockOwner: owner2,
118-
ExpiryInSeconds: 10,
119-
})
120-
assert.NoError(t, err2)
121-
assert.False(t, resp2.Success)
122-
wg.Done()
123-
}()
124-
wg.Wait()
125-
// 3. client 1 unlock
115+
owner2 := uuid.New().String()
116+
resp, err = comp.TryLock(context.Background(), &lock.TryLockRequest{
117+
ResourceID: resourceID,
118+
LockOwner: owner2,
119+
ExpiryInSeconds: 10,
120+
})
121+
require.NoError(t, err)
122+
assert.False(t, resp.Success)
123+
124+
// 3. Client 1 unlock
126125
unlockResp, err := comp.Unlock(context.Background(), &lock.UnlockRequest{
127126
ResourceID: resourceID,
128127
LockOwner: ownerID1,
129128
})
130-
assert.NoError(t, err)
129+
require.NoError(t, err)
131130
assert.True(t, unlockResp.Status == 0, "client1 failed to unlock!")
132-
// 4. client 2 get lock
133-
wg.Add(1)
134-
go func() {
135-
owner2 := uuid.New().String()
136-
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
137-
ResourceID: resourceID,
138-
LockOwner: owner2,
139-
ExpiryInSeconds: 10,
140-
})
141-
assert.NoError(t, err2)
142-
assert.True(t, resp2.Success, "client2 failed to get lock?!")
143-
// 5. client2 unlock
144-
unlockResp, err := comp.Unlock(context.Background(), &lock.UnlockRequest{
145-
ResourceID: resourceID,
146-
LockOwner: owner2,
147-
})
148-
assert.NoError(t, err)
149-
assert.True(t, unlockResp.Status == 0, "client2 failed to unlock!")
150-
wg.Done()
151-
}()
152-
wg.Wait()
131+
132+
// 4. Client 2 get lock
133+
owner2 = uuid.New().String()
134+
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
135+
ResourceID: resourceID,
136+
LockOwner: owner2,
137+
ExpiryInSeconds: 10,
138+
})
139+
require.NoError(t, err2)
140+
assert.True(t, resp2.Success, "client2 failed to get lock?!")
141+
142+
// 5. client2 unlock
143+
unlockResp, err = comp.Unlock(context.Background(), &lock.UnlockRequest{
144+
ResourceID: resourceID,
145+
LockOwner: owner2,
146+
})
147+
require.NoError(t, err)
148+
assert.True(t, unlockResp.Status == 0, "client2 failed to unlock!")
153149
}

0 commit comments

Comments
 (0)