Skip to content

Commit 1352d99

Browse files
committed
Added support for pgxv5 locking table
In order to support running migrations through PgBouncer which does not support advisory locks.
1 parent 2788339 commit 1352d99

File tree

3 files changed

+182
-21
lines changed

3 files changed

+182
-21
lines changed

database/pgx/v5/pgx.go

Lines changed: 155 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ import (
2222
"github.com/jackc/pgerrcode"
2323
"github.com/jackc/pgx/v5/pgconn"
2424
_ "github.com/jackc/pgx/v5/stdlib"
25+
"github.com/lib/pq"
26+
)
27+
28+
const (
29+
LockStrategyAdvisory = "advisory"
30+
LockStrategyTable = "table"
2531
)
2632

2733
func init() {
@@ -34,18 +40,23 @@ var (
3440

3541
DefaultMigrationsTable = "schema_migrations"
3642
DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
43+
DefaultLockTable = "schema_lock"
44+
DefaultLockStrategy = LockStrategyAdvisory
3745
)
3846

3947
var (
4048
ErrNilConfig = fmt.Errorf("no config")
4149
ErrNoDatabaseName = fmt.Errorf("no database name")
4250
ErrNoSchema = fmt.Errorf("no schema")
51+
ErrDatabaseDirty = fmt.Errorf("database is dirty")
4352
)
4453

4554
type Config struct {
4655
MigrationsTable string
4756
DatabaseName string
4857
SchemaName string
58+
LockTable string
59+
LockStrategy string
4960
migrationsSchemaName string
5061
migrationsTableName string
5162
StatementTimeout time.Duration
@@ -105,6 +116,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
105116
config.MigrationsTable = DefaultMigrationsTable
106117
}
107118

119+
if len(config.LockTable) == 0 {
120+
config.LockTable = DefaultLockTable
121+
}
122+
123+
if len(config.LockStrategy) == 0 {
124+
config.LockStrategy = DefaultLockStrategy
125+
}
126+
108127
config.migrationsSchemaName = config.SchemaName
109128
config.migrationsTableName = config.MigrationsTable
110129
if config.MigrationsTableQuoted {
@@ -130,6 +149,10 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
130149
config: config,
131150
}
132151

152+
if err := px.ensureLockTable(); err != nil {
153+
return nil, err
154+
}
155+
133156
if err := px.ensureVersionTable(); err != nil {
134157
return nil, err
135158
}
@@ -193,13 +216,18 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
193216
}
194217
}
195218

219+
lockStrategy := purl.Query().Get("x-lock-strategy")
220+
lockTable := purl.Query().Get("x-lock-table")
221+
196222
px, err := WithInstance(db, &Config{
197223
DatabaseName: purl.Path,
198224
MigrationsTable: migrationsTable,
199225
MigrationsTableQuoted: migrationsTableQuoted,
200226
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
201227
MultiStatementEnabled: multiStatementEnabled,
202228
MultiStatementMaxSize: multiStatementMaxSize,
229+
LockStrategy: lockStrategy,
230+
LockTable: lockTable,
203231
})
204232

205233
if err != nil {
@@ -218,36 +246,116 @@ func (p *Postgres) Close() error {
218246
return nil
219247
}
220248

221-
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
222249
func (p *Postgres) Lock() error {
223250
return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
224-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
225-
if err != nil {
226-
return err
227-
}
228-
229-
// This will wait indefinitely until the lock can be acquired.
230-
query := `SELECT pg_advisory_lock($1)`
231-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
232-
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
251+
switch p.config.LockStrategy {
252+
case LockStrategyAdvisory:
253+
return p.applyAdvisoryLock()
254+
case LockStrategyTable:
255+
return p.applyTableLock()
256+
default:
257+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
233258
}
234-
return nil
235259
})
236260
}
237261

238262
func (p *Postgres) Unlock() error {
239263
return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
240-
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
241-
if err != nil {
242-
return err
264+
switch p.config.LockStrategy {
265+
case LockStrategyAdvisory:
266+
return p.releaseAdvisoryLock()
267+
case LockStrategyTable:
268+
return p.releaseTableLock()
269+
default:
270+
return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy)
243271
}
272+
})
273+
}
244274

