Skip to content

Commit 3bb0827

Browse files
authored
feat(runs): async abort reconciler with retry and crash recovery (#7080)
* abort reconciler
  Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
* mocks added
  Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
* fix: workers run before startup scan
  Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
* fix
  Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
* review fix
  Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
---------
Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
1 parent a8d8f23 commit 3bb0827

File tree

11 files changed

+788
-78
lines changed

11 files changed

+788
-78
lines changed

runs/repository/impl/action.go

Lines changed: 66 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -254,10 +254,13 @@ func (r *actionRepo) ListRuns(ctx context.Context, req *workflow.ListRunsRequest
254254

255255
// AbortRun aborts a run and all its actions
256256
func (r *actionRepo) AbortRun(ctx context.Context, runID *common.RunIdentifier, reason string, abortedBy *common.EnrichedIdentity) error {
257-
// Update the run action to aborted
257+
now := time.Now()
258258
updates := map[string]interface{}{
259-
"phase": int32(common.ActionPhase_ACTION_PHASE_ABORTED),
260-
"updated_at": time.Now(),
259+
"phase": int32(common.ActionPhase_ACTION_PHASE_ABORTED),
260+
"updated_at": now,
261+
"abort_requested_at": now,
262+
"abort_attempt_count": 0,
263+
"abort_reason": reason,
261264
}
262265

263266
result := r.db.WithContext(ctx).
@@ -270,7 +273,7 @@ func (r *actionRepo) AbortRun(ctx context.Context, runID *common.RunIdentifier,
270273
return fmt.Errorf("failed to abort run: %w", result.Error)
271274
}
272275

273-
// Notify subscribers
276+
// Notify run subscribers.
274277
r.notifyRunUpdate(ctx, runID)
275278

276279
logger.Infof(ctx, "Aborted run: %s/%s/%s/%s", runID.Org, runID.Project, runID.Domain, runID.Name)
@@ -509,28 +512,81 @@ func (r *actionRepo) UpdateActionPhase(
509512

510513
// AbortAction aborts a specific action
511514
func (r *actionRepo) AbortAction(ctx context.Context, actionID *common.ActionIdentifier, reason string, abortedBy *common.EnrichedIdentity) error {
515+
now := time.Now()
512516
updates := map[string]interface{}{
513-
"phase": int32(common.ActionPhase_ACTION_PHASE_ABORTED),
514-
"updated_at": time.Now(),
517+
"phase": int32(common.ActionPhase_ACTION_PHASE_ABORTED),
518+
"updated_at": now,
519+
"abort_requested_at": now,
520+
"abort_attempt_count": 0,
521+
"abort_reason": reason,
515522
}
516523

517524
result := r.db.WithContext(ctx).
518525
Model(&models.Action{}).
519-
Where("org = ? AND project = ? AND domain = ? AND name = ?",
520-
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Name).
526+
Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND name = ?",
527+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name).
521528
Updates(updates)
522529

523530
if result.Error != nil {
524531
return fmt.Errorf("failed to abort action: %w", result.Error)
525532
}
526533

527-
// Notify subscribers
534+
// Notify action subscribers.
528535
r.notifyActionUpdate(ctx, actionID)
529536

530537
logger.Infof(ctx, "Aborted action: %s", actionID.Name)
531538
return nil
532539
}
533540

541+
// ListPendingAborts returns all actions that have abort_requested_at set (i.e. awaiting pod termination).
542+
func (r *actionRepo) ListPendingAborts(ctx context.Context) ([]*models.Action, error) {
543+
var actions []*models.Action
544+
result := r.db.WithContext(ctx).
545+
Where("abort_requested_at IS NOT NULL").
546+
Find(&actions)
547+
if result.Error != nil {
548+
return nil, fmt.Errorf("failed to list pending aborts: %w", result.Error)
549+
}
550+
return actions, nil
551+
}
552+
553+
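// MarkAbortAttempt increments abort_attempt_count in a single UPDATE and returns the new value read back via a RETURNING clause; when no row matches the identifier, it returns 0 with a nil error.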
554+
// Called by the reconciler before each actionsClient.Abort call.
555+
func (r *actionRepo) MarkAbortAttempt(ctx context.Context, actionID *common.ActionIdentifier) (int, error) {
556+
var action models.Action
557+
result := r.db.WithContext(ctx).
558+
Model(&action).
559+
Clauses(clause.Returning{Columns: []clause.Column{{Name: "abort_attempt_count"}}}).
560+
Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND name = ?",
561+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name).
562+
Updates(map[string]interface{}{
563+
"abort_attempt_count": gorm.Expr("abort_attempt_count + 1"),
564+
"updated_at": time.Now(),
565+
})
566+
if result.Error != nil {
567+
return 0, fmt.Errorf("failed to mark abort attempt: %w", result.Error)
568+
}
569+
return action.AbortAttemptCount, nil
570+
}
571+
572+
// ClearAbortRequest sets abort_requested_at and abort_reason to NULL and resets abort_attempt_count to 0; intended to be called once the pod is confirmed terminated.
573+
func (r *actionRepo) ClearAbortRequest(ctx context.Context, actionID *common.ActionIdentifier) error {
574+
result := r.db.WithContext(ctx).
575+
Model(&models.Action{}).
576+
Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND name = ?",
577+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name).
578+
Updates(map[string]interface{}{
579+
"abort_requested_at": nil,
580+
"abort_attempt_count": 0,
581+
"abort_reason": nil,
582+
"updated_at": time.Now(),
583+
})
584+
if result.Error != nil {
585+
return fmt.Errorf("failed to clear abort request: %w", result.Error)
586+
}
587+
return nil
588+
}
589+
534590
// UpdateActionState updates the state of an action
535591
func (r *actionRepo) UpdateActionState(ctx context.Context, actionID *common.ActionIdentifier, state string) error {
536592
// Parse the state JSON to extract the phase
@@ -936,11 +992,11 @@ func (r *actionRepo) startPostgresListener() {
936992
select {
937993
case ch <- notif.Extra:
938994
default:
939-
// Channel full, skip this subscriber
940995
logger.Warnf(context.Background(), "Action subscriber channel full, dropping notification")
941996
}
942997
}
943998
r.mu.RUnlock()
999+
9441000
}
9451001

9461002
case <-time.After(90 * time.Second):

runs/repository/impl/action_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ func setupActionDB(t *testing.T) *gorm.DB {
4646
action_details BLOB,
4747
detailed_info BLOB,
4848
run_spec BLOB,
49+
abort_requested_at DATETIME,
50+
abort_attempt_count INTEGER NOT NULL DEFAULT 0,
51+
abort_reason TEXT,
4952
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
5053
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
5154
ended_at DATETIME,

runs/repository/interfaces/action.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type ActionRepo interface {
2828
UpdateActionPhase(ctx context.Context, actionID *common.ActionIdentifier, phase common.ActionPhase, attempts uint32, cacheStatus core.CatalogCacheStatus, endTime *time.Time) error
2929
AbortAction(ctx context.Context, actionID *common.ActionIdentifier, reason string, abortedBy *common.EnrichedIdentity) error
3030

31+
// Abort reconciliation — used by the background AbortReconciler.
32+
ListPendingAborts(ctx context.Context) ([]*models.Action, error)
33+
MarkAbortAttempt(ctx context.Context, actionID *common.ActionIdentifier) (attemptCount int, err error)
34+
ClearAbortRequest(ctx context.Context, actionID *common.ActionIdentifier) error
35+
3136
// Watch operations (for streaming)
3237
WatchRunUpdates(ctx context.Context, runID *common.RunIdentifier, updates chan<- *models.Run, errs chan<- error)
3338
WatchAllRunUpdates(ctx context.Context, updates chan<- *models.Run, errs chan<- error)

runs/repository/mocks/action_repo.go

Lines changed: 162 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

runs/repository/models/action.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ type Action struct {
6565
// interruptible, cluster, etc.) for this action's run.
6666
RunSpec []byte `gorm:"type:bytea" db:"run_spec"`
6767

68+
// Abort tracking — set when a user requests abort; cleared once the pod is confirmed terminated.
69+
AbortRequestedAt *time.Time `gorm:"index:idx_actions_abort_pending" db:"abort_requested_at"`
70+
AbortAttemptCount int `gorm:"not null;default:0" db:"abort_attempt_count"`
71+
AbortReason *string `db:"abort_reason"`
72+
6873
// Timestamps
6974
// CreatedAt is set by the DB (NOW()) on insert — represents action start time.
7075
CreatedAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP;index:idx_actions_created" db:"created_at"`

0 commit comments

Comments
 (0)