diff --git a/lock/lock.go b/lock/lock.go new file mode 100644 index 0000000000..c1ffc52cf7 --- /dev/null +++ b/lock/lock.go @@ -0,0 +1,100 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lock + +import ( + "context" + + "github.com/dapr/components-contrib/common/features" + "github.com/dapr/components-contrib/metadata" +) + +// Store is the interface for lock stores. +type Store interface { + metadata.ComponentWithMetadata + + // Init the lock store. + Init(ctx context.Context, metadata Metadata) error + + // Features returns the list of supported features. + Features() []Feature + + // Lock acquires a lock. + // If the lock is owned by someone else, this method blocks until the lock can be acquired or the context is canceled. + Lock(ctx context.Context, req *LockRequest) (*LockResponse, error) + + // TryLock tries to acquire a lock. + // If the lock cannot be acquired, it returns immediately. + TryLock(ctx context.Context, req *LockRequest) (*LockResponse, error) + + // RenewLock attempts to renew a lock if the lock is still valid. + RenewLock(ctx context.Context, req *RenewLockRequest) (*RenewLockResponse, error) + + // Unlock tries to release a lock if the lock is still valid. + Unlock(ctx context.Context, req *UnlockRequest) (*UnlockResponse, error) +} + +// Metadata contains a lock store specific set of metadata property. +type Metadata struct { + metadata.Base `json:",inline"` +} + +// Feature names a feature that can be implemented by the lock stores. +type Feature = features.Feature[Store] + +// LockRequest is the request to acquire locks, used by Lock and TryLock. +type LockRequest struct { + ResourceID string `json:"resourceId"` + LockOwner string `json:"lockOwner"` + ExpiryInSeconds int32 `json:"expiryInSeconds"` +} + +// LockResponse is the response used by Lock and TryLock when the operation is completed. +type LockResponse struct { + Success bool `json:"success"` +} + +// RenewLockRequest is a lock renewal request. +type RenewLockRequest struct { + ResourceID string `json:"resourceId"` + LockOwner string `json:"lockOwner"` + ExpiryInSeconds int32 `json:"expiryInSeconds"` +} + +// RenewLockResponse is a lock renewal request. +type RenewLockResponse struct { + Status LockStatus `json:"status"` +} + +// UnlockRequest is a lock release request. +type UnlockRequest struct { + ResourceID string `json:"resourceId"` + LockOwner string `json:"lockOwner"` +} + +// Status when releasing the lock. +type UnlockResponse struct { + Status LockStatus `json:"status"` +} + +// LockStatus is the status included in lock responses. +type LockStatus int32 + +// lock status. +const ( + LockStatusInternalError LockStatus = -1 + LockStatusSuccess LockStatus = 0 + LockStatusNotExist LockStatus = 1 + LockStatusOwnerMismatch LockStatus = 2 +) diff --git a/lock/metadata.go b/lock/metadata.go deleted file mode 100644 index 892c2382cb..0000000000 --- a/lock/metadata.go +++ /dev/null @@ -1,21 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lock - -import "github.com/dapr/components-contrib/metadata" - -// Metadata contains a lock store specific set of metadata property. -type Metadata struct { - metadata.Base `json:",inline"` -} diff --git a/lock/redis/standalone.go b/lock/redis/standalone.go index 24945e604d..ff25d8bfd0 100644 --- a/lock/redis/standalone.go +++ b/lock/redis/standalone.go @@ -18,15 +18,23 @@ import ( "errors" "fmt" "reflect" + "sync/atomic" "time" + "github.com/cenkalti/backoff/v4" + rediscomponent "github.com/dapr/components-contrib/common/component/redis" "github.com/dapr/components-contrib/lock" contribMetadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" ) -const unlockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("del",KEYS[1]) end` +const ( + unlockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("del",KEYS[1]) end` + renewLockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("expire",KEYS[1],ARGV[2]) end` +) + +var ErrComponentClosed = errors.New("component is closed") // Standalone Redis lock store. // Any fail-over related features are not supported, such as Sentinel and Redis Cluster. @@ -34,21 +42,24 @@ type StandaloneRedisLock struct { client rediscomponent.RedisClient clientSettings *rediscomponent.Settings - logger logger.Logger + closed atomic.Bool + runnincCh chan struct{} + logger logger.Logger } // NewStandaloneRedisLock returns a new standalone redis lock. -// Do not use this lock with a redis cluster, which might lead to unexpected lock loss. +// Do not use this lock with a Redis cluster, which might lead to unexpected lock loss. func NewStandaloneRedisLock(logger logger.Logger) lock.Store { s := &StandaloneRedisLock{ - logger: logger, + logger: logger, + runnincCh: make(chan struct{}), } return s } -// Init StandaloneRedisLock. -func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) (err error) { +// Init the lock store. +func (r *StandaloneRedisLock) Init(ctx context.Context, metadata lock.Metadata) (err error) { // Create the client r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, contribMetadata.LockStoreType) if err != nil { @@ -79,31 +90,130 @@ func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.M return nil } +// Features returns the list of supported features. +func (r *StandaloneRedisLock) Features() []lock.Feature { + return nil +} + +// Lock tries to acquire a lock. +// If the lock is owned by someone else, this method blocks until the lock can be acquired or the context is canceled. +func (r *StandaloneRedisLock) Lock(ctx context.Context, req *lock.LockRequest) (res *lock.LockResponse, err error) { + if r.closed.Load() { + return nil, ErrComponentClosed + } + + // We try to acquire a lock through periodic polling + // A potentially more efficient way would be to use keyspace notifications to subscribe to changes in the key we subscribe to + // However, keyspace notifications: + // 1. Are not enabled by default in Redis, and require an explicit configuration change, which adds quite a bit of complexity for the user: https://redis.io/docs/manual/keyspace-notifications/ + // 2. When a connection to Redis calls SUBSCRIBE to watch for notifications, it cannot be used for anything else (unless we switch the protocol to RESP3, which must be explicitly chosen and only works with Redis 6+: https://redis.io/commands/hello/) + // So, periodic polling it is + + // We use an exponential backoff here because it supports a randomization factor + bo := backoff.NewExponentialBackOff() + bo.MaxElapsedTime = 0 + bo.InitialInterval = 50 * time.Millisecond + bo.MaxInterval = 500 * time.Millisecond + bo.RandomizationFactor = 0.5 + + // Repat until we get the lock, or context is canceled + for { + // Try to acquire the lock + res, err = r.TryLock(ctx, req) + if err != nil { + // If we got an error, return right away + return nil, err + } + + // Let's see if we got the lock + if res.Success { + return res, nil + } + + // Sleep till the next tick and try again + // Stop when context is done or component is closed + t := time.NewTimer(bo.NextBackOff()) + select { + case <-t.C: + // Nop, retry + case <-ctx.Done(): + return nil, ctx.Err() + case <-r.runnincCh: + return nil, ErrComponentClosed + } + } +} + // TryLock tries to acquire a lock. // If the lock cannot be acquired, it returns immediately. -func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) { - // Set a key if doesn't exist with an expiration time +func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.LockRequest) (*lock.LockResponse, error) { + if r.closed.Load() { + return nil, ErrComponentClosed + } + + // Set a key if doesn't exist, with an expiration time nxval, err := r.client.SetNX(ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds)) if nxval == nil { - return &lock.TryLockResponse{}, fmt.Errorf("setNX returned a nil response") + return &lock.LockResponse{}, fmt.Errorf("setNX returned a nil response") } - if err != nil { - return &lock.TryLockResponse{}, err + return &lock.LockResponse{}, err } - return &lock.TryLockResponse{ + return &lock.LockResponse{ Success: *nxval, }, nil } +// RenewLock attempts to renew a lock if the lock is still valid. +func (r *StandaloneRedisLock) RenewLock(ctx context.Context, req *lock.RenewLockRequest) (*lock.RenewLockResponse, error) { + if r.closed.Load() { + return nil, ErrComponentClosed + } + + // Delegate to client.eval lua script + evalInt, parseErr, err := r.client.EvalInt(ctx, renewLockScript, []string{req.ResourceID}, req.LockOwner, req.ExpiryInSeconds) + if evalInt == nil { + res := &lock.RenewLockResponse{ + Status: lock.LockStatusInternalError, + } + return res, errors.New("eval renew lock script returned a nil response") + } + + // Parse result + if parseErr != nil { + return &lock.RenewLockResponse{ + Status: lock.LockStatusInternalError, + }, err + } + var status lock.LockStatus + switch { + case *evalInt >= 0: + status = lock.LockStatusSuccess + case *evalInt == -1: + status = lock.LockStatusNotExist + case *evalInt == -2: + status = lock.LockStatusOwnerMismatch + default: + status = lock.LockStatusInternalError + } + + return &lock.RenewLockResponse{ + Status: status, + }, nil +} + // Unlock tries to release a lock if the lock is still valid. func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) { + if r.closed.Load() { + return nil, ErrComponentClosed + } + // Delegate to client.eval lua script evalInt, parseErr, err := r.client.EvalInt(ctx, unlockScript, []string{req.ResourceID}, req.LockOwner) if evalInt == nil { res := &lock.UnlockResponse{ - Status: lock.InternalError, + Status: lock.LockStatusInternalError, } return res, errors.New("eval unlock script returned a nil response") } @@ -111,19 +221,19 @@ func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockReques // Parse result if parseErr != nil { return &lock.UnlockResponse{ - Status: lock.InternalError, + Status: lock.LockStatusInternalError, }, err } - var status lock.Status + var status lock.LockStatus switch { case *evalInt >= 0: - status = lock.Success + status = lock.LockStatusSuccess case *evalInt == -1: - status = lock.LockDoesNotExist + status = lock.LockStatusNotExist case *evalInt == -2: - status = lock.LockBelongsToOthers + status = lock.LockStatusOwnerMismatch default: - status = lock.InternalError + status = lock.LockStatusInternalError } return &lock.UnlockResponse{ @@ -133,12 +243,17 @@ func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockReques // Close shuts down the client's redis connections. func (r *StandaloneRedisLock) Close() error { - if r.client != nil { - err := r.client.Close() - r.client = nil - return err + if !r.closed.CompareAndSwap(false, true) { + return nil } - return nil + + close(r.runnincCh) + + if r.client == nil { + return nil + } + + return r.client.Close() } // GetComponentMetadata returns the metadata of the component. diff --git a/lock/redis/standalone_test.go b/lock/redis/standalone_test.go index b4e99f49c3..46c5ee74a8 100644 --- a/lock/redis/standalone_test.go +++ b/lock/redis/standalone_test.go @@ -42,7 +42,7 @@ func TestStandaloneRedisLock_InitError(t *testing.T) { cfg.Properties["redisPassword"] = "" // init - err := comp.InitLockStore(context.Background(), cfg) + err := comp.Init(context.Background(), cfg) require.Error(t, err) }) @@ -58,7 +58,7 @@ func TestStandaloneRedisLock_InitError(t *testing.T) { cfg.Properties["redisPassword"] = "" // init - err := comp.InitLockStore(context.Background(), cfg) + err := comp.Init(context.Background(), cfg) require.Error(t, err) }) @@ -75,7 +75,7 @@ func TestStandaloneRedisLock_InitError(t *testing.T) { cfg.Properties["maxRetries"] = "1 " // init - err := comp.InitLockStore(context.Background(), cfg) + err := comp.Init(context.Background(), cfg) require.Error(t, err) }) } @@ -98,12 +98,12 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) { cfg.Properties["redisPassword"] = "" // Init - err = comp.InitLockStore(context.Background(), cfg) + err = comp.Init(context.Background(), cfg) require.NoError(t, err) // 1. client1 trylock ownerID1 := uuid.New().String() - resp, err := comp.TryLock(context.Background(), &lock.TryLockRequest{ + resp, err := comp.TryLock(context.Background(), &lock.LockRequest{ ResourceID: resourceID, LockOwner: ownerID1, ExpiryInSeconds: 10, @@ -113,7 +113,7 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) { // 2. Client2 tryLock fail owner2 := uuid.New().String() - resp, err = comp.TryLock(context.Background(), &lock.TryLockRequest{ + resp, err = comp.TryLock(context.Background(), &lock.LockRequest{ ResourceID: resourceID, LockOwner: owner2, ExpiryInSeconds: 10, @@ -131,7 +131,7 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) { // 4. Client 2 get lock owner2 = uuid.New().String() - resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{ + resp2, err2 := comp.TryLock(context.Background(), &lock.LockRequest{ ResourceID: resourceID, LockOwner: owner2, ExpiryInSeconds: 10, diff --git a/lock/requests.go b/lock/requests.go deleted file mode 100644 index 9c0b699993..0000000000 --- a/lock/requests.go +++ /dev/null @@ -1,27 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lock - -// TryLockRequest is a lock acquire request. -type TryLockRequest struct { - ResourceID string `json:"resourceId"` - LockOwner string `json:"lockOwner"` - ExpiryInSeconds int32 `json:"expiryInSeconds"` -} - -// UnlockRequest is a lock release request. -type UnlockRequest struct { - ResourceID string `json:"resourceId"` - LockOwner string `json:"lockOwner"` -} diff --git a/lock/responses.go b/lock/responses.go deleted file mode 100644 index c6308b7372..0000000000 --- a/lock/responses.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lock - -// Lock acquire request was successful or not. -type TryLockResponse struct { - Success bool `json:"success"` -} - -// Status when releasing the lock. -type UnlockResponse struct { - Status Status `json:"status"` -} - -type Status int32 - -// lock status. -const ( - Success Status = 0 - LockDoesNotExist Status = 1 - LockBelongsToOthers Status = 2 - InternalError Status = 3 -) diff --git a/lock/store.go b/lock/store.go deleted file mode 100644 index 0fb1758e2c..0000000000 --- a/lock/store.go +++ /dev/null @@ -1,33 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lock - -import ( - "context" - - "github.com/dapr/components-contrib/metadata" -) - -type Store interface { - metadata.ComponentWithMetadata - - // Init this component. - InitLockStore(ctx context.Context, metadata Metadata) error - - // TryLock tries to acquire a lock. - TryLock(ctx context.Context, req *TryLockRequest) (*TryLockResponse, error) - - // Unlock tries to release a lock. - Unlock(ctx context.Context, req *UnlockRequest) (*UnlockResponse, error) -} diff --git a/tests/conformance/lock/lock.go b/tests/conformance/lock/lock.go index acb04e1a16..8719a5ca41 100644 --- a/tests/conformance/lock/lock.go +++ b/tests/conformance/lock/lock.go @@ -59,7 +59,7 @@ func ConformanceTests(t *testing.T, props map[string]string, lockstore lock.Stor t.Run("init", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - err := lockstore.InitLockStore(ctx, lock.Metadata{Base: metadata.Base{ + err := lockstore.Init(ctx, lock.Metadata{Base: metadata.Base{ Properties: props, }}) require.NoError(t, err) @@ -71,18 +71,20 @@ func ConformanceTests(t *testing.T, props map[string]string, lockstore lock.Stor } const lockOwner = "conftest" - lockKey1 := key + "-1" - lockKey2 := key + "-2" - var expirationCh *time.Timer + var expirationChs [2]*time.Timer + lockKeys := [2]string{ + key + "-1", + key + "-2", + } t.Run("TryLock", func(t *testing.T) { // Acquire a lock t.Run("acquire lock1", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - res, err := lockstore.TryLock(ctx, &lock.TryLockRequest{ - ResourceID: lockKey1, + res, err := lockstore.TryLock(ctx, &lock.LockRequest{ + ResourceID: lockKeys[0], LockOwner: lockOwner, ExpiryInSeconds: 15, }) @@ -95,25 +97,25 @@ func ConformanceTests(t *testing.T, props map[string]string, lockstore lock.Stor t.Run("acquire lock2", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - res, err := lockstore.TryLock(ctx, &lock.TryLockRequest{ - ResourceID: lockKey2, + res, err := lockstore.TryLock(ctx, &lock.LockRequest{ + ResourceID: lockKeys[1], LockOwner: lockOwner, - ExpiryInSeconds: 3, + ExpiryInSeconds: 8, }) require.NoError(t, err) require.NotNil(t, res) assert.True(t, res.Success) - // Set expirationCh to when lock2 expires - expirationCh = time.NewTimer(3 * time.Second) + // Set expirationChs[0] to when lock2 expires + expirationChs[0] = time.NewTimer(3 * time.Second) }) // Acquiring the same lock again should fail t.Run("fails to acquire existing lock", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - res, err := lockstore.TryLock(ctx, &lock.TryLockRequest{ - ResourceID: lockKey1, + res, err := lockstore.TryLock(ctx, &lock.LockRequest{ + ResourceID: lockKeys[0], LockOwner: lockOwner, ExpiryInSeconds: 15, }) @@ -133,48 +135,152 @@ func ConformanceTests(t *testing.T, props map[string]string, lockstore lock.Stor }) require.NoError(t, err) require.NotNil(t, res) - assert.Equal(t, lock.LockDoesNotExist, res.Status) + assert.Equal(t, lock.LockStatusNotExist, res.Status) }) t.Run("fails to unlock with wrong owner", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() res, err := lockstore.Unlock(ctx, &lock.UnlockRequest{ - ResourceID: lockKey1, + ResourceID: lockKeys[0], LockOwner: "nonowner", }) require.NoError(t, err) require.NotNil(t, res) - assert.Equal(t, lock.LockBelongsToOthers, res.Status) + assert.Equal(t, lock.LockStatusOwnerMismatch, res.Status) }) t.Run("unlocks successfully", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() res, err := lockstore.Unlock(ctx, &lock.UnlockRequest{ - ResourceID: lockKey1, + ResourceID: lockKeys[0], LockOwner: lockOwner, }) require.NoError(t, err) require.NotNil(t, res) - assert.Equal(t, lock.Success, res.Status) + assert.Equal(t, lock.LockStatusSuccess, res.Status) + }) + }) + + t.Run("Renew lock", func(t *testing.T) { + // Sleep for 1s to allow some time to advance + time.Sleep(1 * time.Second) + + t.Run("renews locks successfully", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + res, err := lockstore.RenewLock(ctx, &lock.RenewLockRequest{ + ResourceID: lockKeys[1], + LockOwner: lockOwner, + ExpiryInSeconds: 5, + }) + require.NoError(t, err) + require.NotNil(t, res) + assert.Equal(t, lock.LockStatusSuccess, res.Status) + + // Set expirationChs[1] to when lock2 expires (after being updated) + expirationChs[1] = time.NewTimer(3 * time.Second) + }) + + t.Run("fails to renew locks with nonexistent resource ID", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + res, err := lockstore.RenewLock(ctx, &lock.RenewLockRequest{ + ResourceID: "nonexistent", + LockOwner: lockOwner, + ExpiryInSeconds: 15, + }) + require.NoError(t, err) + require.NotNil(t, res) + assert.Equal(t, lock.LockStatusNotExist, res.Status) + }) + + t.Run("fails to renew locks with wrong owner", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + res, err := lockstore.RenewLock(ctx, &lock.RenewLockRequest{ + ResourceID: lockKeys[1], + LockOwner: "nonowner", + ExpiryInSeconds: 15, + }) + require.NoError(t, err) + require.NotNil(t, res) + assert.Equal(t, lock.LockStatusOwnerMismatch, res.Status) }) }) - t.Run("lock expires", func(t *testing.T) { - // Wait until the lock is supposed to expire - <-expirationCh.C + t.Run("Lock expires", func(t *testing.T) { + // Wait until the lock was originally supposed to expire at + <-expirationChs[0].C + + assertLockReleasedFn := func(resourceID string) func() bool { + return func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + res, err := lockstore.TryLock(ctx, &lock.LockRequest{ + ResourceID: resourceID, + LockOwner: lockOwner, + ExpiryInSeconds: 3, + }) + return err == nil && res != nil && res.Success + } + } + + // Assert that the lock is still active - we can't re-acquire it + assert.Never(t, assertLockReleasedFn(lockKeys[1]), time.Second, 100*time.Millisecond, "Lock 2 was released before its expected expiration") + + // Wait for when the lock is now supposed to expire at + <-expirationChs[1].C // Assert that the lock doesn't exist anymore - we should be able to re-acquire it - assert.Eventually(t, func() bool { + assert.Eventually(t, assertLockReleasedFn(lockKeys[1]), 5*time.Second, 100*time.Millisecond, "Lock 2 was not released in time after its expected expiration") + }) + + t.Run("Lock", func(t *testing.T) { + var exp time.Time + t.Run("acquire available lock right away", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - res, err := lockstore.TryLock(ctx, &lock.TryLockRequest{ - ResourceID: lockKey2, + res, err := lockstore.Lock(ctx, &lock.LockRequest{ + ResourceID: lockKeys[0], LockOwner: lockOwner, - ExpiryInSeconds: 3, + ExpiryInSeconds: 8, }) - return err == nil && res != nil && res.Success - }, 5*time.Second, 100*time.Millisecond, "Lock 2 was not released in time after its scheduled expiration") + require.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.Success) + + // Subtract 500ms to give some buffer + exp = time.Now().Add(8*time.Second - 500*time.Millisecond) + }) + + t.Run("times out trying to acquire lock that already exists", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + res, err := lockstore.Lock(ctx, &lock.LockRequest{ + ResourceID: lockKeys[0], + LockOwner: lockOwner, + ExpiryInSeconds: 10, + }) + require.Error(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Nil(t, res) + }) + + t.Run("acquires the lock after it expires", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + res, err := lockstore.Lock(ctx, &lock.LockRequest{ + ResourceID: lockKeys[0], + LockOwner: lockOwner, + ExpiryInSeconds: 10, + }) + require.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.Success) + + assert.Greater(t, time.Now().UnixMilli(), exp.UnixMilli()) + }) }) }