Skip to content

Commit 83014e9

Browse files
committed
Added support for pgxv5 locking table
In order to support running migrations through PgBouncer which does not support advisory locks.
1 parent 39ab04d commit 83014e9

File tree

3 files changed

+182
-21
lines changed

3 files changed

+182
-21
lines changed

database/pgx/v5/pgx.go

Lines changed: 155 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import (
2121
"github.com/jackc/pgerrcode"
2222
"github.com/jackc/pgx/v5/pgconn"
2323
_ "github.com/jackc/pgx/v5/stdlib"
24+
"github.com/lib/pq"
25+
)
26+
27+
const (
28+
LockStrategyAdvisory = "advisory"
29+
LockStrategyTable = "table"
2430
)
2531

2632
func init() {
@@ -33,18 +39,23 @@ var (
3339

3440
DefaultMigrationsTable = "schema_migrations"
3541
DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
42+
DefaultLockTable = "schema_lock"
43+
DefaultLockStrategy = LockStrategyAdvisory
3644
)
3745

3846
var (
3947
ErrNilConfig = fmt.Errorf("no config")
4048
ErrNoDatabaseName = fmt.Errorf("no database name")
4149
ErrNoSchema = fmt.Errorf("no schema")
50+
ErrDatabaseDirty = fmt.Errorf("database is dirty")
4251
)
4352

4453
type Config struct {
4554
MigrationsTable string
4655
DatabaseName string
4756
SchemaName string
57+
LockTable string
58+
LockStrategy string
4859
migrationsSchemaName string
4960
migrationsTableName string
5061
StatementTimeout time.Duration
@@ -104,6 +115,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
104115
config.MigrationsTable = DefaultMigrationsTable
105116
}
106117

118+
if len(config.LockTable) == 0 {
119+
config.LockTable = DefaultLockTable
120+
}
121+
122+
if len(config.LockStrategy) == 0 {
123+
config.LockStrategy = DefaultLockStrategy
124+
}
125+
107126
config.migrationsSchemaName = config.SchemaName
108127
config.migrationsTableName = config.MigrationsTable
109128
if config.MigrationsTableQuoted {
@@ -129,6 +148,10 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
129148
config: config,
130149
}
131150

151+
if err := px.ensureLockTable(); err != nil {
152+
return nil, err
153+
}
154+
132155
if err := px.ensureVersionTable(); err != nil {
133156
return nil, err
134157
}
@@ -192,13 +215,18 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
192215
}
193216
}
194217

218+
lockStrategy := purl.Query().Get("x-lock-strategy")
219+
lockTable := purl.Query().Get("x-lock-table")
220+
195221
px, err := WithInstance(db, &Config{
196222
DatabaseName: purl.Path,
197223
MigrationsTable: migrationsTable,
198224
MigrationsTableQuoted: migrationsTableQuoted,
199225
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
200226
MultiStatementEnabled: multiStatementEnabled,
201227
MultiStatementMaxSize: multiStatementMaxSize,
228+
LockStrategy: lockStrategy,
229+
LockTable: lockTable,
202230
})
203231

204232
if err != nil {
@@ -217,36 +245,116 @@ func (p *Postgres) Close() error {
217245
return nil
218246
}
219247

220-
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
221248
func (p *Postgres) Lock() error {
222249
return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
223-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
224-
if err != nil {
225-
return err
226-
}
227-
228-
// This will wait indefinitely until the lock can be acquired.
229-
query := `SELECT pg_advisory_lock($1)`
230-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
231-
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
250+
switch p.config.LockStrategy {
251+
case LockStrategyAdvisory:
252+
return p.applyAdvisoryLock()
253+
case LockStrategyTable:
254+
return p.applyTableLock()
255+
default:
256+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
232257
}
233-
return nil
234258
})
235259
}
236260

237261
func (p *Postgres) Unlock() error {
238262
return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
239-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
240-
if err != nil {
241-
return err
263+
switch p.config.LockStrategy {
264+
case LockStrategyAdvisory:
265+
return p.releaseAdvisoryLock()
266+
case LockStrategyTable:
267+
return p.releaseTableLock()
268+
default:
269+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
242270
}
271+
})
272+
}
243273

