Skip to content

Commit d927ae6

Browse files
committed
mysql: no parameters
1 parent 2c82a07 commit d927ae6

File tree

4 files changed

+27
-21
lines changed

4 files changed

+27
-21
lines changed

flow/connectors/mysql/cdc.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ func (c *MySqlConnector) getTableSchemaForTable(
6868
return nil, err
6969
}
7070

71-
rs, err := c.Execute(ctx, `select column_name, column_type, column_key, is_nullable, numeric_precision, numeric_scale
72-
from information_schema.columns
73-
where table_schema = ? and table_name = ? order by ordinal_position`, schemaTable.Schema, schemaTable.Table)
71+
rs, err := c.Execute(ctx, fmt.Sprintf(`select column_name, column_type, column_key, is_nullable, numeric_precision, numeric_scale
72+
from information_schema.columns where table_schema = '%s' and table_name = '%s' order by ordinal_position`,
73+
mysql.Escape(schemaTable.Schema), mysql.Escape(schemaTable.Table)))
7474
if err != nil {
7575
return nil, err
7676
}

flow/connectors/mysql/mysql.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -440,18 +440,19 @@ func (c *MySqlConnector) StatActivity(
440440
ctx context.Context,
441441
req *protos.PostgresPeerActivityInfoRequest,
442442
) (*protos.PeerStatResponse, error) {
443-
rs, err := c.Execute(ctx, "SELECT ID,COMMAND,STATE,TIME,INFO FROM performance_schema.processlist WHERE USER=?", c.config.User)
443+
rs, err := c.Execute(ctx,
444+
fmt.Sprintf("SELECT ID,COMMAND,STATE,TIME,INFO FROM performance_schema.processlist WHERE USER='%s'", c.config.User))
444445
if err != nil {
445446
// 42S02 is ER_NO_SUCH_TABLE
446447
var myErr *mysql.MyError
447448
if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" {
448449
// mariadb
449450
rs, err = c.Execute(ctx,
450-
"SELECT PROCESSLIST_ID,PROCESSLIST_COMMAND,PROCESSLIST_STATE,PROCESSLIST_TIME,PROCESSLIST_INFO"+
451-
" FROM performance_schema.threads WHERE USER=?", c.config.User)
451+
fmt.Sprintf("SELECT PROCESSLIST_ID,PROCESSLIST_COMMAND,PROCESSLIST_STATE,PROCESSLIST_TIME,PROCESSLIST_INFO"+
452+
" FROM performance_schema.threads WHERE USER='%s'", c.config.User))
452453
if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" {
453454
rs, err = c.Execute(ctx,
454-
"SELECT ID,COMMAND,STATE,TIME,INFO FROM information_schema.processlist WHERE USER=?", c.config.User)
455+
fmt.Sprintf("SELECT ID,COMMAND,STATE,TIME,INFO FROM information_schema.processlist WHERE USER='%s'", c.config.User))
455456
}
456457
}
457458

flow/connectors/mysql/qrep.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222
)
2323

2424
func (c *MySqlConnector) tableRowEstimate(ctx context.Context, schema string, table string) (int64, error) {
25-
rs, err := c.Execute(ctx, "select table_rows from information_schema.tables where table_schema=? and table_name=?", schema, table)
25+
rs, err := c.Execute(ctx, fmt.Sprintf("select table_rows from information_schema.tables where table_schema='%s' and table_name='%s'",
26+
mysql.Escape(schema), mysql.Escape(table)))
2627
if err != nil {
2728
return 0, fmt.Errorf("failed to query information schema for row count estimate: %w", err)
2829
}
@@ -61,12 +62,13 @@ func (c *MySqlConnector) GetQRepPartitions(
6162
var minmaxQuery string
6263
var minmaxHasCount bool
6364
if last != nil && last.Range != nil {
65+
// partial query, append minVal later
6466
if numPartitions == 0 {
6567
minmaxHasCount = true
66-
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`),COUNT(*) FROM %[1]s WHERE `%[2]s` > ?",
68+
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`),COUNT(*) FROM %[1]s WHERE `%[2]s` > ",
6769
parsedWatermarkTable.MySQL(), config.WatermarkColumn)
6870
} else {
69-
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`) FROM %[1]s WHERE `%[2]s` > ?",
71+
minmaxQuery = fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`) FROM %[1]s WHERE `%[2]s` > ",
7072
parsedWatermarkTable.MySQL(), config.WatermarkColumn)
7173
}
7274
} else if numPartitions == 0 {
@@ -94,20 +96,20 @@ func (c *MySqlConnector) GetQRepPartitions(
9496
}
9597
}
9698