245-
query := `SELECT pg_advisory_unlock($1)`
246-
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
247-
return &database.Error{OrigErr: err, Query: []byte(query)}
275+
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
276+
func (p *Postgres) applyAdvisoryLock() error {
277+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
278+
if err != nil {
279+
return err
280+
}
281+
282+
// This will wait indefinitely until the lock can be acquired.
283+
query := `SELECT pg_advisory_lock($1)`
284+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
285+
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
286+
}
287+
return nil
288+
}
289+
290+
func (p *Postgres) applyTableLock() error {
291+
tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
292+
if err != nil {
293+
return &database.Error{OrigErr: err, Err: "transaction start failed"}
294+
}
295+
defer func() {
296+
errRollback := tx.Rollback()
297+
if errRollback != nil {
298+
err = multierror.Append(err, errRollback)
248299
}
249-
return nil
250-
})
300+
}()
301+
302+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
303+
if err != nil {
304+
return err
305+
}
306+
307+
query := "SELECT * FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1"
308+
rows, err := tx.Query(query, aid)
309+
if err != nil {
310+
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
311+
}
312+
313+
defer func() {
314+
if errClose := rows.Close(); errClose != nil {
315+
err = multierror.Append(err, errClose)
316+
}
317+
}()
318+
319+
// If row exists at all, lock is present
320+
locked := rows.Next()
321+
if locked {
322+
return database.ErrLocked
323+
}
324+
325+
query = "INSERT INTO " + pq.QuoteIdentifier(p.config.LockTable) + " (lock_id) VALUES ($1)"
326+
if _, err := tx.Exec(query, aid); err != nil {
327+
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
328+
}
329+
330+
return tx.Commit()
331+
}
332+
333+
func (p *Postgres) releaseAdvisoryLock() error {
334+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
335+
if err != nil {
336+
return err
337+
}
338+
339+
query := `SELECT pg_advisory_unlock($1)`
340+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
341+
return &database.Error{OrigErr: err, Query: []byte(query)}
342+
}
343+
344+
return nil
345+
}
346+
347+
func (p *Postgres) releaseTableLock() error {
348+
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName)
349+
if err != nil {
350+
return err
351+
}
352+
353+
query := "DELETE FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1"
354+
if _, err := p.conn.ExecContext(context.TODO(), query, aid); err != nil {
355+
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
356+
}
357+
358+
return nil
251359
}
252360

253361
func (p *Postgres) Run(migration io.Reader) error {
@@ -411,6 +519,12 @@ func (p *Postgres) Drop() (err error) {
411519
if err := tables.Scan(&tableName); err != nil {
412520
return err
413521
}
522+
523+
// do not drop lock table
524+
if tableName == p.config.LockTable && p.config.LockStrategy == LockStrategyTable {
525+
continue
526+
}
527+
414528
if len(tableName) > 0 {
415529
tableNames = append(tableNames, tableName)
416530
}
@@ -475,6 +589,28 @@ func (p *Postgres) ensureVersionTable() (err error) {
475589
return nil
476590
}
477591

592+
func (p *Postgres) ensureLockTable() error {
593+
if p.config.LockStrategy != LockStrategyTable {
594+
return nil
595+
}
596+
597+
var count int
598+
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1`
599+
if err := p.db.QueryRow(query, p.config.LockTable).Scan(&count); err != nil {
600+
return &database.Error{OrigErr: err, Query: []byte(query)}
601+
}
602+
if count == 1 {
603+
return nil
604+
}
605+
606+
query = `CREATE TABLE ` + pq.QuoteIdentifier(p.config.LockTable) + ` (lock_id BIGINT NOT NULL PRIMARY KEY)`
607+
if _, err := p.db.Exec(query); err != nil {
608+
return &database.Error{OrigErr: err, Query: []byte(query)}
609+
}
610+
611+
return nil
612+
}
613+
478614
// Copied from lib/pq implementation: https://github.com/lib/pq/blob/v1.9.0/conn.go#L1611
479615
func quoteIdentifier(name string) string {
480616
end := strings.IndexRune(name, 0)

database/pgx/v5/pgx_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,33 @@ func TestMigrate(t *testing.T) {
132132
})
133133
}
134134

135+
func TestMigrateLockTable(t *testing.T) {
136+
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
137+
ip, port, err := c.FirstPort()
138+
if err != nil {
139+
t.Fatal(err)
140+
}
141+
142+
addr := pgConnectionString(ip, port, "x-lock-strategy=table", "x-lock-table=lock_table")
143+
fmt.Println(addr)
144+
p := &Postgres{}
145+
d, err := p.Open(addr)
146+
if err != nil {
147+
t.Fatal(err)
148+
}
149+
defer func() {
150+
if err := d.Close(); err != nil {
151+
t.Error(err)
152+
}
153+
}()
154+
m, err := migrate.NewWithDatabaseInstance("file://../examples/migrations", "pgx", d)
155+
if err != nil {
156+
t.Fatal(err)
157+
}
158+
dt.TestMigrate(t, m)
159+
})
160+
}
161+
135162
func TestMultipleStatements(t *testing.T) {
136163
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
137164
ip, port, err := c.FirstPort()

database/testing/migrate_testing.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ package testing
55

66
import (
77
"testing"
8-
)
98

10-
import (
119
"github.com/golang-migrate/migrate/v4"
1210
)
1311

0 commit comments

Comments
 (0)