Skip to content

Commit 623009e

Browse files
retlehs and claude committed
Fix "build already running" stuck state on prod
The admin UI's Run Build Now always failed on prod because the handler passed a locked flock fd to the child pipeline process via ExtraFiles. Parent and child shared the same open file description, so if the child became orphaned (e.g. server restart mid-build), the lock was held indefinitely — even across server restarts.

Replace the fd-passing design with a DB-based build slot:

- Handler atomically INSERTs a "running" row (with server PID as placeholder) before starting the child, using INSERT...WHERE NOT EXISTS to serialize concurrent requests
- Child pipeline claims the row via UPDATE with its own PID and validates it exists via RowsAffected
- Reaper goroutine marks the row "failed" if the child exits with an error while the row is still "running" (i.e. before the child recorded its own outcome), preventing stuck rows
- Stale cleanup cancels "running" rows whose PID is dead (ESRCH)
- Child keeps its own independent flock for CLI-level protection
- Remove PIPELINE_LOCK_FD inheritance mechanism and related tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cc05854 commit 623009e

File tree

3 files changed

+130
-147
lines changed

3 files changed

+130
-147
lines changed

cmd/wpcomposer/cmd/pipeline.go

Lines changed: 35 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"os"
99
"path/filepath"
10-
"strconv"
1110
"syscall"
1211
"time"
1312

@@ -26,41 +25,8 @@ func acquirePipelineLock() error {
2625
return acquireLock(pipelineLockPath)
2726
}
2827

