Skip to content

Commit 15c488e

Browse files
committed
refactor: update lock release worker to handle all nodes of a workflow at once
1 parent 52b0461 commit 15c488e

File tree

11 files changed

+57
-37
lines changed

11 files changed

+57
-37
lines changed

backend/cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func addFlags() error {
206206
if err := bindIntFlag(rootCmd, "locks.lock_timeout_in_hours", 24, "Redis lock timeout (hours)"); err != nil {
207207
return fmt.Errorf("failed to bind locks.lock_timeout_in_hours flag: %w", err)
208208
}
209-
if err := bindIntFlag(rootCmd, "locks.locks_release_interval_in_minutes", 5, "Locks release interval (minutes)"); err != nil {
209+
if err := bindIntFlag(rootCmd, "locks.locks_release_interval_in_minutes", 2, "Locks release interval (minutes)"); err != nil {
210210
return fmt.Errorf("failed to bind locks.locks_release_interval_in_minutes flag: %w", err)
211211
}
212212

backend/internal/api/app/app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func (app *App) StartBackgroundWorkers() {
223223
go app.workers.TrackReservedNodeHealth()
224224
go app.workers.CollectGORMMetrics()
225225
go app.workers.CollectGoRuntimeMetrics()
226+
go app.workers.ReleaseWorkflowLocks()
226227
}
227228

228229
// Run starts the server

backend/internal/api/handlers/deployment_handler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ func (h *DeploymentHandler) HandleDeployCluster(c *gin.Context) {
258258
wfUUID, wfStatus, err := h.svc.AsyncDeployCluster(config, cluster)
259259
if err != nil {
260260
reqLog.Error().Err(err).Msg("failed to start deployment workflow")
261+
err = h.locker.ReleaseLock(c.Request.Context(), nodeIDs, wfUUID)
262+
if err != nil {
263+
reqLog.Error().Err(err).Msg("failed to release nodes locks")
264+
}
261265
InternalServerError(c)
262266
return
263267
}
@@ -439,6 +443,10 @@ func (h *DeploymentHandler) HandleAddNode(c *gin.Context) {
439443
wfUUID, wfStatus, err := h.svc.AsyncAddNode(config, cl, cluster.Nodes[0])
440444
if err != nil {
441445
reqLog.Error().Err(err).Msg("failed to start add node workflow")
446+
err = h.locker.ReleaseLock(c.Request.Context(), []uint32{cluster.Nodes[0].NodeID}, wfUUID)
447+
if err != nil {
448+
reqLog.Error().Err(err).Msg("failed to release nodes locks")
449+
}
442450
InternalServerError(c)
443451
return
444452
}

backend/internal/api/handlers/node_handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,10 @@ func (h *NodeHandler) ReserveNodeHandler(c *gin.Context) {
310310
wfUUID, err := h.svc.AsyncReserveNode(userID, user.Mnemonic, nodeID)
311311
if err != nil {
312312
reqLog.Error().Err(err).Msg("failed to start workflow to reserve node")
313+
err = h.locker.ReleaseLock(c.Request.Context(), []uint32{nodeID}, wfUUID)
314+
if err != nil {
315+
reqLog.Error().Err(err).Msg("failed to release nodes locks")
316+
}
313317
InternalServerError(c)
314318
return
315319
}

backend/internal/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ func applyDefaultValues(config *Configuration) {
394394
config.Locks.LockTimeoutInHours = 24
395395
}
396396
if config.Locks.LocksReleaseIntervalInMinutes == 0 {
397-
config.Locks.LocksReleaseIntervalInMinutes = 5
397+
config.Locks.LocksReleaseIntervalInMinutes = 2
398398
}
399399

400400
if config.Telemetry.OTLPEndpoint == "" {

backend/internal/core/distributed_locks/distributed_locks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ var ErrNodeLocked = errors.New("node is currently locked by another request")
1111
type DistributedLocks interface {
1212
AcquireNodesLocks(ctx context.Context, nodeIDs []uint32) error
1313
AcquireWorkflowLock(ctx context.Context, nodeIDs []uint32, workflowID string) error
14-
ReleaseLock(ctx context.Context, nodeID uint32, workflowID string) error
14+
ReleaseLock(ctx context.Context, nodeIDs []uint32, workflowID string) error
1515
GetAllWorkflowsLocks(ctx context.Context) ([]string, error)
1616
GetLockedNodes(ctx context.Context) ([]uint32, error)
1717
}

backend/internal/core/distributed_locks/redis_locker.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,13 @@ func (l *RedisLocker) rollbackLocks(ctx context.Context, keys []string) error {
101101
return nil
102102
}
103103

104-
func (l *RedisLocker) ReleaseLock(ctx context.Context, nodeID uint32, workflowID string) error {
105-
lockedKey := nodeLockKey(nodeID)
106-
usedKey := workflowLockKey(nodeID, workflowID)
107-
return l.client.Del(ctx, lockedKey, usedKey).Err()
104+
func (l *RedisLocker) ReleaseLock(ctx context.Context, nodeIDs []uint32, workflowID string) error {
105+
lockedKeys := lockKeys(nodeIDs, nodeLockKey)
106+
usedKeys := lockKeys(nodeIDs, func(id uint32) string {
107+
return workflowLockKey(id, workflowID)
108+
})
109+
allWorkflowsLocks := append(lockedKeys, usedKeys...)
110+
return l.client.Del(ctx, allWorkflowsLocks...).Err()
108111
}
109112

110113
// GetAllWorkflowsLocks gets all workflow locks.

backend/internal/core/distributed_locks/redis_locker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestRedisLocker_ReleaseLock(t *testing.T) {
6969
require.NoError(t, client.Set(context.Background(), "locked:1", 1, 0).Err())
7070
require.NoError(t, client.Set(context.Background(), "used:1:wf-1", 1, 0).Err())
7171

72-
err := locker.ReleaseLock(context.Background(), 1, "wf-1")
72+
err := locker.ReleaseLock(context.Background(), []uint32{1}, "wf-1")
7373

7474
require.NoError(t, err)
7575
require.Equal(t, int64(0), client.Exists(context.Background(), "locked:1").Val())

backend/internal/core/services/node_service_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func (m *MockDistributedLocks) AcquireWorkflowLock(ctx context.Context, nodeIDs
3636
return args.Error(0)
3737
}
3838

39-
func (m *MockDistributedLocks) ReleaseLock(ctx context.Context, nodeID uint32, workflowID string) error {
40-
args := m.Called(ctx, nodeID, workflowID)
39+
func (m *MockDistributedLocks) ReleaseLock(ctx context.Context, nodeIDs []uint32, workflowID string) error {
40+
args := m.Called(ctx, nodeIDs, workflowID)
4141
return args.Error(0)
4242
}
4343

backend/internal/core/services/workers_service.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -474,32 +474,41 @@ func (svc WorkerService) GetAllWorkflowsLocks() ([]string, error) {
474474
return svc.locker.GetAllWorkflowsLocks(svc.ctx)
475475
}
476476

477-
func (svc WorkerService) ReleaseLocks(key string) error {
478-
keyParts := strings.Split(key, ":")
479-
if len(keyParts) != 3 {
480-
return fmt.Errorf("invalid lock key format: %s", key)
481-
}
482-
483-
nodeID, err := strconv.ParseUint(keyParts[1], 10, 32)
484-
if err != nil {
485-
return fmt.Errorf("invalid node ID: %w", err)
486-
}
487-
488-
workflowID := keyParts[2]
489-
workflow, err := svc.ewfEngine.Store().LoadWorkflowByUUID(svc.ctx, workflowID)
490-
if err != nil {
491-
return fmt.Errorf("failed to load workflow by UUID: %w", err)
492-
}
477+
func (svc WorkerService) ReleaseLocks(keys []string) {
478+
log := logger.ForOperation("locks_worker", "release_locks")
479+
workflowsNodes := map[string][]uint32{}
480+
for _, key := range keys {
481+
parts := strings.Split(key, ":")
482+
if len(parts) != 3 {
483+
log.Error().Str("key", key).Msg("invalid lock key format")
484+
continue
485+
}
493486

494-
if !slices.Contains([]ewf.WorkflowStatus{ewf.StatusCompleted, ewf.StatusFailed}, workflow.Status) {
495-
return nil
487+
workflowID := parts[2]
488+
nodeID, err := strconv.ParseUint(parts[1], 10, 32)
489+
if err != nil {
490+
log.Error().Str("key", key).Msg("invalid node ID")
491+
continue
492+
}
493+
workflowsNodes[workflowID] = append(workflowsNodes[workflowID], uint32(nodeID))
496494
}
497495

498-
if err := svc.locker.ReleaseLock(svc.ctx, uint32(nodeID), workflowID); err != nil {
499-
return fmt.Errorf("failed to release nodes locks: %w", err)
496+
for workflowID := range workflowsNodes {
497+
workflow, err := svc.ewfEngine.Store().LoadWorkflowByUUID(svc.ctx, workflowID)
498+
if err != nil {
499+
log.Error().Str("workflow_id", workflowID).Msg("failed to load workflow")
500+
continue
501+
}
502+
if !slices.Contains([]ewf.WorkflowStatus{ewf.StatusCompleted, ewf.StatusFailed}, workflow.Status) {
503+
continue
504+
}
505+
nodeIDs := workflowsNodes[workflowID]
506+
if err := svc.locker.ReleaseLock(svc.ctx, nodeIDs, workflowID); err != nil {
507+
log.Error().Str("workflow_id", workflow.UUID).Msg("failed to release locks")
508+
continue
509+
}
500510
}
501511

502-
return nil
503512
}
504513

505514
func getHoursOfGivenPeriod(startDate, endDate time.Time) int {

0 commit comments

Comments
 (0)