Skip to content

Commit e772d6b

Browse files
committed
fix
Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
1 parent d5cab28 commit e772d6b

File tree

8 files changed

+50
-226
lines changed

8 files changed

+50
-226
lines changed

runs/repository/impl/action.go

Lines changed: 10 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ type actionRepo struct {
3434
// Subscriber management for LISTEN/NOTIFY
3535
runSubscribers map[chan string]bool
3636
actionSubscribers map[chan string]bool
37-
abortSubscribers map[chan string]bool
3837
mu sync.RWMutex
3938
}
4039

@@ -50,7 +49,6 @@ func NewActionRepo(db *gorm.DB, dbConfig database.DbConfig) interfaces.ActionRep
5049
pgConfig: dbConfig.Postgres,
5150
runSubscribers: make(map[chan string]bool),
5251
actionSubscribers: make(map[chan string]bool),
53-
abortSubscribers: make(map[chan string]bool),
5452
}
5553

5654
// Start LISTEN/NOTIFY for PostgreSQL
@@ -274,9 +272,8 @@ func (r *actionRepo) AbortRun(ctx context.Context, runID *common.RunIdentifier,
274272
return fmt.Errorf("failed to abort run: %w", result.Error)
275273
}
276274

277-
// Notify run subscribers and abort reconciler.
275+
// Notify run subscribers.
278276
r.notifyRunUpdate(ctx, runID)
279-
r.notifyAbortRequest(ctx, &common.ActionIdentifier{Run: runID, Name: runID.Name})
280277

281278
logger.Infof(ctx, "Aborted run: %s/%s/%s/%s", runID.Org, runID.Project, runID.Domain, runID.Name)
282279
return nil
@@ -476,17 +473,16 @@ func (r *actionRepo) AbortAction(ctx context.Context, actionID *common.ActionIde
476473

477474
result := r.db.WithContext(ctx).
478475
Model(&models.Action{}).
479-
Where("org = ? AND project = ? AND domain = ? AND name = ?",
480-
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Name).
476+
Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND name = ?",
477+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name).
481478
Updates(updates)
482479

483480
if result.Error != nil {
484481
return fmt.Errorf("failed to abort action: %w", result.Error)
485482
}
486483

487-
// Notify action subscribers and abort reconciler.
484+
// Notify action subscribers.
488485
r.notifyActionUpdate(ctx, actionID)
489-
r.notifyAbortRequest(ctx, actionID)
490486

