-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Postgres: use pg_try_advisory_lock instead of pg_advisory_lock
#962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 13 commits
362c98e
cd50933
2f8e47b
5dadc1d
021b8b8
4ff8627
a403d5d
8f10bfc
c989d69
6f06f0a
e858f5e
1644f0c
49bee23
b8e39d0
be32ada
993b736
8580c15
a43c2f1
21469a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,7 @@ import ( | |||||||||||||
|
|
||||||||||||||
| "go.uber.org/atomic" | ||||||||||||||
|
|
||||||||||||||
| "github.com/cenkalti/backoff/v4" | ||||||||||||||
| "github.com/golang-migrate/migrate/v4" | ||||||||||||||
| "github.com/golang-migrate/migrate/v4/database" | ||||||||||||||
| "github.com/golang-migrate/migrate/v4/database/multistmt" | ||||||||||||||
|
|
@@ -34,6 +35,9 @@ var ( | |||||||||||||
|
|
||||||||||||||
| DefaultMigrationsTable = "schema_migrations" | ||||||||||||||
| DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB | ||||||||||||||
|
|
||||||||||||||
| DefaultLockInitialRetryInterval = 100 * time.Millisecond | ||||||||||||||
| DefaultLockMaxRetryInterval = 1000 * time.Millisecond | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| var ( | ||||||||||||||
|
|
@@ -53,6 +57,17 @@ type Config struct { | |||||||||||||
| migrationsTableName string | ||||||||||||||
| StatementTimeout time.Duration | ||||||||||||||
| MultiStatementMaxSize int | ||||||||||||||
| Locking LockConfig | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| type LockConfig struct { | ||||||||||||||
| // InitialRetryInterval the initial (minimum) retry interval used for exponential backoff | ||||||||||||||
| // to try acquire a lock | ||||||||||||||
| InitialRetryInterval time.Duration | ||||||||||||||
|
|
||||||||||||||
| // MaxRetryInterval the maximum retry interval. Once the exponential backoff reaches this limit, | ||||||||||||||
| // the retry interval remains the same | ||||||||||||||
| MaxRetryInterval time.Duration | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| type Postgres struct { | ||||||||||||||
|
|
@@ -167,7 +182,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { | |||||||||||||
| if s := purl.Query().Get("x-migrations-table-quoted"); len(s) > 0 { | ||||||||||||||
| migrationsTableQuoted, err = strconv.ParseBool(s) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return nil, fmt.Errorf("Unable to parse option x-migrations-table-quoted: %w", err) | ||||||||||||||
| return nil, fmt.Errorf("unable to parse option x-migrations-table-quoted: %w", err) | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| if (len(migrationsTable) > 0) && (migrationsTableQuoted) && ((migrationsTable[0] != '"') || (migrationsTable[len(migrationsTable)-1] != '"')) { | ||||||||||||||
|
|
@@ -198,7 +213,22 @@ func (p *Postgres) Open(url string) (database.Driver, error) { | |||||||||||||
| if s := purl.Query().Get("x-multi-statement"); len(s) > 0 { | ||||||||||||||
| multiStatementEnabled, err = strconv.ParseBool(s) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return nil, fmt.Errorf("Unable to parse option x-multi-statement: %w", err) | ||||||||||||||
| return nil, fmt.Errorf("unable to parse option x-multi-statement: %w", err) | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| lockConfig := LockConfig{ | ||||||||||||||
| InitialRetryInterval: DefaultLockInitialRetryInterval, | ||||||||||||||
| MaxRetryInterval: DefaultLockMaxRetryInterval, | ||||||||||||||
| } | ||||||||||||||
| if s := purl.Query().Get("x-lock-retry-max-interval"); len(s) > 0 { | ||||||||||||||
| maxRetryIntervalMillis, err := strconv.Atoi(s) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return nil, fmt.Errorf("unable to parse option x-lock-retry-max-interval: %w", err) | ||||||||||||||
| } | ||||||||||||||
| maxRetryInterval := time.Duration(maxRetryIntervalMillis) | ||||||||||||||
| if maxRetryInterval > DefaultLockInitialRetryInterval { | ||||||||||||||
| lockConfig.MaxRetryInterval = maxRetryInterval | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+230
to
232
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This if statement means a user can't set the max retry interval to less than the default. I can't see a good reason to limit it in this way so why not do this?
Suggested change
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for the suggestion - the retry strategy here is to use exponential backoff, meaning if we end up retrying, the later retries should wait for longer than the previous ones. If we'd allow the user to set
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. however your comment raises a valid point that the user might not be aware of this initial retry interval and could try setting it to less than that without knowing it would be ignored - I've added a note about this to the readme be32ada |
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -209,6 +239,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { | |||||||||||||
| StatementTimeout: time.Duration(statementTimeout) * time.Millisecond, | ||||||||||||||
| MultiStatementEnabled: multiStatementEnabled, | ||||||||||||||
| MultiStatementMaxSize: multiStatementMaxSize, | ||||||||||||||
| Locking: lockConfig, | ||||||||||||||
| }) | ||||||||||||||
|
|
||||||||||||||
| if err != nil { | ||||||||||||||
|
|
@@ -231,24 +262,45 @@ func (p *Postgres) Close() error { | |||||||||||||
| return nil | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Lock tries to acquire an advisory lock and retries indefinitely with an exponential backoff strategy | ||||||||||||||
| // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS | ||||||||||||||
| func (p *Postgres) Lock() error { | ||||||||||||||
| return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error { | ||||||||||||||
| aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return err | ||||||||||||||
| } | ||||||||||||||
| backOff := p.config.Locking.nonStopBackoff() | ||||||||||||||
| err := backoff.Retry(func() error { | ||||||||||||||
| ok, err := p.tryLock() | ||||||||||||||
| if err != nil { | ||||||||||||||
| return fmt.Errorf("p.tryLock: %w", err) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // This will wait indefinitely until the lock can be acquired. | ||||||||||||||
| query := `SELECT pg_advisory_lock($1)` | ||||||||||||||
| if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { | ||||||||||||||
| return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} | ||||||||||||||
| } | ||||||||||||||
| if ok { | ||||||||||||||
| return nil | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| return nil | ||||||||||||||
| return fmt.Errorf("could not acquire lock") | ||||||||||||||
AkuSilvenius marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||
| }, backOff) | ||||||||||||||
|
|
||||||||||||||
| return err | ||||||||||||||
| }) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (p *Postgres) tryLock() (bool, error) { | ||||||||||||||
| aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return false, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS | ||||||||||||||
| // should always return true or false | ||||||||||||||
| query := `SELECT pg_try_advisory_lock($1)` | ||||||||||||||
| var ok bool | ||||||||||||||
| if err := p.conn.QueryRowContext(context.Background(), query, aid).Scan(&ok); err != nil { | ||||||||||||||
| return false, &database.Error{OrigErr: err, Err: "pg_try_advisory_lock failed", Query: []byte(query)} | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| return ok, nil | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (p *Postgres) Unlock() error { | ||||||||||||||
| return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { | ||||||||||||||
| aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) | ||||||||||||||
|
|
@@ -491,3 +543,13 @@ func (p *Postgres) ensureVersionTable() (err error) { | |||||||||||||
|
|
||||||||||||||
| return nil | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (l *LockConfig) nonStopBackoff() backoff.BackOff { | ||||||||||||||
| b := backoff.NewExponentialBackOff() | ||||||||||||||
| b.InitialInterval = l.InitialRetryInterval | ||||||||||||||
| b.MaxInterval = l.MaxRetryInterval | ||||||||||||||
| b.MaxElapsedTime = 0 // this backoff won't stop | ||||||||||||||
| b.Reset() | ||||||||||||||
|
|
||||||||||||||
| return b | ||||||||||||||
| } | ||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,6 +96,7 @@ func Test(t *testing.T) { | |
| t.Run("testFailToCreateTableWithoutPermissions", testFailToCreateTableWithoutPermissions) | ||
| t.Run("testCheckBeforeCreateTable", testCheckBeforeCreateTable) | ||
| t.Run("testParallelSchema", testParallelSchema) | ||
| t.Run("testPostgresConcurrentMigrations", testPostgresConcurrentMigrations) | ||
| t.Run("testPostgresLock", testPostgresLock) | ||
| t.Run("testWithInstanceConcurrent", testWithInstanceConcurrent) | ||
| t.Run("testWithConnection", testWithConnection) | ||
|
|
@@ -628,6 +629,50 @@ func testParallelSchema(t *testing.T) { | |
| }) | ||
| } | ||
|
|
||
| func testPostgresConcurrentMigrations(t *testing.T) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ran these tests against master and can confirm they fail 🎉. This change makes the tests pass |
||
| dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { | ||
| // GIVEN - a set of concurrent processes running migrations | ||
| const concurrency = 3 | ||
| var wg sync.WaitGroup | ||
|
|
||
| ip, port, err := c.FirstPort() | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| addr := pgConnectionString(ip, port, "x-lock-retry-max-interval=2000") | ||
|
|
||
| // WHEN | ||
| for i := 0; i < concurrency; i++ { | ||
| wg.Add(1) | ||
|
|
||
| go func() { | ||
| defer wg.Done() | ||
|
|
||
| p := &Postgres{} | ||
| d, err := p.Open(addr) | ||
| if err != nil { | ||
| t.Error(err) | ||
| } | ||
| defer func() { | ||
| if err := d.Close(); err != nil { | ||
| t.Error(err) | ||
| } | ||
| }() | ||
|
|
||
| m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "postgres", d) | ||
| if err != nil { | ||
| t.Error(err) | ||
| } | ||
| dt.TestMigrate(t, m) | ||
| }() | ||
| } | ||
|
|
||
| wg.Wait() | ||
|
|
||
| // THEN | ||
| }) | ||
| } | ||
|
|
||
| func testPostgresLock(t *testing.T) { | ||
| dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { | ||
| ip, port, err := c.FirstPort() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.