Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion materialize-mysql/.snapshots/TestConfigURI-Basic
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
will:secret1234@tcp(example.com:3306)/somedb?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql
will:secret1234@tcp(example.com:3306)/somedb?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql&multiStatements=true
config valid
2 changes: 1 addition & 1 deletion materialize-mysql/.snapshots/TestConfigURI-IncorrectSSL
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
will:secret1234@tcp(example.com:3306)/somedb?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql
will:secret1234@tcp(example.com:3306)/somedb?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql&multiStatements=true
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI multiStatements=true allows users to provide multiple statements to the additional SQL value. Postgres allows multiple statements by default, MySQL requires it to be explicitly enabled.

invalid 'sslmode' configuration: unknown setting "whoops-this-isnt-right"
2 changes: 1 addition & 1 deletion materialize-mysql/.snapshots/TestConfigURI-RequireSSL
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
will:secret1234@tcp(example.com:3306)/somedb?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql&tls=preferred
will:secret1234@tcp(example.com:3306)/somedb?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql&multiStatements=true&tls=preferred
config valid
6 changes: 6 additions & 0 deletions materialize-mysql/.snapshots/TestSpecification
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@
"description": "Name of the database table",
"x-collection-name": true
},
"additional_table_create_sql": {
"type": "string",
"title": "Additional Table Create SQL",
"description": "Additional SQL statement(s) to be run after the table is created.",
"multiline": true
},
"delta_updates": {
"type": "boolean",
"title": "Delta Update",
Expand Down
24 changes: 22 additions & 2 deletions materialize-mysql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
pf "github.com/estuary/flow/go/protocols/flow"
pm "github.com/estuary/flow/go/protocols/materialize"
"github.com/go-sql-driver/mysql"
log "github.com/sirupsen/logrus"
)

type client struct {
Expand Down Expand Up @@ -221,8 +222,27 @@ func (c *client) AlterTable(ctx context.Context, ta sql.TableAlter) (string, boi
}

func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error {
_, err := c.db.ExecContext(ctx, tc.TableCreateSql)
return err
var res tableConfig
if tc.Resource != nil {
res = tc.Resource.(tableConfig)
}

if _, err := c.db.ExecContext(ctx, tc.TableCreateSql); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these statements be in a transaction so all tables are created atomically?

return fmt.Errorf("executing CREATE TABLE statement: %w", err)
}

if res.AdditionalSql != "" {
if _, err := c.db.ExecContext(ctx, res.AdditionalSql); err != nil {
return fmt.Errorf("executing additional SQL statement '%s': %w", res.AdditionalSql, err)
}

log.WithFields(log.Fields{
"table": tc.Identifier,
"query": res.AdditionalSql,
}).Info("executed AdditionalSql")
}

return nil
}

func (c *client) DeleteTable(ctx context.Context, path []string) (string, boilerplate.ActionApplyFn, error) {
Expand Down
4 changes: 2 additions & 2 deletions materialize-mysql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ func TestMySQLConfig(t *testing.T) {
}
require.NoError(t, validConfig.Validate())
var uri = validConfig.ToURI()
require.Equal(t, "youser:shmassword@tcp(post.toast:1234)/namegame?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql", uri)
require.Equal(t, "youser:shmassword@tcp(post.toast:1234)/namegame?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql&multiStatements=true", uri)

var noPort = validConfig
noPort.Address = "post.toast"
require.NoError(t, noPort.Validate())
uri = noPort.ToURI()
require.Equal(t, "youser:shmassword@tcp(post.toast:3306)/namegame?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql", uri)
require.Equal(t, "youser:shmassword@tcp(post.toast:3306)/namegame?clientFoundRows=true&connectionAttributes=program_name%3AEstuary+materialize-mysql&multiStatements=true", uri)

var noAddress = validConfig
noAddress.Address = ""
Expand Down
6 changes: 4 additions & 2 deletions materialize-mysql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (c config) ToURI() string {
mysqlCfg.Passwd = c.Password
mysqlCfg.DBName = c.Database
mysqlCfg.ClientFoundRows = true
mysqlCfg.MultiStatements = true

if c.Advanced.SSLMode != "" {
// see https://pkg.go.dev/github.com/go-sql-driver/mysql#section-readme
Expand All @@ -212,8 +213,9 @@ func (c config) ToURI() string {
}

type tableConfig struct {
Table string `json:"table" jsonschema:"title=Table,description=Name of the database table" jsonschema_extras:"x-collection-name=true"`
Delta bool `json:"delta_updates,omitempty" jsonschema:"default=false,title=Delta Update,description=Should updates to this table be done via delta updates. Default is false." jsonschema_extras:"x-delta-updates=true"`
Table string `json:"table" jsonschema:"title=Table,description=Name of the database table" jsonschema_extras:"x-collection-name=true"`
AdditionalSql string `json:"additional_table_create_sql,omitempty" jsonschema:"title=Additional Table Create SQL,description=Additional SQL statement(s) to be run after the table is created." jsonschema_extras:"multiline=true"`
Delta bool `json:"delta_updates,omitempty" jsonschema:"default=false,title=Delta Update,description=Should updates to this table be done via delta updates. Default is false." jsonschema_extras:"x-delta-updates=true"`
}

func (r tableConfig) Validate() error {
Expand Down
51 changes: 51 additions & 0 deletions materialize-mysql/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,57 @@ func TestIntegration(t *testing.T) {
)
})

t.Run("additional table create sql", func(t *testing.T) {
ctx := context.Background()
cfg := testConfig()

db, err := stdsql.Open("mysql", cfg.ToURI())
require.NoError(t, err)
defer db.Close()

c := &client{db: db}

tableName := "test_additional_sql"
indexName := "idx_extra"

cleanup := func() { db.ExecContext(ctx, fmt.Sprintf("DROP TABLE %s;", testDialect.Identifier(tableName))) }
cleanup()
t.Cleanup(cleanup)

tc := sql.TableCreate{
Table: sql.Table{Identifier: testDialect.Identifier(tableName)},
TableCreateSql: fmt.Sprintf(
"CREATE TABLE %s (id INT NOT NULL, val VARCHAR(64) NOT NULL);",
testDialect.Identifier(tableName),
),
Resource: tableConfig{
Table: tableName,
AdditionalSql: fmt.Sprintf(
"CREATE INDEX %s ON %s (val); ALTER TABLE %s COMMENT = 'extra';",
indexName,
testDialect.Identifier(tableName),
testDialect.Identifier(tableName),
),
},
}

require.NoError(t, c.CreateTable(ctx, tc))

var indexCount int
require.NoError(t, db.QueryRowContext(ctx,
"SELECT COUNT(*) FROM information_schema.statistics WHERE table_schema = ? AND table_name = ? AND index_name = ?",
cfg.Database, tableName, indexName,
).Scan(&indexCount))
require.Equal(t, 1, indexCount, "expected index created by AdditionalSql")

var tableComment string
require.NoError(t, db.QueryRowContext(ctx,
"SELECT table_comment FROM information_schema.tables WHERE table_schema = ? AND table_name = ?",
cfg.Database, tableName,
).Scan(&tableComment))
require.Equal(t, "extra", tableComment, "expected comment set by AdditionalSql")
})

t.Run("apply changes", func(t *testing.T) {
boilerplate.RunTestAllTasks(t, "testdata/apply-changes.flow.yaml", func(t *testing.T, _ []byte, taskName string, cfg config) {
t.Run(taskName, func(t *testing.T) {
Expand Down
Loading