Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@
vendor
plugin.so

web/app/node_modules
web/app/node_modulessimple
*.sqlite-shm
*.sqlite-wal
52 changes: 39 additions & 13 deletions backend/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,25 @@ import (
var migrationsFS embed.FS

func NewMysqlBackend(host string, port int, user, password, database string, opts ...option) *mysqlBackend {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&interpolateParams=true", user, password, host, port, database)

db, err := sql.Open("mysql", dsn)
if err != nil {
panic(err)
}

return newMysqlBackend(db, dsn, opts...)
}

// NewMysqlBackendWithDB creates a new MySQL backend using an existing database connection.
// The provided database connection should already be configured and connected to the target database.
// Note: Migrations will be applied using the provided connection directly, so ensure the connection
// supports multiple statements if ApplyMigrations is enabled (default: true).
func NewMysqlBackendWithDB(db *sql.DB, opts ...option) *mysqlBackend {
return newMysqlBackend(db, "", opts...)
}

func newMysqlBackend(db *sql.DB, dsn string, opts ...option) *mysqlBackend {
options := &options{
Options: backend.ApplyOptions(),
ApplyMigrations: true,
Expand All @@ -41,13 +60,6 @@ func NewMysqlBackend(host string, port int, user, password, database string, opt
opt(options)
}

dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&interpolateParams=true", user, password, host, port, database)

db, err := sql.Open("mysql", dsn)
if err != nil {
panic(err)
}

if options.MySQLOptions != nil {
options.MySQLOptions(db)
}
Expand Down Expand Up @@ -85,10 +97,22 @@ func (mb *mysqlBackend) Close() error {

// Migrate applies any pending database migrations.
func (mb *mysqlBackend) Migrate() error {
schemaDsn := mb.dsn + "&multiStatements=true"
db, err := sql.Open("mysql", schemaDsn)
if err != nil {
return fmt.Errorf("opening schema database: %w", err)
var db *sql.DB
var shouldCloseDb bool

if mb.dsn != "" {
// When DSN is available, create a new connection with multiStatements support
schemaDsn := mb.dsn + "&multiStatements=true"
var err error
db, err = sql.Open("mysql", schemaDsn)
if err != nil {
return fmt.Errorf("opening schema database: %w", err)
}
shouldCloseDb = true
} else {
// When using an existing DB connection, use it directly for migrations
db = mb.db
shouldCloseDb = false
}

dbi, err := mysql.WithInstance(db, &mysql.Config{})
Expand All @@ -112,8 +136,10 @@ func (mb *mysqlBackend) Migrate() error {
}
}

if err := db.Close(); err != nil {
return fmt.Errorf("closing schema database: %w", err)
if shouldCloseDb {
if err := db.Close(); err != nil {
return fmt.Errorf("closing schema database: %w", err)
}
}

return nil
Expand Down
151 changes: 151 additions & 0 deletions backend/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,58 @@ func TestMySqlBackendE2E(t *testing.T) {
})
}

func TestMySqlBackendWithDB_E2E(t *testing.T) {
if testing.Short() {
t.Skip()
}

var dbName string

test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
// Create a database for testing
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
if err != nil {
panic(err)
}

dbName = "test_with_db_e2e_" + strings.Replace(uuid.NewString(), "-", "", -1)
if _, err := db.Exec("CREATE DATABASE " + dbName); err != nil {
panic(fmt.Errorf("creating database: %w", err))
}

if err := db.Close(); err != nil {
panic(err)
}

// Create a connection to the test database
testDB, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(localhost:3306)/%s?parseTime=true&interpolateParams=true&multiStatements=true", testUser, testPassword, dbName))
if err != nil {
panic(fmt.Errorf("connecting to test database: %w", err))
}

options = append(options, backend.WithStickyTimeout(0))

return NewMysqlBackendWithDB(testDB, WithBackendOptions(options...))
}, func(b test.TestBackend) {
if err := b.Close(); err != nil {
panic(err)
}

db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
if err != nil {
panic(err)
}

if _, err := db.Exec("DROP DATABASE IF EXISTS " + dbName); err != nil {
panic(fmt.Errorf("dropping database: %w", err))
}

if err := db.Close(); err != nil {
panic(err)
}
})
}

var _ test.TestBackend = (*mysqlBackend)(nil)

func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
Expand Down Expand Up @@ -163,6 +215,105 @@ func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]*history.Event,
return f, nil
}

