Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,21 +234,40 @@ func (p *Postgres) Close() error {
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
func (p *Postgres) Lock() error {
return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
return err
}
for {
ok, err := p.tryLock()
if err != nil {
return fmt.Errorf("p.tryLock: %w", err)
}

// This will wait indefinitely until the lock can be acquired.
query := `SELECT pg_advisory_lock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
if ok {
break
}

time.Sleep(100 * time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the backoff library with an exponential backoff + jitter as the default. Note, this may cause other nodes/hosts to take longer to deploy due to the longer wait period so the backoff should be configurable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dhui thanks for the feedback, I've added a configurable exponential retry.

this may cause other nodes/hosts to take longer to deploy due to the longer wait period

I've added defaults and kept them quite small to address the above

}

return nil
})
}

func (p *Postgres) tryLock() (bool, error) {
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
return false, err
}

// https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
// should always return true or false
query := `SELECT pg_try_advisory_lock($1)`
var ok bool
if err := p.conn.QueryRowContext(context.Background(), query, aid).Scan(&ok); err != nil {
return false, &database.Error{OrigErr: err, Err: "pg_try_advisory_lock failed", Query: []byte(query)}
}

return ok, nil
}

func (p *Postgres) Unlock() error {
return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
Expand Down
44 changes: 44 additions & 0 deletions database/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,50 @@ func TestParallelSchema(t *testing.T) {
})
}

func TestPostgres_ConcurrentMigrations(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
// GIVEN - a set of concurrent processes running migrations
const concurrency = 3
var wg sync.WaitGroup

ip, port, err := c.FirstPort()
if err != nil {
t.Fatal(err)
}
addr := pgConnectionString(ip, port)

// WHEN
for i := 0; i < concurrency; i++ {
wg.Add(1)

go func() {
defer wg.Done()

p := &Postgres{}
d, err := p.Open(addr)
if err != nil {
t.Error(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()

m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "postgres", d)
if err != nil {
t.Error(err)
}
dt.TestMigrate(t, m)
}()
}

wg.Wait()

// THEN
})
}

func TestPostgres_Lock(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.FirstPort()
Expand Down