Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (c *MySqlConnector) getTableSchemaForTable(
return nil, err
}

rs, err := c.Execute(ctx, `select column_name, column_type, column_key, is_nullable, numeric_precision, numeric_scale
from information_schema.columns
where table_schema = ? and table_name = ? order by ordinal_position`, schemaTable.Schema, schemaTable.Table)
rs, err := c.Execute(ctx, fmt.Sprintf(`select column_name, column_type, column_key, is_nullable, numeric_precision, numeric_scale
from information_schema.columns where table_schema = '%s' and table_name = '%s' order by ordinal_position`,
mysql.Escape(schemaTable.Schema), mysql.Escape(schemaTable.Table)))
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,18 +440,20 @@ func (c *MySqlConnector) StatActivity(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
rs, err := c.Execute(ctx, "SELECT ID,COMMAND,STATE,TIME,INFO FROM performance_schema.processlist WHERE USER=?", c.config.User)
rs, err := c.Execute(ctx,
fmt.Sprintf("SELECT ID,COMMAND,STATE,TIME,INFO FROM performance_schema.processlist WHERE USER='%s'", mysql.Escape(c.config.User)))
if err != nil {
// 42S02 is ER_NO_SUCH_TABLE
var myErr *mysql.MyError
if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" {
// mariadb
rs, err = c.Execute(ctx,
"SELECT PROCESSLIST_ID,PROCESSLIST_COMMAND,PROCESSLIST_STATE,PROCESSLIST_TIME,PROCESSLIST_INFO"+
" FROM performance_schema.threads WHERE USER=?", c.config.User)
fmt.Sprintf("SELECT PROCESSLIST_ID,PROCESSLIST_COMMAND,PROCESSLIST_STATE,PROCESSLIST_TIME,PROCESSLIST_INFO"+
" FROM performance_schema.threads WHERE USER='%s'", mysql.Escape(c.config.User)))
if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" {
rs, err = c.Execute(ctx,
"SELECT ID,COMMAND,STATE,TIME,INFO FROM information_schema.processlist WHERE USER=?", c.config.User)
fmt.Sprintf("SELECT ID,COMMAND,STATE,TIME,INFO FROM information_schema.processlist WHERE USER='%s'",
mysql.Escape(c.config.User)))
}
}

Expand Down
21 changes: 12 additions & 9 deletions flow/connectors/mysql/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

func (c *MySqlConnector) tableRowEstimate(ctx context.Context, schema string, table string) (int64, error) {
rs, err := c.Execute(ctx, "select table_rows from information_schema.tables where table_schema=? and table_name=?", schema, table)
rs, err := c.Execute(ctx, fmt.Sprintf("select table_rows from information_schema.tables where table_schema='%s' and table_name='%s'",
mysql.Escape(schema), mysql.Escape(table)))
if err != nil {
return 0, fmt.Errorf("failed to query information schema for row count estimate: %w", err)
}
Expand Down Expand Up @@ -61,12 +62,13 @@ func (c *MySqlConnector) GetQRepPartitions(
var minmaxQuery string
var minmaxHasCount bool
if last != nil && last.Range != nil {
// partial query, append minVal later
if numPartitions == 0 {
minmaxHasCount = true
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`),COUNT(*) FROM %[1]s WHERE `%[2]s` > ?",
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`),COUNT(*) FROM %[1]s WHERE `%[2]s` > ",
parsedWatermarkTable.MySQL(), config.WatermarkColumn)
} else {
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`) FROM %[1]s WHERE `%[2]s` > ?",
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`) FROM %[1]s WHERE `%[2]s` > ",
parsedWatermarkTable.MySQL(), config.WatermarkColumn)
}
} else if numPartitions == 0 {
Expand Down Expand Up @@ -94,20 +96,21 @@ func (c *MySqlConnector) GetQRepPartitions(
}
}

var minVal any
var rs *mysql.Result
if last != nil && last.Range != nil {
var minVal string
switch lastRange := last.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
minVal = lastRange.IntRange.End
minVal = strconv.FormatInt(lastRange.IntRange.End, 10)
case *protos.PartitionRange_UintRange:
minVal = lastRange.UintRange.End
minVal = strconv.FormatUint(lastRange.UintRange.End, 10)
case *protos.PartitionRange_TimestampRange:
minVal = lastRange.TimestampRange.End.AsTime().String()
time := lastRange.TimestampRange.End.AsTime()
minVal = "'" + time.Format("2006-01-02 15:04:05.999999") + "'"
}

c.logger.Info("querying min/max", slog.String("query", minmaxQuery), slog.Any("minVal", minVal))
rs, err = c.Execute(ctx, minmaxQuery, minVal)
c.logger.Info("querying min/max", slog.String("query", minmaxQuery), slog.String("minVal", minVal))
rs, err = c.Execute(ctx, minmaxQuery+minVal)
} else {
c.logger.Info("querying min/max", slog.String("query", minmaxQuery))
rs, err = c.Execute(ctx, minmaxQuery)
Expand Down
13 changes: 8 additions & 5 deletions flow/connectors/mysql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package connmysql

import (
"context"
"fmt"
"slices"

gomysql "github.com/go-mysql-org/go-mysql/mysql"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/pkg/mysql"
)
Expand Down Expand Up @@ -48,8 +51,8 @@ func (c *MySqlConnector) GetSchemas(ctx context.Context) (*protos.PeerSchemasRes
func (c *MySqlConnector) GetTablesInSchema(
ctx context.Context, schema string, cdcEnabled bool,
) (*protos.SchemaTablesResponse, error) {
rs, err := c.Execute(ctx, `select table_name, data_length + index_length
from information_schema.tables where table_schema = ? order by table_name`, schema)
rs, err := c.Execute(ctx, fmt.Sprintf(`select table_name, data_length + index_length
from information_schema.tables where table_schema = '%s' order by table_name`, gomysql.Escape(schema)))
if err != nil {
return nil, err
}
Expand All @@ -74,9 +77,9 @@ func (c *MySqlConnector) GetTablesInSchema(
}

func (c *MySqlConnector) GetColumns(ctx context.Context, version uint32, schema string, table string) (*protos.TableColumnsResponse, error) {
rs, err := c.Execute(ctx, `select column_name, column_type, column_key
from information_schema.columns where table_schema = ? and table_name = ? order by column_name`,
schema, table)
rs, err := c.Execute(ctx, fmt.Sprintf(`select column_name, column_type, column_key
from information_schema.columns where table_schema = '%s' and table_name = '%s' order by column_name`,
gomysql.Escape(schema), gomysql.Escape(table)))
if err != nil {
return nil, err
}
Expand Down
Loading