Skip to content

Commit 2ec13a8

Browse files
committed
refactor: remove redundant lock parameters and update lock acquisition methods
1 parent 1c98b46 commit 2ec13a8

File tree

14 files changed

+177
-261
lines changed

14 files changed

+177
-261
lines changed

backend/cmd/root.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,14 +210,10 @@ func addFlags() error {
210210
if err := bindStringFlag(rootCmd, "telemetry.otlp_endpoint", "jaeger:4317", "OpenTelemetry gRPC endpoint"); err != nil {
211211
return fmt.Errorf("failed to bind telemetry.otlp_endpoint flag: %w", err)
212212
}
213-
// === Locks ===
214-
if err := bindIntFlag(rootCmd, "locks.lock_timeout_in_hours", 24, "Redis lock timeout (hours)"); err != nil {
215-
return fmt.Errorf("failed to bind locks.lock_timeout_in_hours flag: %w", err)
213+
// === Lock Timeout In Hours ===
214+
if err := bindIntFlag(rootCmd, "lock_timeout_in_hours", 1, "Redis lock timeout (hours)"); err != nil {
215+
return fmt.Errorf("failed to bind lock_timeout_in_hours flag: %w", err)
216216
}
217-
if err := bindIntFlag(rootCmd, "locks.locks_release_interval_in_minutes", 2, "Locks release interval (minutes)"); err != nil {
218-
return fmt.Errorf("failed to bind locks.locks_release_interval_in_minutes flag: %w", err)
219-
}
220-
221217
return nil
222218
}
223219

backend/internal/api/app/app.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,6 @@ func (app *App) StartBackgroundWorkers() {
229229
go app.workers.TrackReservedNodeHealth()
230230
go app.workers.CollectGORMMetrics()
231231
go app.workers.CollectGoRuntimeMetrics()
232-
go app.workers.ReleaseWorkflowLocks()
233232
}
234233

235234
// Run starts the server

backend/internal/api/app/app_dependencies.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func createAppCore(ctx context.Context, config cfg.Configuration) (appCore, erro
170170
return appCore{}, fmt.Errorf("failed to init workflow engine: %w", err)
171171
}
172172

173-
locker := distributedlocks.NewRedisLocker(client, time.Duration(config.Locks.LockTimeoutInHours)*time.Hour)
173+
locker := distributedlocks.NewRedisLocker(client, time.Duration(config.LockTimeoutInHours)*time.Hour)
174174

175175
return appCore{
176176
appCtx: ctx,
@@ -382,8 +382,8 @@ func (app *App) createHandlers() appHandlers {
382382
)
383383
statsHandler := handlers.NewStatsHandler(statsService)
384384
notificationHandler := handlers.NewNotificationHandler(notificationAPIService)
385-
nodeHandler := handlers.NewNodeHandler(nodeService, app.core.locker)
386-
deploymentHandler := handlers.NewDeploymentHandler(deploymentService, app.core.locker)
385+
nodeHandler := handlers.NewNodeHandler(nodeService)
386+
deploymentHandler := handlers.NewDeploymentHandler(deploymentService)
387387
invoiceHandler := handlers.NewInvoiceHandler(invoiceService)
388388
adminHandler := handlers.NewAdminHandler(adminService, app.communication.notificationDispatcher, app.communication.mailService)
389389
healthHandler := handlers.NewHealthHandler(app.config.SystemAccount.Network, app.infra.firesquidClient, app.infra.graphql, app.core.db)
@@ -415,9 +415,8 @@ func (app *App) createWorkers() workers.Workers {
415415
app.communication.notificationDispatcher, app.infra.graphql, app.infra.firesquidClient,
416416
app.infra.substrateClient, app.config.Invoice, app.config.SystemAccount.Mnemonic,
417417
app.config.Currency, app.config.ClusterHealthCheckIntervalInHours,
418-
app.config.NodeHealthCheck.ReservedNodeHealthCheckIntervalInHours, app.config.NodeHealthCheck.ReservedNodeHealthCheckTimeoutInMinutes, app.config.NodeHealthCheck.ReservedNodeHealthCheckWorkersNum, app.config.MonitorBalanceIntervalInMinutes, app.config.NotifyAdminsForPendingRecordsInHours, app.config.UsersBalanceCheckIntervalInHours, app.config.CheckUserDebtIntervalInHours,
419-
app.config.Locks.LocksReleaseIntervalInMinutes,
420-
app.core.locker,
418+
app.config.NodeHealthCheck.ReservedNodeHealthCheckIntervalInHours, app.config.NodeHealthCheck.ReservedNodeHealthCheckTimeoutInMinutes, app.config.NodeHealthCheck.ReservedNodeHealthCheckWorkersNum, app.config.MonitorBalanceIntervalInMinutes, app.config.NotifyAdminsForPendingRecordsInHours, app.config.UsersBalanceCheckIntervalInHours,
419+
app.config.CheckUserDebtIntervalInHours,
421420
)
422421

423422
return workers.NewWorkers(app.core.appCtx, workersService, app.core.metrics, app.core.db)

backend/internal/api/handlers/deployment_handler.go

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,18 @@ import (
66

77
"github.com/gin-gonic/gin"
88

9-
distributedlocks "kubecloud/internal/core/distributed_locks"
109
"kubecloud/internal/core/models"
1110
"kubecloud/internal/core/services"
1211
"kubecloud/internal/deployment/kubedeployer"
1312
)
1413

1514
type DeploymentHandler struct {
16-
svc services.DeploymentService
17-
locker distributedlocks.DistributedLocks
15+
svc services.DeploymentService
1816
}
1917

20-
func NewDeploymentHandler(svc services.DeploymentService, locker distributedlocks.DistributedLocks) DeploymentHandler {
18+
func NewDeploymentHandler(svc services.DeploymentService) DeploymentHandler {
2119
return DeploymentHandler{
22-
svc: svc,
23-
locker: locker,
20+
svc: svc,
2421
}
2522
}
2623

@@ -240,21 +237,6 @@ func (h *DeploymentHandler) HandleDeployCluster(c *gin.Context) {
240237
return
241238
}
242239

243-
nodeIDs := make([]uint32, len(cluster.Nodes))
244-
for i, node := range cluster.Nodes {
245-
nodeIDs[i] = node.NodeID
246-
}
247-
248-
if err = h.locker.AcquireNodesLocks(c.Request.Context(), nodeIDs); err != nil {
249-
reqLog.Error().Err(err).Msg("failed to acquire nodes locks")
250-
if errors.Is(err, distributedlocks.ErrNodeLocked) {
251-
Conflict(c, err.Error())
252-
return
253-
}
254-
InternalServerError(c)
255-
return
256-
}
257-
258240
wfUUID, wfStatus, err := h.svc.AsyncDeployCluster(config, cluster)
259241
if err != nil {
260242
reqLog.Error().Err(err).Msg("failed to start deployment workflow")
@@ -426,15 +408,6 @@ func (h *DeploymentHandler) HandleAddNode(c *gin.Context) {
426408
return
427409
}
428410
}
429-
if err = h.locker.AcquireNodesLocks(c.Request.Context(), []uint32{cluster.Nodes[0].NodeID}); err != nil {
430-
reqLog.Error().Err(err).Msg("failed to acquire nodes locks")
431-
if errors.Is(err, distributedlocks.ErrNodeLocked) {
432-
Conflict(c, err.Error())
433-
return
434-
}
435-
InternalServerError(c)
436-
return
437-
}
438411

439412
wfUUID, wfStatus, err := h.svc.AsyncAddNode(config, cl, cluster.Nodes[0])
440413
if err != nil {

backend/internal/api/handlers/node_handler.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package handlers
33
import (
44
"errors"
55
"fmt"
6-
distributedlocks "kubecloud/internal/core/distributed_locks"
76
"kubecloud/internal/core/models"
87
"math/rand/v2"
98
"net/url"
@@ -18,14 +17,12 @@ import (
1817
)
1918

2019
type NodeHandler struct {
21-
svc services.NodeService
22-
locker distributedlocks.DistributedLocks
20+
svc services.NodeService
2321
}
2422

25-
func NewNodeHandler(svc services.NodeService, locker distributedlocks.DistributedLocks) NodeHandler {
23+
func NewNodeHandler(svc services.NodeService) NodeHandler {
2624
return NodeHandler{
27-
svc: svc,
28-
locker: locker,
25+
svc: svc,
2926
}
3027
}
3128

@@ -297,16 +294,6 @@ func (h *NodeHandler) ReserveNodeHandler(c *gin.Context) {
297294
return
298295
}
299296

300-
if err = h.locker.AcquireNodesLocks(c.Request.Context(), []uint32{nodeID}); err != nil {
301-
reqLog.Error().Err(err).Msg("failed to acquire nodes locks")
302-
if errors.Is(err, distributedlocks.ErrNodeLocked) {
303-
Conflict(c, err.Error())
304-
return
305-
}
306-
InternalServerError(c)
307-
return
308-
}
309-
310297
wfUUID, err := h.svc.AsyncReserveNode(userID, user.Mnemonic, nodeID)
311298
if err != nil {
312299
reqLog.Error().Err(err).Msg("failed to start workflow to reserve node")

backend/internal/config/config.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ type Configuration struct {
3333
DevMode bool `json:"dev_mode"` // When true, allows empty SendGridKey and uses FakeMailService
3434
MonitorBalanceIntervalInMinutes int `json:"monitor_balance_interval_in_minutes" validate:"required,gt=0"`
3535
NotifyAdminsForPendingRecordsInHours int `json:"notify_admins_for_pending_records_in_hours" validate:"required,gt=0"`
36-
Locks LocksConfig `json:"locks" validate:"required,dive"`
3736
ClusterHealthCheckIntervalInHours int `json:"cluster_health_check_interval_in_hours" validate:"gt=0" default:"1"`
3837
UsersBalanceCheckIntervalInHours int `json:"users_balance_check_interval_in_hours" validate:"gt=0" default:"6"`
3938
CheckUserDebtIntervalInHours int `json:"check_user_debt_interval_in_hours" validate:"gt=0" default:"48"`
4039
NodeHealthCheck ReservedNodeHealthCheckConfig `json:"node_health_check" validate:"required,dive"`
40+
LockTimeoutInHours int `json:"lock_timeout_in_hours" validate:"required,gt=0" default:"1"`
4141

4242
Logger LoggerConfig `json:"logger"`
4343
Loki LokiConfig `json:"loki"`
@@ -132,10 +132,6 @@ type TelemetryConfig struct {
132132
OTLPEndpoint string `json:"otlp_endpoint" default:"jaeger:4317"` // gRPC endpoint for OTLP exporter
133133
}
134134

135-
type LocksConfig struct {
136-
LockTimeoutInHours int `json:"lock_timeout_in_hours" validate:"required,gt=0" default:"24"`
137-
LocksReleaseIntervalInMinutes int `json:"locks_release_interval_in_minutes" validate:"required,gt=0" default:"5"`
138-
}
139135
type ReservedNodeHealthCheckConfig struct {
140136
ReservedNodeHealthCheckIntervalInHours int `json:"reserved_node_health_check_interval_in_hours" validate:"required,gt=0" default:"1"`
141137
ReservedNodeHealthCheckTimeoutInMinutes int `json:"reserved_node_health_check_timeout_in_minutes" validate:"required,gt=0" default:"1"`
@@ -392,11 +388,8 @@ func applyDefaultValues(config *Configuration) {
392388
if config.NotifyAdminsForPendingRecordsInHours == 0 {
393389
config.NotifyAdminsForPendingRecordsInHours = 24
394390
}
395-
if config.Locks.LockTimeoutInHours == 0 {
396-
config.Locks.LockTimeoutInHours = 24
397-
}
398-
if config.Locks.LocksReleaseIntervalInMinutes == 0 {
399-
config.Locks.LocksReleaseIntervalInMinutes = 2
391+
if config.LockTimeoutInHours == 0 {
392+
config.LockTimeoutInHours = 24
400393
}
401394

402395
if config.Telemetry.OTLPEndpoint == "" {

backend/internal/core/distributed_locks/distributed_locks.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ var ErrNodeLocked = errors.New("node is currently locked by another request")
99

1010
// DistributedLocks is an interface that defines the methods for distributed locks.
1111
type DistributedLocks interface {
12-
AcquireNodesLocks(ctx context.Context, nodeIDs []uint32) error
13-
AcquireWorkflowLock(ctx context.Context, nodeIDs []uint32, workflowID string) error
14-
ReleaseLock(ctx context.Context, nodeIDs []uint32, workflowID string) error
15-
GetAllWorkflowsLocks(ctx context.Context) ([]string, error)
12+
AcquireNodesLocks(ctx context.Context, nodeIDs []uint32) (map[string]string, error)
13+
ReleaseLock(ctx context.Context, lockedKeys map[string]string) error
1614
GetLockedNodes(ctx context.Context) ([]uint32, error)
1715
}

backend/internal/core/distributed_locks/redis_locker.go

Lines changed: 59 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,14 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/google/uuid"
1011
"github.com/redis/go-redis/v9"
1112
)
1213

14+
const (
15+
nodeLockKey = "locked"
16+
)
17+
1318
type RedisLocker struct {
1419
client *redis.Client
1520
lockTimeout time.Duration
@@ -24,77 +29,56 @@ func NewRedisLocker(client *redis.Client, lockTimeout time.Duration) *RedisLocke
2429
}
2530

2631
// AcquireNodesLocks acquires locks for the given node IDs.
27-
func (l *RedisLocker) AcquireNodesLocks(ctx context.Context, nodeIDs []uint32) error {
28-
if err := l.acquireKeys(ctx, lockKeys(nodeIDs, nodeLockKey)); err != nil {
29-
return err
30-
}
31-
32-
return nil
33-
}
34-
35-
// AcquireWorkflowLock acquires a lock for the given workflow ID.
36-
func (l *RedisLocker) AcquireWorkflowLock(ctx context.Context, nodeIDs []uint32, workflowID string) error {
37-
keys := lockKeys(nodeIDs, func(id uint32) string {
38-
return workflowLockKey(id, workflowID)
39-
})
40-
41-
if err := l.acquireKeys(ctx, keys); err != nil {
42-
//rollback nodes locks
43-
nodeLockKeys := lockKeys(nodeIDs, nodeLockKey)
44-
if rollErr := l.rollbackLocks(ctx, nodeLockKeys); rollErr != nil {
45-
return rollErr
46-
}
47-
return err
32+
func (l *RedisLocker) AcquireNodesLocks(ctx context.Context, nodeIDs []uint32) (map[string]string, error) {
33+
lockedKeys, err := l.acquireKeys(ctx, nodeLockKeys(nodeIDs))
34+
if err != nil {
35+
return nil, err
4836
}
49-
50-
return nil
51-
}
52-
53-
func nodeLockKey(nodeID uint32) string {
54-
return fmt.Sprintf("locked:%d", nodeID)
55-
}
56-
57-
func workflowLockKey(nodeID uint32, workflowID string) string {
58-
return fmt.Sprintf("used:%d:%s", nodeID, workflowID)
37+
return lockedKeys, nil
5938
}
6039

61-
func lockKeys(ids []uint32, keyFunc func(uint32) string) []string {
62-
keys := make([]string, len(ids))
63-
for i, id := range ids {
64-
keys[i] = keyFunc(id)
40+
func nodeLockKeys(nodeIDs []uint32) []string {
41+
keys := make([]string, len(nodeIDs))
42+
for i, id := range nodeIDs {
43+
keys[i] = fmt.Sprintf("%s:%d", nodeLockKey, id)
6544
}
6645
return keys
6746
}
6847

69-
func (l *RedisLocker) acquireKeys(ctx context.Context, keys []string) error {
70-
locked := make([]string, 0, len(keys))
48+
func (l *RedisLocker) acquireKeys(ctx context.Context, keys []string) (map[string]string, error) {
49+
locked := make(map[string]string, len(keys))
7150

7251
for _, key := range keys {
73-
ok, err := l.client.SetNX(ctx, key, 1, l.lockTimeout).Result()
52+
keyValue := uuid.New().String()
53+
ok, err := l.client.SetNX(ctx, key, keyValue, l.lockTimeout).Result()
7454
if err != nil {
7555
if rollErr := l.rollbackLocks(ctx, locked); rollErr != nil {
76-
return rollErr
56+
return nil, rollErr
7757
}
78-
return fmt.Errorf("redis error while acquiring lock for key %s: %w", key, err)
58+
return nil, fmt.Errorf("redis error while acquiring lock for key %s: %w", key, err)
7959
}
8060

8161
if !ok {
8262
if rollErr := l.rollbackLocks(ctx, locked); rollErr != nil {
83-
return rollErr
63+
return nil, rollErr
8464
}
85-
return fmt.Errorf("%w: %s", ErrNodeLocked, key)
65+
return nil, fmt.Errorf("%w: %s", ErrNodeLocked, key)
8666
}
8767

88-
locked = append(locked, key)
68+
locked[key] = keyValue
8969
}
9070

91-
return nil
71+
return locked, nil
9272
}
9373

94-
func (l *RedisLocker) rollbackLocks(ctx context.Context, keys []string) error {
95-
if len(keys) == 0 {
74+
func (l *RedisLocker) rollbackLocks(ctx context.Context, locked map[string]string) error {
75+
if len(locked) == 0 {
9676
return nil
9777
}
78+
keys := make([]string, 0, len(locked))
79+
for k := range locked {
80+
keys = append(keys, k)
81+
}
9882

9983
if err := l.client.Del(ctx, keys...).Err(); err != nil {
10084
return fmt.Errorf("redis error while rolling back locks: %w", err)
@@ -103,18 +87,36 @@ func (l *RedisLocker) rollbackLocks(ctx context.Context, keys []string) error {
10387
return nil
10488
}
10589

106-
func (l *RedisLocker) ReleaseLock(ctx context.Context, nodeIDs []uint32, workflowID string) error {
107-
lockedKeys := lockKeys(nodeIDs, nodeLockKey)
108-
usedKeys := lockKeys(nodeIDs, func(id uint32) string {
109-
return workflowLockKey(id, workflowID)
110-
})
111-
allWorkflowsLocks := append(lockedKeys, usedKeys...)
112-
return l.client.Del(ctx, allWorkflowsLocks...).Err()
113-
}
90+
func (l *RedisLocker) ReleaseLock(ctx context.Context, lockedKeys map[string]string) error {
91+
if len(lockedKeys) == 0 {
92+
return nil
93+
}
94+
95+
var failedKeys []string
96+
for key, expectedValue := range lockedKeys {
97+
storedValue, err := l.client.Get(ctx, key).Result()
98+
if err == redis.Nil {
99+
continue
100+
}
101+
if err != nil {
102+
return fmt.Errorf("failed to get lock value for key %s: %w", key, err)
103+
}
114104

115-
// GetAllWorkflowsLocks gets all workflow locks.
116-
func (l *RedisLocker) GetAllWorkflowsLocks(ctx context.Context) ([]string, error) {
117-
return l.client.Keys(ctx, "used:*").Result()
105+
if storedValue != expectedValue {
106+
failedKeys = append(failedKeys, key)
107+
continue
108+
}
109+
110+
if err := l.client.Del(ctx, key).Err(); err != nil {
111+
return fmt.Errorf("failed to delete lock for key %s: %w", key, err)
112+
}
113+
}
114+
115+
if len(failedKeys) > 0 {
116+
return fmt.Errorf("lock value mismatch for keys: %v", failedKeys)
117+
}
118+
119+
return nil
118120
}
119121

120122
func (l *RedisLocker) GetLockedNodes(ctx context.Context) ([]uint32, error) {

0 commit comments

Comments
 (0)