diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index ce65396583..459a56b4f3 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -68,9 +68,9 @@ func (c *MySqlConnector) getTableSchemaForTable( return nil, err } - rs, err := c.Execute(ctx, `select column_name, column_type, column_key, is_nullable, numeric_precision, numeric_scale - from information_schema.columns - where table_schema = ? and table_name = ? order by ordinal_position`, schemaTable.Schema, schemaTable.Table) + rs, err := c.Execute(ctx, fmt.Sprintf(`select column_name, column_type, column_key, is_nullable, numeric_precision, numeric_scale + from information_schema.columns where table_schema = '%s' and table_name = '%s' order by ordinal_position`, + mysql.Escape(schemaTable.Schema), mysql.Escape(schemaTable.Table))) if err != nil { return nil, err } diff --git a/flow/connectors/mysql/mysql.go b/flow/connectors/mysql/mysql.go index 45e62e47fb..c535f6a1b4 100644 --- a/flow/connectors/mysql/mysql.go +++ b/flow/connectors/mysql/mysql.go @@ -440,18 +440,20 @@ func (c *MySqlConnector) StatActivity( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.PeerStatResponse, error) { - rs, err := c.Execute(ctx, "SELECT ID,COMMAND,STATE,TIME,INFO FROM performance_schema.processlist WHERE USER=?", c.config.User) + rs, err := c.Execute(ctx, + fmt.Sprintf("SELECT ID,COMMAND,STATE,TIME,INFO FROM performance_schema.processlist WHERE USER='%s'", mysql.Escape(c.config.User))) if err != nil { // 42S02 is ER_NO_SUCH_TABLE var myErr *mysql.MyError if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" { // mariadb rs, err = c.Execute(ctx, - "SELECT PROCESSLIST_ID,PROCESSLIST_COMMAND,PROCESSLIST_STATE,PROCESSLIST_TIME,PROCESSLIST_INFO"+ - " FROM performance_schema.threads WHERE USER=?", c.config.User) + fmt.Sprintf("SELECT PROCESSLIST_ID,PROCESSLIST_COMMAND,PROCESSLIST_STATE,PROCESSLIST_TIME,PROCESSLIST_INFO"+ + " FROM performance_schema.threads WHERE USER='%s'", mysql.Escape(c.config.User))) if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" { rs, err = c.Execute(ctx, - "SELECT ID,COMMAND,STATE,TIME,INFO FROM information_schema.processlist WHERE USER=?", c.config.User) + fmt.Sprintf("SELECT ID,COMMAND,STATE,TIME,INFO FROM information_schema.processlist WHERE USER='%s'", + mysql.Escape(c.config.User))) } } diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index db5b24cf21..992c26a6ed 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -22,7 +22,8 @@ import ( ) func (c *MySqlConnector) tableRowEstimate(ctx context.Context, schema string, table string) (int64, error) { - rs, err := c.Execute(ctx, "select table_rows from information_schema.tables where table_schema=? and table_name=?", schema, table) + rs, err := c.Execute(ctx, fmt.Sprintf("select table_rows from information_schema.tables where table_schema='%s' and table_name='%s'", + mysql.Escape(schema), mysql.Escape(table))) if err != nil { return 0, fmt.Errorf("failed to query information schema for row count estimate: %w", err) } @@ -61,12 +62,13 @@ func (c *MySqlConnector) GetQRepPartitions( var minmaxQuery string var minmaxHasCount bool if last != nil && last.Range != nil { + // partial query, append minVal later if numPartitions == 0 { minmaxHasCount = true - minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`),COUNT(*) FROM %[1]s WHERE `%[2]s` > ?", + minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`),COUNT(*) FROM %[1]s WHERE `%[2]s` > ", parsedWatermarkTable.MySQL(), config.WatermarkColumn) } else { - minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`) FROM %[1]s WHERE `%[2]s` > ?", + minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`) FROM %[1]s WHERE `%[2]s` > ", parsedWatermarkTable.MySQL(), config.WatermarkColumn) } } else if numPartitions == 0 { @@ -94,20 +96,21 @@ func (c *MySqlConnector) GetQRepPartitions( } } - var minVal any var rs *mysql.Result if last != nil && last.Range != nil { + var minVal string switch lastRange := last.Range.Range.(type) { case *protos.PartitionRange_IntRange: - minVal = lastRange.IntRange.End + minVal = strconv.FormatInt(lastRange.IntRange.End, 10) case *protos.PartitionRange_UintRange: - minVal = lastRange.UintRange.End + minVal = strconv.FormatUint(lastRange.UintRange.End, 10) case *protos.PartitionRange_TimestampRange: - minVal = lastRange.TimestampRange.End.AsTime().String() + time := lastRange.TimestampRange.End.AsTime() + minVal = "'" + time.Format("2006-01-02 15:04:05.999999") + "'" } - c.logger.Info("querying min/max", slog.String("query", minmaxQuery), slog.Any("minVal", minVal)) - rs, err = c.Execute(ctx, minmaxQuery, minVal) + c.logger.Info("querying min/max", slog.String("query", minmaxQuery), slog.String("minVal", minVal)) + rs, err = c.Execute(ctx, minmaxQuery+minVal) } else { c.logger.Info("querying min/max", slog.String("query", minmaxQuery)) rs, err = c.Execute(ctx, minmaxQuery) diff --git a/flow/connectors/mysql/schema.go b/flow/connectors/mysql/schema.go index b8691fb854..64c041532b 100644 --- a/flow/connectors/mysql/schema.go +++ b/flow/connectors/mysql/schema.go @@ -2,8 +2,11 @@ package connmysql import ( "context" + "fmt" "slices" + gomysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/pkg/mysql" ) @@ -48,8 +51,8 @@ func (c *MySqlConnector) GetSchemas(ctx context.Context) (*protos.PeerSchemasRes func (c *MySqlConnector) GetTablesInSchema( ctx context.Context, schema string, cdcEnabled bool, ) (*protos.SchemaTablesResponse, error) { - rs, err := c.Execute(ctx, `select table_name, data_length + index_length - from information_schema.tables where table_schema = ? order by table_name`, schema) + rs, err := c.Execute(ctx, fmt.Sprintf(`select table_name, data_length + index_length + from information_schema.tables where table_schema = '%s' order by table_name`, gomysql.Escape(schema))) if err != nil { return nil, err } @@ -74,9 +77,9 @@ func (c *MySqlConnector) GetTablesInSchema( } func (c *MySqlConnector) GetColumns(ctx context.Context, version uint32, schema string, table string) (*protos.TableColumnsResponse, error) { - rs, err := c.Execute(ctx, `select column_name, column_type, column_key - from information_schema.columns where table_schema = ? and table_name = ? order by column_name`, - schema, table) + rs, err := c.Execute(ctx, fmt.Sprintf(`select column_name, column_type, column_key + from information_schema.columns where table_schema = '%s' and table_name = '%s' order by column_name`, + gomysql.Escape(schema), gomysql.Escape(table))) if err != nil { return nil, err }