Skip to content

Commit d2f1d3d

Browse files
committed
refactor: add lock release in after workflow hook
1 parent 2ec13a8 commit d2f1d3d

File tree

6 files changed

+45
-2
lines changed

6 files changed

+45
-2
lines changed

backend/internal/api/app/app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func (app *App) registerEWFWorkflows() {
9696
app.communication.notificationDispatcher,
9797
app.infra.gridClient.GridProxyClient,
9898
stripeClient,
99+
app.core.locker,
99100
)
100101
}
101102

backend/internal/api/handlers/deployment_handler.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/gin-gonic/gin"
88

9+
distributedlocks "kubecloud/internal/core/distributed_locks"
910
"kubecloud/internal/core/models"
1011
"kubecloud/internal/core/services"
1112
"kubecloud/internal/deployment/kubedeployer"
@@ -240,6 +241,10 @@ func (h *DeploymentHandler) HandleDeployCluster(c *gin.Context) {
240241
wfUUID, wfStatus, err := h.svc.AsyncDeployCluster(config, cluster)
241242
if err != nil {
242243
reqLog.Error().Err(err).Msg("failed to start deployment workflow")
244+
if errors.Is(err, distributedlocks.ErrNodeLocked) {
245+
Conflict(c, "Node is busy serving another request")
246+
return
247+
}
243248
InternalServerError(c)
244249
return
245250
}
@@ -412,6 +417,10 @@ func (h *DeploymentHandler) HandleAddNode(c *gin.Context) {
412417
wfUUID, wfStatus, err := h.svc.AsyncAddNode(config, cl, cluster.Nodes[0])
413418
if err != nil {
414419
reqLog.Error().Err(err).Msg("failed to start add node workflow")
420+
if errors.Is(err, distributedlocks.ErrNodeLocked) {
421+
Conflict(c, "Node is busy serving another request")
422+
return
423+
}
415424
InternalServerError(c)
416425
return
417426
}

backend/internal/api/handlers/node_handler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package handlers
33
import (
44
"errors"
55
"fmt"
6+
distributedlocks "kubecloud/internal/core/distributed_locks"
67
"kubecloud/internal/core/models"
78
"math/rand/v2"
89
"net/url"
@@ -297,6 +298,10 @@ func (h *NodeHandler) ReserveNodeHandler(c *gin.Context) {
297298
wfUUID, err := h.svc.AsyncReserveNode(userID, user.Mnemonic, nodeID)
298299
if err != nil {
299300
reqLog.Error().Err(err).Msg("failed to start workflow to reserve node")
301+
if errors.Is(err, distributedlocks.ErrNodeLocked) {
302+
Conflict(c, "Node is busy serving another request")
303+
return
304+
}
300305
InternalServerError(c)
301306
return
302307
}

backend/internal/core/workflows/deployer_activities.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
cfg "kubecloud/internal/config"
9+
distributedlocks "kubecloud/internal/core/distributed_locks"
910
"kubecloud/internal/core/models"
1011
"kubecloud/internal/deployment/kubedeployer"
1112
"kubecloud/internal/deployment/statemanager"
@@ -641,7 +642,7 @@ func createAddNodeWorkflowTemplate(notificationDispatcher *notification.Notifica
641642
return template
642643
}
643644

644-
func registerDeploymentActivities(engine *ewf.Engine, metrics *metricsLib.Metrics, clusterRepo models.ClusterRepository, notificationDispatcher *notification.NotificationDispatcher, config cfg.Configuration) {
645+
func registerDeploymentActivities(engine *ewf.Engine, metrics *metricsLib.Metrics, clusterRepo models.ClusterRepository, notificationDispatcher *notification.NotificationDispatcher, config cfg.Configuration, locker distributedlocks.DistributedLocks) {
645646
engine.Register(StepDeployNetwork, DeployNetworkStep(metrics))
646647
engine.Register(StepDeployLeaderNode, DeployLeaderNodeStep(metrics))
647648
engine.Register(StepBatchDeployAllNodes, BatchDeployAllNodesStep(metrics))
@@ -670,6 +671,7 @@ func registerDeploymentActivities(engine *ewf.Engine, metrics *metricsLib.Metric
670671
deployWFTemplate.AfterStepHooks = []ewf.AfterStepHook{
671672
notifyStepHook(notificationDispatcher),
672673
}
674+
deployWFTemplate.AfterWorkflowHooks = append(deployWFTemplate.AfterWorkflowHooks, releaseLocksHook(locker))
673675
engine.RegisterTemplate(WorkflowDeployCluster, &deployWFTemplate)
674676

675677
deleteWFTemplate := createDeployerWorkflowTemplate(notificationDispatcher, engine, metrics)
@@ -695,6 +697,7 @@ func registerDeploymentActivities(engine *ewf.Engine, metrics *metricsLib.Metric
695697
{Name: StepVerifyNewNodes, RetryPolicy: longExponentialRetryPolicy},
696698
{Name: StepStoreDeployment, RetryPolicy: standardRetryPolicy},
697699
}
700+
addNodeWFTemplate.AfterWorkflowHooks = append(addNodeWFTemplate.AfterWorkflowHooks, releaseLocksHook(locker))
698701
engine.RegisterTemplate(WorkflowAddNode, &addNodeWFTemplate)
699702

700703
removeNodeWFTemplate := createDeployerWorkflowTemplate(notificationDispatcher, engine, metrics)

backend/internal/core/workflows/hooks.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package workflows
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78

89
"time"
910

11+
distributedlocks "kubecloud/internal/core/distributed_locks"
1012
"kubecloud/internal/deployment/kubedeployer"
1113
"kubecloud/internal/deployment/statemanager"
1214
"kubecloud/internal/infrastructure/logger"
@@ -241,3 +243,23 @@ func metricsFailureHook(metrics *metricsLib.Metrics) ewf.AfterWorkflowHook {
241243
}
242244
}
243245
}
246+
247+
func releaseLocksHook(locker distributedlocks.DistributedLocks) ewf.AfterWorkflowHook {
248+
return func(ctx context.Context, wf *ewf.Workflow, _ error) {
249+
log := logger.ForOperation("workflow", "release_locks").With().Str("workflow_name", wf.Name).Logger()
250+
lockedKeys, ok := wf.Metadata["locked_keys"]
251+
if !ok {
252+
return
253+
}
254+
var lockedKeysJSON map[string]string
255+
err := json.Unmarshal([]byte(lockedKeys), &lockedKeysJSON)
256+
if err != nil {
257+
log.Error().Err(err).Msg("failed to unmarshal locked keys")
258+
return
259+
}
260+
if err := locker.ReleaseLock(ctx, lockedKeysJSON); err != nil {
261+
log.Error().Err(err).Msg("failed to release locks")
262+
return
263+
}
264+
}
265+
}

backend/internal/core/workflows/workflow.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package workflows
33
import (
44
"kubecloud/internal/billing"
55
cfg "kubecloud/internal/config"
6+
distributedlocks "kubecloud/internal/core/distributed_locks"
67
"kubecloud/internal/core/models"
78
"kubecloud/internal/core/persistence"
89
"kubecloud/internal/infrastructure/kyc"
@@ -31,6 +32,7 @@ func RegisterEWFWorkflows(
3132
notificationDispatcher *notification.NotificationDispatcher,
3233
proxyClient proxy.Client,
3334
stripeClient billing.StripeClient,
35+
locker distributedlocks.DistributedLocks,
3436
) {
3537
userRepo := persistence.NewGormUserRepository(db)
3638
clusterRepo := persistence.NewGormClusterRepository(db)
@@ -127,6 +129,7 @@ func RegisterEWFWorkflows(
127129
{Name: StepReserveNode, RetryPolicy: &ewf.RetryPolicy{MaxAttempts: 2, BackOff: ewf.ConstantBackoff(2 * time.Second)}},
128130
{Name: StepVerifyNodeState, RetryPolicy: &ewf.RetryPolicy{MaxAttempts: 5, BackOff: ewf.ExponentialBackoff(10*time.Second, 2*time.Minute, 2.0)}},
129131
}
132+
reserveNodeTemplate.AfterWorkflowHooks = append(reserveNodeTemplate.AfterWorkflowHooks, releaseLocksHook(locker))
130133
engine.RegisterTemplate(WorkflowReserveNode, &reserveNodeTemplate)
131134

132135
unreserveNodeTemplate := newKubecloudWorkflowTemplate(notificationDispatcher)
@@ -146,7 +149,7 @@ func RegisterEWFWorkflows(
146149
// trackClusterHealthWFTemplate.BeforeWorkflowHooks = []ewf.BeforeWorkflowHook{hookNotificationWorkflowStarted}
147150
engine.RegisterTemplate(WorkflowTrackClusterHealth, &trackClusterHealthWFTemplate)
148151

149-
registerDeploymentActivities(engine, metrics, clusterRepo, notificationDispatcher, config)
152+
registerDeploymentActivities(engine, metrics, clusterRepo, notificationDispatcher, config, locker)
150153

151154
// Email-only workflow for guaranteed email delivery with retries
152155
emailNotificationTemplate := ewf.WorkflowTemplate{

0 commit comments

Comments
 (0)