Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions database/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ func (c *Cassandra) Close() error {
}

func (c *Cassandra) Lock() error {
if !c.isLocked.CAS(false, true) {
if !c.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (c *Cassandra) Unlock() error {
if !c.isLocked.CAS(true, false) {
if !c.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
34 changes: 18 additions & 16 deletions database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a v2 db driver similar to how we use pgxv4 and pgxv5? That way users can decide when to migrate themselves.

Copy link
Author

@kokizzu kokizzu Apr 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think it's possible, clickhouse v2 autoregister as "clickhouse" same as v1 (that already deprecated)
while pgx it using different name "pgx5"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally green btw @dhui

"go.uber.org/atomic"

"github.com/golang-migrate/migrate/v4"
Expand Down Expand Up @@ -73,11 +74,12 @@ func (ch *ClickHouse) Open(dsn string) (database.Driver, error) {
return nil, err
}
q := migrate.FilterCustomQuery(purl)
q.Scheme = "tcp"
conn, err := sql.Open("clickhouse", q.String())
q.Scheme = "clickhouse"
opt, err := clickhouse.ParseDSN(purl.String())
if err != nil {
return nil, err
}
conn := clickhouse.OpenDB(opt)

multiStatementMaxSize := DefaultMultiStatementMaxSize
if s := purl.Query().Get("x-multi-statement-max-size"); len(s) > 0 {
Expand Down Expand Up @@ -167,7 +169,7 @@ func (ch *ClickHouse) Version() (int, bool, error) {
var (
version int
dirty uint8
query = "SELECT version, dirty FROM `" + ch.config.MigrationsTable + "` ORDER BY sequence DESC LIMIT 1"
query = "SELECT version, dirty FROM " + quoteIdentifier(ch.config.DatabaseName) + "." + quoteIdentifier(ch.config.MigrationsTable) + " ORDER BY sequence DESC LIMIT 1"
)
if err := ch.conn.QueryRow(query).Scan(&version, &dirty); err != nil {
if err == sql.ErrNoRows {
Expand All @@ -192,7 +194,7 @@ func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
return err
}

query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)"
query := "INSERT INTO " + quoteIdentifier(ch.config.DatabaseName) + "." + quoteIdentifier(ch.config.MigrationsTable) + " (version, dirty, sequence) VALUES (?, ?, ?)"
if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
Expand Down Expand Up @@ -234,18 +236,18 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {
// if not, create the empty migration table
if len(ch.config.ClusterName) > 0 {
query = fmt.Sprintf(`
CREATE TABLE %s ON CLUSTER %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine)
CREATE TABLE %s.%s ON CLUSTER %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, quoteIdentifier(ch.config.DatabaseName), quoteIdentifier(ch.config.MigrationsTable), ch.config.ClusterName, ch.config.MigrationsTableEngine)
} else {
query = fmt.Sprintf(`
CREATE TABLE %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.MigrationsTableEngine)
CREATE TABLE %s.%s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, quoteIdentifier(ch.config.DatabaseName), quoteIdentifier(ch.config.MigrationsTable), ch.config.MigrationsTableEngine)
}

if strings.HasSuffix(ch.config.MigrationsTableEngine, "Tree") {
Expand Down Expand Up @@ -291,14 +293,14 @@ func (ch *ClickHouse) Drop() (err error) {
}

func (ch *ClickHouse) Lock() error {
if !ch.isLocked.CAS(false, true) {
if !ch.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}

return nil
}
func (ch *ClickHouse) Unlock() error {
if !ch.isLocked.CAS(true, false) {
if !ch.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}

Expand Down
6 changes: 3 additions & 3 deletions database/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"log"
"testing"

_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/dhui/dktest"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/clickhouse"
Expand All @@ -33,12 +33,12 @@ var (
func clickhouseConnectionString(host, port, engine string) string {
if engine != "" {
return fmt.Sprintf(
"clickhouse://%v:%v?username=user&password=password&database=db&x-multi-statement=true&x-migrations-table-engine=%v&debug=false",
"clickhouse://%v:%v/db?username=user&password=password&x-multi-statement=true&x-migrations-table-engine=%v&debug=false",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one needed at least until this one merged:
ClickHouse/clickhouse-go#1541

ref: ClickHouse/clickhouse-go#757

other related issue: #380

host, port, engine)
}

return fmt.Sprintf(
"clickhouse://%v:%v?username=user&password=password&database=db&x-multi-statement=true&debug=false",
"clickhouse://%v:%v/db?username=user&password=password&x-multi-statement=true&debug=false",
host, port)
}

Expand Down
4 changes: 2 additions & 2 deletions database/firebird/firebird.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ func (f *Firebird) Close() error {
}

func (f *Firebird) Lock() error {
if !f.isLocked.CAS(false, true) {
if !f.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (f *Firebird) Unlock() error {
if !f.isLocked.CAS(true, false) {
if !f.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/ql/ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func (m *Ql) Drop() (err error) {
return nil
}
func (m *Ql) Lock() error {
if !m.isLocked.CAS(false, true) {
if !m.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}
func (m *Ql) Unlock() error {
if !m.isLocked.CAS(true, false) {
if !m.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func (p *Redshift) Close() error {

// Redshift does not support advisory lock functions: https://docs.aws.amazon.com/redshift/latest/dg/c_unsupported-postgresql-functions.html
func (p *Redshift) Lock() error {
if !p.isLocked.CAS(false, true) {
if !p.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (p *Redshift) Unlock() error {
if !p.isLocked.CAS(true, false) {
if !p.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/rqlite/rqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (r *Rqlite) Close() error {
// If the implementation can't provide this functionality, return nil.
// Return database.ErrLocked if database is already locked.
func (r *Rqlite) Lock() error {
if !r.isLocked.CAS(false, true) {
if !r.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
Expand All @@ -152,7 +152,7 @@ func (r *Rqlite) Lock() error {
// Unlock should release the lock. Migrate will call this function after
// all migrations have been run.
func (r *Rqlite) Unlock() error {
if !r.isLocked.CAS(true, false) {
if !r.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ func (p *Snowflake) Close() error {
}

func (p *Snowflake) Lock() error {
if !p.isLocked.CAS(false, true) {
if !p.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (p *Snowflake) Unlock() error {
if !p.isLocked.CAS(true, false) {
if !p.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ func (s *Spanner) Close() error {
// Lock implements database.Driver but doesn't do anything because Spanner only
// enqueues the UpdateDatabaseDdlRequest.
func (s *Spanner) Lock() error {
if swapped := s.lock.CAS(unlockedVal, lockedVal); swapped {
if swapped := s.lock.CompareAndSwap(unlockedVal, lockedVal); swapped {
return nil
}
return ErrLockHeld
}

// Unlock implements database.Driver but no action required, see Lock.
func (s *Spanner) Unlock() error {
if swapped := s.lock.CAS(lockedVal, unlockedVal); swapped {
if swapped := s.lock.CompareAndSwap(lockedVal, unlockedVal); swapped {
return nil
}
return ErrLockNotHeld
Expand Down
4 changes: 2 additions & 2 deletions database/sqlcipher/sqlcipher.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ func (m *Sqlite) Drop() (err error) {
}

func (m *Sqlite) Lock() error {
if !m.isLocked.CAS(false, true) {
if !m.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (m *Sqlite) Unlock() error {
if !m.isLocked.CAS(true, false) {
if !m.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ func (m *Sqlite) Drop() (err error) {
}

func (m *Sqlite) Lock() error {
if !m.isLocked.CAS(false, true) {
if !m.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (m *Sqlite) Unlock() error {
if !m.isLocked.CAS(true, false) {
if !m.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/sqlite3/sqlite3.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ func (m *Sqlite) Drop() (err error) {
}

func (m *Sqlite) Lock() error {
if !m.isLocked.CAS(false, true) {
if !m.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (m *Sqlite) Unlock() error {
if !m.isLocked.CAS(true, false) {
if !m.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions database/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func (s *Stub) Close() error {
}

func (s *Stub) Lock() error {
if !s.isLocked.CAS(false, true) {
if !s.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (s *Stub) Unlock() error {
if !s.isLocked.CAS(true, false) {
if !s.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion database/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func GenerateAdvisoryLockId(databaseName string, additionalNames ...string) (str

// CasRestoreOnErr CAS wrapper to automatically restore the lock state on error
func CasRestoreOnErr(lock *atomic.Bool, o, n bool, casErr error, f func() error) error {
if !lock.CAS(o, n) {
if !lock.CompareAndSwap(o, n) {
return casErr
}
if err := f(); err != nil {
Expand Down
Loading
Loading