29-
// acquireLock acquires an exclusive file lock at lockPath. If PIPELINE_LOCK_FD is
30-
// set, the lock is expected to have been inherited from the parent process via fd
31-
// passing; the inherited fd is validated for both inode identity and lock ownership.
28+
// acquireLock acquires an exclusive file lock at lockPath.
3229
func acquireLock(lockPath string) error {
33-
if v := os.Getenv("PIPELINE_LOCK_FD"); v != "" {
34-
fd, err := strconv.Atoi(v)
35-
if err != nil {
36-
return fmt.Errorf("invalid PIPELINE_LOCK_FD: %w", err)
37-
}
38-
pipelineLockFile = os.NewFile(uintptr(fd), lockPath)
39-
if pipelineLockFile == nil {
40-
return fmt.Errorf("PIPELINE_LOCK_FD=%d: invalid file descriptor", fd)
41-
}
42-
43-
// Verify the inherited fd points to the actual lock file (inode match).
44-
var fdStat, pathStat syscall.Stat_t
45-
if err := syscall.Fstat(fd, &fdStat); err != nil {
46-
return fmt.Errorf("PIPELINE_LOCK_FD=%d: fstat failed: %w", fd, err)
47-
}
48-
if err := syscall.Stat(lockPath, &pathStat); err != nil {
49-
return fmt.Errorf("pipeline lock stat: %w", err)
50-
}
51-
if fdStat.Dev != pathStat.Dev || fdStat.Ino != pathStat.Ino {
52-
return fmt.Errorf("PIPELINE_LOCK_FD=%d does not refer to %s", fd, lockPath)
53-
}
54-
55-
// Verify this fd actually holds the lock. Re-locking our own fd is a
56-
// no-op in flock, so LOCK_EX|LOCK_NB succeeds if we hold it, and fails
57-
// with EWOULDBLOCK if a different open file description holds it.
58-
if err := syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
59-
return fmt.Errorf("PIPELINE_LOCK_FD=%d does not hold the pipeline lock", fd)
60-
}
61-
return nil
62-
}
63-
6430
f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0644)
6531
if err != nil {
6632
return fmt.Errorf("pipeline lock: %w", err)
@@ -85,38 +51,53 @@ var pipelineBuildID string
8551

8652
func runPipeline(cmd *cobra.Command, args []string) error {
8753
// Acquire a system-wide file lock so only one pipeline runs at a time.
88-
// When triggered from the admin UI, the parent process passes the already-locked
89-
// fd via PIPELINE_LOCK_FD so the lock transfers atomically with no TOCTOU gap.
9054
if err := acquirePipelineLock(); err != nil {
9155
return err
9256
}
9357

9458
skipDiscover, _ := cmd.Flags().GetBool("skip-discover")
9559
skipDeploy, _ := cmd.Flags().GetBool("skip-deploy")
9660
discoverSource, _ := cmd.Flags().GetString("discover-source")
61+
buildIDFlag, _ := cmd.Flags().GetString("build-id")
9762

9863
ctx := cmd.Context()
9964
started := time.Now().UTC()
10065

10166
// Mark any stale "running" builds (dead PID) as cancelled.
10267
markStaleBuildsCancelled(ctx, application.DB)
10368

104-
// Record this build as "running" before doing any work.
105-
buildID := started.Format("20060102-150405")
69+
// When triggered from the admin UI, a "running" row already exists —
70+
// claim it by updating the PID to our own and verify it exists. When
71+
// invoked from the CLI, insert a new row.
72+
var buildID string
73+
if buildIDFlag != "" {
74+
buildID = buildIDFlag
75+
res, err := application.DB.ExecContext(ctx, `
76+
UPDATE builds SET pid = ? WHERE id = ? AND status = 'running'`,
77+
os.Getpid(), buildID)
78+
if err != nil {
79+
return fmt.Errorf("claiming build %s: %w", buildID, err)
80+
}
81+
if n, _ := res.RowsAffected(); n == 0 {
82+
return fmt.Errorf("build %s not found or not in running state", buildID)
83+
}
84+
} else {
85+
buildID = started.Format("20060102-150405")
86+
_, err := application.DB.ExecContext(ctx, `
87+
INSERT INTO builds (id, started_at, status, pid,
88+
packages_total, packages_changed, packages_skipped,
89+
provider_groups, artifact_count, root_hash, manifest_json)
90+
VALUES (?, ?, 'running', ?, 0, 0, 0, 0, 0, '', '{}')`,
91+
buildID,
92+
started.Format(time.RFC3339),
93+
os.Getpid(),
94+
)
95+
if err != nil {
96+
return fmt.Errorf("recording running build: %w", err)
97+
}
98+
}
10699
pipelineBuildID = buildID
107100
defer func() { pipelineBuildID = "" }()
108-
_, err := application.DB.ExecContext(ctx, `
109-
INSERT INTO builds (id, started_at, status, pid,
110-
packages_total, packages_changed, packages_skipped,
111-
provider_groups, artifact_count, root_hash, manifest_json)
112-
VALUES (?, ?, 'running', ?, 0, 0, 0, 0, 0, '', '{}')`,
113-
buildID,
114-
started.Format(time.RFC3339),
115-
os.Getpid(),
116-
)
117-
if err != nil {
118-
return fmt.Errorf("recording running build: %w", err)
119-
}
120101

121102
if err := executePipelineSteps(cmd, ctx, skipDiscover, skipDeploy, discoverSource); err != nil {
122103
recordFailedBuild(cmd, started, err)
@@ -222,7 +203,8 @@ func recordFailedBuild(cmd *cobra.Command, started time.Time, pipelineErr error)
222203
// markStaleBuildsCancelled finds builds with status "running" whose PID is no
223204
// longer alive and marks them as "cancelled".
224205
func markStaleBuildsCancelled(ctx context.Context, db *sql.DB) {
225-
rows, err := db.QueryContext(ctx, `SELECT id, pid FROM builds WHERE status = 'running' AND pid IS NOT NULL`)
206+
rows, err := db.QueryContext(ctx,
207+
`SELECT id, pid FROM builds WHERE status = 'running' AND pid IS NOT NULL`)
226208
if err != nil {
227209
return
228210
}
@@ -259,5 +241,6 @@ func init() {
259241
pipelineCmd.Flags().String("discover-source", "config", "discovery source (config or svn)")
260242
pipelineCmd.Flags().Bool("skip-discover", false, "skip the discover step")
261243
pipelineCmd.Flags().Bool("skip-deploy", false, "skip the deploy step")
244+
pipelineCmd.Flags().String("build-id", "", "pre-allocated build ID (set by admin UI trigger)")
262245
rootCmd.AddCommand(pipelineCmd)
263246
}
Lines changed: 4 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package cmd
22

33
import (
4-
"fmt"
54
"os"
65
"path/filepath"
76
"syscall"
@@ -14,7 +13,6 @@ func resetLockState() {
1413
_ = pipelineLockFile.Close()
1514
pipelineLockFile = nil
1615
}
17-
_ = os.Unsetenv("PIPELINE_LOCK_FD")
1816
}
1917

2018
func TestAcquireLock_BlocksSecondCaller(t *testing.T) {
@@ -41,85 +39,17 @@ func TestAcquireLock_BlocksSecondCaller(t *testing.T) {
4139
}
4240
}
4341

44-
func TestAcquireLock_InheritedFD_Valid(t *testing.T) {
42+
func TestAcquireLock_SucceedsWhenFree(t *testing.T) {
4543
t.Cleanup(resetLockState)
4644

4745
lockPath := filepath.Join(t.TempDir(), "pipeline.lock")
4846

49-
// Simulate parent: open and lock the file.
50-
f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0644)
51-
if err != nil {
52-
t.Fatal(err)
53-
}
54-
t.Cleanup(func() {
55-
_ = syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
56-
_ = f.Close()
57-
})
58-
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
59-
t.Fatalf("flock failed: %v", err)
60-
}
61-
62-
t.Setenv("PIPELINE_LOCK_FD", fmt.Sprintf("%d", f.Fd()))
63-
if err := acquireLock(lockPath); err != nil {
64-
t.Fatalf("inherited fd should be accepted: %v", err)
65-
}
66-
}
67-
68-
func TestAcquireLock_InheritedFD_WrongFile(t *testing.T) {
69-
t.Cleanup(resetLockState)
70-
71-
dir := t.TempDir()
72-
lockPath := filepath.Join(dir, "pipeline.lock")
73-
74-
// Create the lock file so stat succeeds.
47+
// Pre-create the file (simulates previous run).
7548
if err := os.WriteFile(lockPath, nil, 0644); err != nil {
7649
t.Fatal(err)
7750
}
7851

79-
// Open a different file and lock it.
80-
other, err := os.CreateTemp(dir, "other")
81-
if err != nil {
82-
t.Fatal(err)
83-
}
84-
t.Cleanup(func() { _ = other.Close() })
85-
_ = syscall.Flock(int(other.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
86-
87-
t.Setenv("PIPELINE_LOCK_FD", fmt.Sprintf("%d", other.Fd()))
88-
err = acquireLock(lockPath)
89-
if err == nil {
90-
t.Fatal("expected rejection for fd pointing to wrong file")
91-
}
92-
}
93-
94-
func TestAcquireLock_InheritedFD_NotLocked(t *testing.T) {
95-
t.Cleanup(resetLockState)
96-
97-
lockPath := filepath.Join(t.TempDir(), "pipeline.lock")
98-
99-
// Open the correct file but don't lock it.
100-
f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0644)
101-
if err != nil {
102-
t.Fatal(err)
103-
}
104-
t.Cleanup(func() { _ = f.Close() })
105-
106-
// Hold the lock from a *different* fd to simulate a real pipeline running.
107-
holder, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0644)
108-
if err != nil {
109-
t.Fatal(err)
110-
}
111-
t.Cleanup(func() {
112-
_ = syscall.Flock(int(holder.Fd()), syscall.LOCK_UN)
113-
_ = holder.Close()
114-
})
115-
if err := syscall.Flock(int(holder.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
116-
t.Fatalf("flock failed: %v", err)
117-
}
118-
119-
// f points to the right file but doesn't hold the lock — should be rejected.
120-
t.Setenv("PIPELINE_LOCK_FD", fmt.Sprintf("%d", f.Fd()))
121-
err = acquireLock(lockPath)
122-
if err == nil {
123-
t.Fatal("expected rejection for fd that doesn't hold the lock")
52+
if err := acquireLock(lockPath); err != nil {
53+
t.Fatalf("lock acquisition should succeed on free file: %v", err)
12454
}
12555
}

internal/http/handlers.go

Lines changed: 91 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"database/sql"
88
"encoding/hex"
99
"encoding/json"
10+
"errors"
1011
"fmt"
12+
"log/slog"
1113
"net/http"
1214
"os"
1315
"os/exec"
@@ -371,8 +373,6 @@ func handleAdminBuilds(a *app.App, tmpl *templateSet) http.HandlerFunc {
371373
}
372374
}
373375

374-
const pipelineLockPath = "storage/pipeline.lock"
375-
376376
func handleTriggerBuild(a *app.App) http.HandlerFunc {
377377
return func(w http.ResponseWriter, r *http.Request) {
378378
self, err := os.Executable()
@@ -382,35 +382,73 @@ func handleTriggerBuild(a *app.App) http.HandlerFunc {
382382
return
383383
}
384384

385-
// Acquire the lock before spawning so we know the status is accurate.
386-
lockFile, err := os.OpenFile(pipelineLockPath, os.O_CREATE|os.O_RDWR, 0644)
385+
// Clean up stale "running" rows (dead PID) before checking.
386+
markStaleBuildsCancelled(r.Context(), a.DB, a.Logger)
387+
388+
// Atomically claim a build slot before starting the child. The row
389+
// is inserted with the server's own PID so that stale cleanup
390+
// (which checks PID liveness) cannot cancel it while we start the
391+
// child. The child will UPDATE the PID to its own once it begins.
392+
buildID := time.Now().UTC().Format("20060102-150405")
393+
res, err := a.DB.ExecContext(r.Context(), `
394+
INSERT INTO builds (id, started_at, status, pid,
395+
packages_total, packages_changed, packages_skipped,
396+
provider_groups, artifact_count, root_hash, manifest_json)
397+
SELECT ?, ?, 'running', ?, 0, 0, 0, 0, 0, '', '{}'
398+
WHERE NOT EXISTS (
399+
SELECT 1 FROM builds WHERE status = 'running'
400+
)`,
401+
buildID,
402+
time.Now().UTC().Format(time.RFC3339),
403+
os.Getpid(),
404+
)
387405
if err != nil {
388-
a.Logger.Error("failed to open pipeline lock", "error", err)
406+
a.Logger.Error("failed to claim build slot", "error", err)
389407
http.Redirect(w, r, "/admin/builds?error=internal+error", http.StatusSeeOther)
390408
return
391409
}
392-
if err := syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
393-
_ = lockFile.Close()
410+
n, _ := res.RowsAffected()
411+
if n == 0 {
394412
http.Redirect(w, r, "/admin/builds?error=build+already+running", http.StatusSeeOther)
395413
return
396414
}
397415

398-
// Pass the locked fd to the child via ExtraFiles (fd 3) so the lock
399-
// transfers atomically. The child detects PIPELINE_LOCK_FD and skips
400-
// its own acquisition.
416+
// Slot claimed — start the child. On failure, mark the row
417+
// failed so the slot is freed.
418+
cmd := exec.Command(self, "pipeline", "--build-id", buildID)
419+
cmd.Stdout = os.Stdout
420+
cmd.Stderr = os.Stderr
421+
if err := cmd.Start(); err != nil {
422+
a.Logger.Error("failed to start pipeline", "error", err)
423+
_, _ = a.DB.ExecContext(r.Context(), `
424+
UPDATE builds SET status = 'failed', finished_at = ?,
425+
error_message = ? WHERE id = ?`,
426+
time.Now().UTC().Format(time.RFC3339),
427+
"failed to start: "+err.Error(), buildID)
428+
http.Redirect(w, r, "/admin/builds?error=internal+error", http.StatusSeeOther)
429+
return
430+
}
431+
432+
// Update the row with the child's actual PID so stale cleanup
433+
// tracks the right process going forward.
434+
if _, err := a.DB.ExecContext(r.Context(),
435+
`UPDATE builds SET pid = ? WHERE id = ?`,
436+
cmd.Process.Pid, buildID); err != nil {
437+
a.Logger.Warn("failed to update build PID", "build_id", buildID, "error", err)
438+
}
439+
440+
// Reap the child in the background. If the child exits with an
441+
// error and the row is still in "running" state (i.e. the child
442+
// never got far enough to record its own outcome), mark it failed
443+
// here so the slot is freed.
401444
go func() {
402-
defer func() {
403-
_ = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_UN)
404-
_ = lockFile.Close()
405-
}()
406-
407-
cmd := exec.Command(self, "pipeline")
408-
cmd.Stdout = os.Stdout
409-
cmd.Stderr = os.Stderr
410-
cmd.ExtraFiles = []*os.File{lockFile} // fd 3 in child
411-
cmd.Env = append(os.Environ(), "PIPELINE_LOCK_FD=3")
412-
if err := cmd.Run(); err != nil {
445+
if err := cmd.Wait(); err != nil {
413446
a.Logger.Error("triggered pipeline failed", "error", err)
447+
now := time.Now().UTC().Format(time.RFC3339)
448+
_, _ = a.DB.ExecContext(context.Background(), `
449+
UPDATE builds SET status = 'failed', finished_at = ?,
450+
error_message = ? WHERE id = ? AND status = 'running'`,
451+
now, err.Error(), buildID)
414452
} else {
415453
a.Logger.Info("triggered pipeline completed")
416454
}
@@ -420,6 +458,38 @@ func handleTriggerBuild(a *app.App) http.HandlerFunc {
420458
}
421459
}
422460

461+
// markStaleBuildsCancelled finds builds with status "running" whose PID is no
462+
// longer alive and marks them as "cancelled".
463+
func markStaleBuildsCancelled(ctx context.Context, db *sql.DB, logger *slog.Logger) {
464+
rows, err := db.QueryContext(ctx,
465+
`SELECT id, pid FROM builds WHERE status = 'running' AND pid IS NOT NULL`)
466+
if err != nil {
467+
return
468+
}
469+
470+
var staleIDs []string
471+
for rows.Next() {
472+
var id string
473+
var pid int
474+
if err := rows.Scan(&id, &pid); err != nil {
475+
continue
476+
}
477+
if err := syscall.Kill(pid, 0); err != nil {
478+
if errors.Is(err, syscall.ESRCH) {
479+
staleIDs = append(staleIDs, id)
480+
} else {
481+
logger.Warn("stale build check: unexpected kill(0) error", "build_id", id, "pid", pid, "error", err)
482+
}
483+
}
484+
}
485+
_ = rows.Close()
486+
487+
now := time.Now().UTC().Format(time.RFC3339)
488+
for _, id := range staleIDs {
489+
_, _ = db.ExecContext(ctx, `UPDATE builds SET status = 'cancelled', finished_at = ? WHERE id = ?`, now, id)
490+
}
491+
}
492+
423493
var logFiles = map[string]string{
424494
"wpcomposer": filepath.Join("storage", "logs", "wpcomposer.log"),
425495
"pipeline": filepath.Join("storage", "logs", "pipeline.log"),

0 commit comments

Comments
 (0)