func TestNewMysqlBackendWithDB_Integration(t *testing.T) {
if testing.Short() {
t.Skip()
}

var dbName string

t.Run("NewMysqlBackendWithDB_Basic", func(t *testing.T) {
// Create a database for testing
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}
defer db.Close()

dbName = "test_with_db_" + strings.Replace(uuid.NewString(), "-", "", -1)
if _, err := db.Exec("CREATE DATABASE " + dbName); err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer func() {
if _, err := db.Exec("DROP DATABASE IF EXISTS " + dbName); err != nil {
t.Errorf("Failed to drop database: %v", err)
}
}()

// Create a connection to the test database
testDB, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(localhost:3306)/%s?parseTime=true&interpolateParams=true&multiStatements=true", testUser, testPassword, dbName))
if err != nil {
t.Fatalf("Failed to connect to test database: %v", err)
}
defer testDB.Close()

// Test creating backend with existing DB
mysqlBackend := NewMysqlBackendWithDB(testDB, WithBackendOptions(backend.WithStickyTimeout(0)))
if mysqlBackend == nil {
t.Fatal("Expected backend to be created")
}

// Verify the backend has the correct DB instance
if mysqlBackend.db != testDB {
t.Error("Expected backend to use the provided DB instance")
}

// Verify DSN is empty when using existing DB
if mysqlBackend.dsn != "" {
t.Errorf("Expected DSN to be empty when using existing DB, got: %s", mysqlBackend.dsn)
}

// Test that the backend can perform basic operations
if !mysqlBackend.FeatureSupported(backend.Feature_Expiration) {
t.Error("Expected backend to support expiration feature")
}

// Close the backend
if err := mysqlBackend.Close(); err != nil {
t.Errorf("Failed to close backend: %v", err)
}
})

t.Run("NewMysqlBackendWithDB_WithoutMigrations", func(t *testing.T) {
// Create a database for testing
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}
defer db.Close()

dbName = "test_no_migrations_" + strings.Replace(uuid.NewString(), "-", "", -1)
if _, err := db.Exec("CREATE DATABASE " + dbName); err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer func() {
if _, err := db.Exec("DROP DATABASE IF EXISTS " + dbName); err != nil {
t.Errorf("Failed to drop database: %v", err)
}
}()

// Create a connection to the test database
testDB, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(localhost:3306)/%s?parseTime=true&interpolateParams=true&multiStatements=true", testUser, testPassword, dbName))
if err != nil {
t.Fatalf("Failed to connect to test database: %v", err)
}
defer testDB.Close()

// Test creating backend with existing DB and migrations disabled
mysqlBackend := NewMysqlBackendWithDB(testDB,
WithApplyMigrations(false),
WithBackendOptions(backend.WithStickyTimeout(0)))
if mysqlBackend == nil {
t.Fatal("Expected backend to be created")
}

// Close the backend
if err := mysqlBackend.Close(); err != nil {
t.Errorf("Failed to close backend: %v", err)
}
})
}

func Test_MysqlBackend_WorkerName(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down
22 changes: 21 additions & 1 deletion docs/source/includes/_backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,29 @@ See `migrations/sqlite` for the schema and migrations. Main tables:

```go
func NewMysqlBackend(host string, port int, user, password, database string, opts ...option)
func NewMysqlBackendWithDB(db *sql.DB, opts ...option)
```

Create a new MySQL backend instance with `NewMysqlBackend`.
Create a new MySQL backend instance with `NewMysqlBackend` or `NewMysqlBackendWithDB`.

Use `NewMysqlBackend` when you want the backend to manage the database connection:

```go
backend := mysql.NewMysqlBackend("localhost", 3306, "user", "password", "dbname")
```

Use `NewMysqlBackendWithDB` when you want to provide your own database connection:

```go
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/dbname?parseTime=true&interpolateParams=true&multiStatements=true")
// Configure connection pool settings as needed
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(5)

backend := mysql.NewMysqlBackendWithDB(db)
```

**Note:** When using `NewMysqlBackendWithDB`, ensure your connection string includes `multiStatements=true` if you plan to use automatic migrations (which is the default).

### Options

Expand Down
Loading