244-
query := `SELECT pg_advisory_unlock($1)`
245-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
246-
return &database.Error{OrigErr: err, Query: []byte(query)}
274+
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
275+
func (p *Postgres) applyAdvisoryLock() error {
276+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
277+
if err != nil {
278+
return err
279+
}
280+
281+
// This will wait indefinitely until the lock can be acquired.
282+
query := `SELECT pg_advisory_lock($1)`
283+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
284+
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
285+
}
286+
return nil
287+
}
288+
289+
func (p *Postgres) applyTableLock() error {
290+
tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
291+
if err != nil {
292+
return &database.Error{OrigErr: err, Err: "transaction start failed"}
293+
}
294+
defer func() {
295+
errRollback := tx.Rollback()
296+
if errRollback != nil {
297+
err = multierror.Append(err, errRollback)
247298
}
248-
return nil
249-
})
299+
}()
300+
301+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
302+
if err != nil {
303+
return err
304+
}
305+
306+
query := "SELECT * FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1"
307+
rows, err := tx.Query(query, aid)
308+
if err != nil {
309+
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
310+
}
311+
312+
defer func() {
313+
if errClose := rows.Close(); errClose != nil {
314+
err = multierror.Append(err, errClose)
315+
}
316+
}()
317+
318+
// If row exists at all, lock is present
319+
locked := rows.Next()
320+
if locked {
321+
return database.ErrLocked
322+
}
323+
324+
query = "INSERT INTO " + pq.QuoteIdentifier(p.config.LockTable) + " (lock_id) VALUES ($1)"
325+
if _, err := tx.Exec(query, aid); err != nil {
326+
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
327+
}
328+
329+
return tx.Commit()
330+
}
331+
332+
func (p *Postgres) releaseAdvisoryLock() error {
333+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
334+
if err != nil {
335+
return err
336+
}
337+
338+
query := `SELECT pg_advisory_unlock($1)`
339+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
340+
return &database.Error{OrigErr: err, Query: []byte(query)}
341+
}
342+
343+
return nil
344+
}
345+
346+
func (p *Postgres) releaseTableLock() error {
347+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
348+
if err != nil {
349+
return err
350+
}
351+
352+
query := "DELETE FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1"
353+
if _, err := p.conn.ExecContext(context.TODO(), query, aid); err != nil {
354+
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
355+
}
356+
357+
return nil
250358
}
251359

252360
func (p *Postgres) Run(migration io.Reader) error {
@@ -410,6 +518,12 @@ func (p *Postgres) Drop() (err error) {
410518
if err := tables.Scan(&tableName); err != nil {
411519
return err
412520
}
521+
522+
// do not drop lock table
523+
if tableName == p.config.LockTable && p.config.LockStrategy == LockStrategyTable {
524+
continue
525+
}
526+
413527
if len(tableName) > 0 {
414528
tableNames = append(tableNames, tableName)
415529
}
@@ -474,6 +588,28 @@ func (p *Postgres) ensureVersionTable() (err error) {
474588
return nil
475589
}
476590

591+
func (p *Postgres) ensureLockTable() error {
592+
if p.config.LockStrategy != LockStrategyTable {
593+
return nil
594+
}
595+
596+
var count int
597+
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1`
598+
if err := p.db.QueryRow(query, p.config.LockTable).Scan(&count); err != nil {
599+
return &database.Error{OrigErr: err, Query: []byte(query)}
600+
}
601+
if count == 1 {
602+
return nil
603+
}
604+
605+
query = `CREATE TABLE ` + pq.QuoteIdentifier(p.config.LockTable) + ` (lock_id BIGINT NOT NULL PRIMARY KEY)`
606+
if _, err := p.db.Exec(query); err != nil {
607+
return &database.Error{OrigErr: err, Query: []byte(query)}
608+
}
609+
610+
return nil
611+
}
612+
477613
// Copied from lib/pq implementation: https://github.com/lib/pq/blob/v1.9.0/conn.go#L1611
478614
func quoteIdentifier(name string) string {
479615
end := strings.IndexRune(name, 0)

database/pgx/v5/pgx_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,33 @@ func TestMigrate(t *testing.T) {
132132
})
133133
}
134134

135+
func TestMigrateLockTable(t *testing.T) {
136+
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
137+
ip, port, err := c.FirstPort()
138+
if err != nil {
139+
t.Fatal(err)
140+
}
141+
142+
addr := pgConnectionString(ip, port, "x-lock-strategy=table", "x-lock-table=lock_table")
143+
fmt.Println(addr)
144+
p := &Postgres{}
145+
d, err := p.Open(addr)
146+
if err != nil {
147+
t.Fatal(err)
148+
}
149+
defer func() {
150+
if err := d.Close(); err != nil {
151+
t.Error(err)
152+
}
153+
}()
154+
m, err := migrate.NewWithDatabaseInstance("file://../examples/migrations", "pgx", d)
155+
if err != nil {
156+
t.Fatal(err)
157+
}
158+
dt.TestMigrate(t, m)
159+
})
160+
}
161+
135162
func TestMultipleStatements(t *testing.T) {
136163
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
137164
ip, port, err := c.FirstPort()

database/testing/migrate_testing.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ package testing
55

66
import (
77
"testing"
8-
)
98

10-
import (
119
"github.com/golang-migrate/migrate/v4"
1210
)
1311

0 commit comments

Comments
 (0)