Skip to content

Commit 0551d43

Browse files
authored
chore: retry on migration lock err (#508)
* fix: retry on migrator lock failure * chore: comment
1 parent ab0e63c commit 0551d43

File tree

2 files changed

+175
-18
lines changed

2 files changed

+175
-18
lines changed

internal/app/app.go

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"errors"
66
"os"
77
"os/signal"
8+
"strings"
89
"sync"
910
"syscall"
11+
"time"
1012

1113
"github.com/hookdeck/outpost/internal/config"
1214
"github.com/hookdeck/outpost/internal/infra"
@@ -191,33 +193,119 @@ func constructServices(
191193
return services, nil
192194
}
193195

196+
// runMigration handles database schema migrations with retry logic for lock conflicts.
197+
//
198+
// MIGRATION LOCK BEHAVIOR:
199+
// - Database locks are only acquired when migrations need to be performed
200+
// - When multiple nodes start simultaneously and migrations are pending:
201+
// 1. One node acquires the lock and performs migrations (ideally < 5 seconds)
202+
// 2. Other nodes fail with lock errors ("try lock failed", "can't acquire lock")
203+
// 3. Failed nodes wait 5 seconds and retry
204+
// 4. On retry, migrations are complete and nodes proceed successfully
205+
//
206+
// RETRY STRATEGY:
207+
// - Max 3 attempts with 5-second delays between retries
208+
// - 5 seconds is sufficient because most migrations complete quickly
209+
// - If no migrations are needed (common case), all nodes proceed immediately without lock contention
194210
func runMigration(ctx context.Context, cfg *config.Config, logger *logging.Logger) error {
195-
migrator, err := migrator.New(cfg.ToMigratorOpts())
196-
if err != nil {
197-
return err
198-
}
211+
const (
212+
maxRetries = 3
213+
retryDelay = 5 * time.Second
214+
)
199215

200-
defer func() {
216+
var lastErr error
217+
218+
for attempt := 1; attempt <= maxRetries; attempt++ {
219+
migrator, err := migrator.New(cfg.ToMigratorOpts())
220+
if err != nil {
221+
return err
222+
}
223+
224+
version, versionJumped, err := migrator.Up(ctx, -1)
225+
226+
// Always close the migrator after each attempt
201227
sourceErr, dbErr := migrator.Close(ctx)
202228
if sourceErr != nil {
203-
logger.Error("failed to close migrator", zap.Error(sourceErr))
229+
logger.Error("failed to close migrator source", zap.Error(sourceErr))
204230
}
205231
if dbErr != nil {
206-
logger.Error("failed to close migrator", zap.Error(dbErr))
232+
logger.Error("failed to close migrator database connection", zap.Error(dbErr))
207233
}
208-
}()
209234

210-
version, versionJumped, err := migrator.Up(ctx, -1)
211-
if err != nil {
212-
return err
235+
if err == nil {
236+
// Migration succeeded
237+
if versionJumped > 0 {
238+
logger.Info("migrations applied",
239+
zap.Int("version", version),
240+
zap.Int("version_applied", versionJumped))
241+
} else {
242+
logger.Info("no migrations applied", zap.Int("version", version))
243+
}
244+
return nil
245+
}
246+
247+
// Check if this is a lock-related error
248+
// Lock errors can manifest as:
249+
// - "can't acquire lock" (database.ErrLocked)
250+
// - "try lock failed" (postgres advisory lock failure)
251+
// - "pg_advisory_lock" (postgres lock function errors)
252+
isLockError := isLockRelatedError(err)
253+
lastErr = err
254+
255+
if !isLockError {
256+
// Not a lock error, fail immediately
257+
logger.Error("migration failed", zap.Error(err))
258+
return err
259+
}
260+
261+
// Lock error - retry if we have attempts remaining
262+
if attempt < maxRetries {
263+
logger.Warn("migration lock conflict, retrying",
264+
zap.Int("attempt", attempt),
265+
zap.Int("max_retries", maxRetries),
266+
zap.Duration("retry_delay", retryDelay),
267+
zap.Error(err))
268+
269+
select {
270+
case <-ctx.Done():
271+
return ctx.Err()
272+
case <-time.After(retryDelay):
273+
// Continue to next attempt
274+
}
275+
} else {
276+
// Exhausted all retries
277+
logger.Error("migration failed after retries",
278+
zap.Int("attempts", maxRetries),
279+
zap.Error(err))
280+
}
213281
}
214-
if versionJumped > 0 {
215-
logger.Info("migrations applied",
216-
zap.Int("version", version),
217-
zap.Int("version_applied", versionJumped))
218-
} else {
219-
logger.Info("no migrations applied", zap.Int("version", version))
282+
283+
return lastErr
284+
}
285+
286+
// isLockRelatedError reports whether err looks like a failure to acquire the
// database migration lock.
//
// Detection is by substring match on err.Error(), so it also matches when the
// lock message is embedded in a longer, wrapped message. The recognised
// messages are:
//   - "can't acquire lock" (database.ErrLocked in golang-migrate/migrate/v4/database)
//   - "try lock failed"    (reported by the postgres driver when taking the advisory lock fails)
//
// See: https://github.com/golang-migrate/migrate/blob/master/database/postgres/postgres.go
//
// A nil err is never a lock error.
func isLockRelatedError(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()

	return strings.Contains(msg, "can't acquire lock") ||
		strings.Contains(msg, "try lock failed")
}

internal/app/lock_error_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package app
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
// TestIsLockRelatedError verifies lock error detection for all known lock error patterns
11+
func TestIsLockRelatedError(t *testing.T) {
12+
tests := []struct {
13+
name string
14+
err error
15+
shouldMatch bool
16+
}{
17+
// Lock errors that should be retried
18+
{
19+
name: "database.ErrLocked",
20+
err: errors.New("can't acquire lock"),
21+
shouldMatch: true,
22+
},
23+
{
24+
name: "postgres advisory lock failure with pg_advisory_lock",
25+
err: errors.New("migrate.New: failed to open database: try lock failed in line 0: SELECT pg_advisory_lock($1) (details: pq: unnamed prepared statement does not exist)"),
26+
shouldMatch: true,
27+
},
28+
{
29+
name: "try lock failed",
30+
err: errors.New("try lock failed"),
31+
shouldMatch: true,
32+
},
33+
34+
// Non-lock errors that should NOT be retried
35+
{
36+
name: "connection refused",
37+
err: errors.New("connection refused"),
38+
shouldMatch: false,
39+
},
40+
{
41+
name: "SQL syntax error",
42+
err: errors.New("syntax error at or near"),
43+
shouldMatch: false,
44+
},
45+
{
46+
name: "authentication error",
47+
err: errors.New("password authentication failed"),
48+
shouldMatch: false,
49+
},
50+
{
51+
name: "timeout error",
52+
err: errors.New("context deadline exceeded"),
53+
shouldMatch: false,
54+
},
55+
{
56+
name: "nil error",
57+
err: nil,
58+
shouldMatch: false,
59+
},
60+
}
61+
62+
for _, tt := range tests {
63+
t.Run(tt.name, func(t *testing.T) {
64+
result := isLockRelatedError(tt.err)
65+
assert.Equal(t, tt.shouldMatch, result,
66+
"isLockRelatedError should return %v for: %v", tt.shouldMatch, tt.err)
67+
})
68+
}
69+
}

0 commit comments

Comments
 (0)