491487
logger.Infof(ctx, "Aborted action: %s", actionID.Name)
492488
return nil
@@ -509,8 +505,8 @@ func (r *actionRepo) ListPendingAborts(ctx context.Context) ([]*models.Action, e
509505
func (r *actionRepo) MarkAbortAttempt(ctx context.Context, actionID *common.ActionIdentifier) (int, error) {
510506
result := r.db.WithContext(ctx).
511507
Model(&models.Action{}).
512-
Where("org = ? AND project = ? AND domain = ? AND name = ?",
513-
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Name).
508+
Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND name = ?",
509+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name).
514510
Updates(map[string]interface{}{
515511
"abort_attempt_count": gorm.Expr("abort_attempt_count + 1"),
516512
"updated_at": time.Now(),
@@ -523,8 +519,8 @@ func (r *actionRepo) MarkAbortAttempt(ctx context.Context, actionID *common.Acti
523519
var action models.Action
524520
if err := r.db.WithContext(ctx).
525521
Select("abort_attempt_count").
526-
Where("org = ? AND project = ? AND domain = ? AND name = ?",
527-
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Name).
522+
Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND name = ?",
523+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name).
528524
First(&action).Error; err != nil {
529525
return 0, fmt.Errorf("failed to read abort attempt count: %w", err)
530526
}
@@ -535,8 +531,8 @@ func (r *actionRepo) MarkAbortAttempt(ctx context.Context, actionID *common.Acti
535531
func (r *actionRepo) ClearAbortRequest(ctx context.Context, actionID *common.ActionIdentifier) error {
536532
result := r.db.WithContext(ctx).
537533
Model(&models.Action{}).
538-
Where("org = ? AND project = ? AND domain = ? AND name = ?",
539-
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Name).
534+
Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND name = ?",
535+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name).
540536
Updates(map[string]interface{}{
541537
"abort_requested_at": nil,
542538
"abort_attempt_count": 0,
@@ -549,78 +545,6 @@ func (r *actionRepo) ClearAbortRequest(ctx context.Context, actionID *common.Act
549545
return nil
550546
}
551547

552-
// WatchAbortRequests delivers abort_requests NOTIFY payloads to the caller.
553-
// On non-Postgres setups it polls for pending aborts every 5 seconds.
554-
func (r *actionRepo) WatchAbortRequests(ctx context.Context, payloads chan<- string, errs chan<- error) {
555-
if r.isPostgres {
556-
notifCh := make(chan string, 20)
557-
558-
r.mu.Lock()
559-
r.abortSubscribers[notifCh] = true
560-
r.mu.Unlock()
561-
562-
defer func() {
563-
r.mu.Lock()
564-
delete(r.abortSubscribers, notifCh)
565-
close(notifCh)
566-
r.mu.Unlock()
567-
}()
568-
569-
for {
570-
select {
571-
case <-ctx.Done():
572-
return
573-
case payload := <-notifCh:
574-
select {
575-
case payloads <- payload:
576-
case <-ctx.Done():
577-
return
578-
}
579-
}
580-
}
581-
} else {
582-
// SQLite fallback: poll for pending aborts periodically.
583-
ticker := time.NewTicker(5 * time.Second)
584-
defer ticker.Stop()
585-
for {
586-
select {
587-
case <-ctx.Done():
588-
return
589-
case <-ticker.C:
590-
actions, err := r.ListPendingAborts(ctx)
591-
if err != nil {
592-
select {
593-
case errs <- err:
594-
default:
595-
}
596-
continue
597-
}
598-
for _, a := range actions {
599-
payload := fmt.Sprintf("%s/%s/%s/%s/%s", a.Org, a.Project, a.Domain, a.RunName, a.Name)
600-
select {
601-
case payloads <- payload:
602-
case <-ctx.Done():
603-
return
604-
}
605-
}
606-
}
607-
}
608-
}
609-
}
610-
611-
// notifyAbortRequest fires NOTIFY abort_requests so the AbortReconciler wakes immediately.
612-
func (r *actionRepo) notifyAbortRequest(ctx context.Context, actionID *common.ActionIdentifier) {
613-
if !r.isPostgres {
614-
return
615-
}
616-
payload := fmt.Sprintf("%s/%s/%s/%s/%s",
617-
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name)
618-
notifySQL := fmt.Sprintf("NOTIFY abort_requests, '%s'", payload)
619-
if err := r.db.WithContext(ctx).Exec(notifySQL).Error; err != nil {
620-
logger.Errorf(ctx, "Failed to NOTIFY abort_requests: %v", err)
621-
}
622-
}
623-
624548
// UpdateActionState updates the state of an action
625549
func (r *actionRepo) UpdateActionState(ctx context.Context, actionID *common.ActionIdentifier, state string) error {
626550
// Parse the state JSON to extract the phase
@@ -953,11 +877,6 @@ func (r *actionRepo) startPostgresListener() {
953877
return
954878
}
955879

956-
if err := r.listener.Listen("abort_requests"); err != nil {
957-
logger.Errorf(context.Background(), "Failed to listen to abort_requests: %v", err)
958-
return
959-
}
960-
961880
logger.Infof(context.Background(), "PostgreSQL LISTEN/NOTIFY started")
962881

963882
// Process notifications
@@ -994,17 +913,6 @@ func (r *actionRepo) startPostgresListener() {
994913
}
995914
r.mu.RUnlock()
996915

997-
case "abort_requests":
998-
// Broadcast to all abort reconciler subscribers
999-
r.mu.RLock()
1000-
for ch := range r.abortSubscribers {
1001-
select {
1002-
case ch <- notif.Extra:
1003-
default:
1004-
logger.Warnf(context.Background(), "Abort subscriber channel full, dropping notification")
1005-
}
1006-
}
1007-
r.mu.RUnlock()
1008916
}
1009917

1010918
case <-time.After(90 * time.Second):

runs/repository/interfaces/action.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ type ActionRepo interface {
3030
ListPendingAborts(ctx context.Context) ([]*models.Action, error)
3131
MarkAbortAttempt(ctx context.Context, actionID *common.ActionIdentifier) (attemptCount int, err error)
3232
ClearAbortRequest(ctx context.Context, actionID *common.ActionIdentifier) error
33-
WatchAbortRequests(ctx context.Context, payloads chan<- string, errs chan<- error)
3433

3534
// Watch operations (for streaming)
3635
WatchRunUpdates(ctx context.Context, runID *common.RunIdentifier, updates chan<- *models.Run, errs chan<- error)

runs/repository/mocks/action_repo.go

Lines changed: 0 additions & 35 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

runs/service/abort_reconciler.go

Lines changed: 14 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package service
33
import (
44
"context"
55
"fmt"
6-
"strings"
76
"sync"
87
"time"
98

@@ -85,8 +84,8 @@ func (q *dedupeQueue) push(ctx context.Context, task abortTask) bool {
8584
}
8685

8786
// scheduleRequeue re-enqueues the task after delay.
88-
// The key remains in the set during the wait window so that any NOTIFY arriving during
89-
// the backoff is correctly deduped (no duplicate processing).
87+
// The key remains in the set during the wait window so that any Push call arriving
88+
// during the backoff is correctly deduped (no duplicate processing).
9089
func (q *dedupeQueue) scheduleRequeue(ctx context.Context, task abortTask, delay time.Duration) {
9190
time.AfterFunc(delay, func() {
9291
// Remove then re-push so push's dedup check passes.
@@ -139,6 +138,14 @@ func NewAbortReconciler(repo interfaces.Repository, actionsClient actionsconnect
139138
}
140139
}
141140

141+
// Push enqueues an abort request for the given action. Safe to call concurrently.
142+
// No-op if a task with the same key is already tracked by the queue — queued, being processed, or waiting out a requeue backoff (dedup).
143+
func (r *AbortReconciler) Push(ctx context.Context, actionID *common.ActionIdentifier, reason string) {
144+
key := fmt.Sprintf("%s/%s/%s/%s/%s",
145+
actionID.Run.Org, actionID.Run.Project, actionID.Run.Domain, actionID.Run.Name, actionID.Name)
146+
r.queue.push(ctx, abortTask{actionID: actionID, reason: reason, key: key})
147+
}
148+
142149
// Run starts the reconciler. It blocks until ctx is cancelled.
143150
func (r *AbortReconciler) Run(ctx context.Context) error {
144151
logger.Infof(ctx, "AbortReconciler starting (%d workers, max %d attempts)", r.cfg.Workers, r.cfg.MaxAttempts)
@@ -155,46 +162,14 @@ func (r *AbortReconciler) Run(ctx context.Context) error {
155162
}()
156163
}
157164

158-
// Startup scan: enqueue any actions that were left pending from before this process started.
165+
// Startup scan: enqueue any actions left pending before this process started (crash recovery).
159166
if err := r.startupScan(ctx); err != nil {
160167
logger.Errorf(ctx, "AbortReconciler startup scan failed: %v", err)
161-
// Non-fatal — the NOTIFY watcher will still pick up new aborts.
162168
}
163169

164-
// Watch for new abort requests via NOTIFY (or polling on SQLite).
165-
payloads := make(chan string, 50)
166-
errs := make(chan error, 10)
167-
go r.repo.ActionRepo().WatchAbortRequests(ctx, payloads, errs)
168-
169-
for {
170-
select {
171-
case <-ctx.Done():
172-
wg.Wait()
173-
return nil
174-
case err := <-errs:
175-
logger.Warnf(ctx, "AbortReconciler watch error: %v", err)
176-
case payload := <-payloads:
177-
task, err := parseAbortPayload(payload)
178-
if err != nil {
179-
logger.Warnf(ctx, "AbortReconciler: invalid payload %q: %v", payload, err)
180-
continue
181-
}
182-
// Fetch current abort_reason from DB (NOTIFY payload does not carry reason).
183-
action, err := r.repo.ActionRepo().GetAction(ctx, task.actionID)
184-
if err != nil {
185-
logger.Warnf(ctx, "AbortReconciler: failed to fetch action %s: %v", task.key, err)
186-
continue
187-
}
188-
if action.AbortRequestedAt == nil {
189-
// Already cleared (race between scan and notify) — skip.
190-
continue
191-
}
192-
if action.AbortReason != nil {
193-
task.reason = *action.AbortReason
194-
}
195-
r.queue.push(ctx, task)
196-
}
197-
}
170+
<-ctx.Done()
171+
wg.Wait()
172+
return nil
198173
}
199174

200175
// startupScan enqueues all actions that have abort_requested_at set.
@@ -297,23 +272,3 @@ func isAlreadyTerminated(err error) bool {
297272
}
298273
return connectErr.Code() == connect.CodeNotFound
299274
}
300-
301-
// parseAbortPayload parses "org/project/domain/run/actionName" into an abortTask.
302-
func parseAbortPayload(payload string) (abortTask, error) {
303-
parts := strings.SplitN(payload, "/", 5)
304-
if len(parts) != 5 {
305-
return abortTask{}, fmt.Errorf("expected 5 parts, got %d", len(parts))
306-
}
307-
return abortTask{
308-
key: payload,
309-
actionID: &common.ActionIdentifier{
310-
Run: &common.RunIdentifier{
311-
Org: parts[0],
312-
Project: parts[1],
313-
Domain: parts[2],
314-
Name: parts[3],
315-
},
316-
Name: parts[4],
317-
},
318-
}, nil
319-
}

runs/service/abort_reconciler_test.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,12 @@ import (
1818
)
1919

2020
// newTestReconciler builds a reconciler wired to mocks with fast timing for tests.
21-
// WatchAbortRequests is set up to block until ctx is cancelled so tests control
22-
// the queue via the startup scan and direct queue pushes.
2321
func newTestReconciler(t *testing.T) (*repoMocks.ActionRepo, *mockActionsClient, *AbortReconciler) {
2422
t.Helper()
2523
actionRepo := repoMocks.NewActionRepo(t)
2624
repo := repoMocks.NewRepository(t)
2725
repo.On("ActionRepo").Return(actionRepo).Maybe()
2826

29-
actionRepo.On("WatchAbortRequests", mock.Anything, mock.Anything, mock.Anything).
30-
Run(func(args mock.Arguments) {
31-
ctx := args.Get(0).(context.Context)
32-
<-ctx.Done()
33-
}).
34-
Return().Maybe()
35-
3627
actionsClient := &mockActionsClient{}
3728

3829
reconciler := NewAbortReconciler(repo, actionsClient, AbortReconcilerConfig{
@@ -168,11 +159,7 @@ func TestAbortReconciler_DeduplicatesQueue(t *testing.T) {
168159

169160
// Push a duplicate while the first is being processed.
170161
time.Sleep(5 * time.Millisecond)
171-
reconciler.queue.push(ctx, abortTask{
172-
actionID: actionID,
173-
key: "org/proj/dev/rtest1/rtest1",
174-
reason: "dup",
175-
})
162+
reconciler.Push(ctx, actionID, "dup")
176163

177164
assert.Eventually(t, cleared.Load, 400*time.Millisecond, 5*time.Millisecond)
178165
// Abort should only have been called once.

0 commit comments

Comments
 (0)