97-
var minVal any
9899
var rs *mysql.Result
99100
if last != nil && last.Range != nil {
101+
var minVal string
100102
switch lastRange := last.Range.Range.(type) {
101103
case *protos.PartitionRange_IntRange:
102-
minVal = lastRange.IntRange.End
104+
minVal = strconv.FormatInt(lastRange.IntRange.End, 10)
103105
case *protos.PartitionRange_UintRange:
104-
minVal = lastRange.UintRange.End
106+
minVal = strconv.FormatUint(lastRange.UintRange.End, 10)
105107
case *protos.PartitionRange_TimestampRange:
106-
minVal = lastRange.TimestampRange.End.AsTime().String()
108+
minVal = "'" + lastRange.TimestampRange.End.AsTime().Format("2006-01-02 15:04:05.999999") + "'"
107109
}
108110

109-
c.logger.Info("querying min/max", slog.String("query", minmaxQuery), slog.Any("minVal", minVal))
110-
rs, err = c.Execute(ctx, minmaxQuery, minVal)
111+
c.logger.Info("querying min/max", slog.String("query", minmaxQuery), slog.String("minVal", minVal))
112+
rs, err = c.Execute(ctx, minmaxQuery+minVal)
111113
} else {
112114
c.logger.Info("querying min/max", slog.String("query", minmaxQuery))
113115
rs, err = c.Execute(ctx, minmaxQuery)

flow/connectors/mysql/schema.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package connmysql
22

33
import (
44
"context"
5+
"fmt"
56
"slices"
67

8+
gomysql "github.com/go-mysql-org/go-mysql/mysql"
9+
710
"github.com/PeerDB-io/peerdb/flow/generated/protos"
811
"github.com/PeerDB-io/peerdb/flow/pkg/mysql"
912
)
@@ -48,8 +51,8 @@ func (c *MySqlConnector) GetSchemas(ctx context.Context) (*protos.PeerSchemasRes
4851
func (c *MySqlConnector) GetTablesInSchema(
4952
ctx context.Context, schema string, cdcEnabled bool,
5053
) (*protos.SchemaTablesResponse, error) {
51-
rs, err := c.Execute(ctx, `select table_name, data_length + index_length
52-
from information_schema.tables where table_schema = ? order by table_name`, schema)
54+
rs, err := c.Execute(ctx, fmt.Sprintf(`select table_name, data_length + index_length
55+
from information_schema.tables where table_schema = '%s' order by table_name`, gomysql.Escape(schema)))
5356
if err != nil {
5457
return nil, err
5558
}
@@ -74,9 +77,9 @@ func (c *MySqlConnector) GetTablesInSchema(
7477
}
7578

7679
func (c *MySqlConnector) GetColumns(ctx context.Context, version uint32, schema string, table string) (*protos.TableColumnsResponse, error) {
77-
rs, err := c.Execute(ctx, `select column_name, column_type, column_key
78-
from information_schema.columns where table_schema = ? and table_name = ? order by column_name`,
79-
schema, table)
80+
rs, err := c.Execute(ctx, fmt.Sprintf(`select column_name, column_type, column_key
81+
from information_schema.columns where table_schema = '%s' and table_name = '%s' order by column_name`,
82+
gomysql.Escape(schema), gomysql.Escape(table)))
8083
if err != nil {
8184
return nil, err
8285
}

0 commit comments

Comments
 (0)