Skip to content

Commit 40b6e73

Browse files
heavycrystalserprexCopilot
authored
[mysql] move validation to shared for reuse (#3150)
Co-authored-by: Philip Dubé <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent e28005b commit 40b6e73

File tree

2 files changed

+277
-227
lines changed

2 files changed

+277
-227
lines changed

flow/connectors/mysql/validate.go

Lines changed: 38 additions & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,10 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"log/slog"
8-
"strconv"
9-
"strings"
10-
11-
"github.com/go-mysql-org/go-mysql/mysql"
127

138
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
149
"github.com/PeerDB-io/peerdb/flow/generated/protos"
15-
"github.com/PeerDB-io/peerdb/flow/shared"
10+
peerdb_mysql "github.com/PeerDB-io/peerdb/flow/shared/mysql"
1611
)
1712

1813
func (c *MySqlConnector) CheckSourceTables(ctx context.Context, tableNames []*utils.SchemaTable) error {
@@ -44,215 +39,37 @@ func (c *MySqlConnector) CheckReplicationConnectivity(ctx context.Context) error
4439
return nil
4540
}
4641

47-
// only for MySQL 5.7 and below
48-
func (c *MySqlConnector) checkMySQL5_BinlogSettings(ctx context.Context) error {
49-
cmp, err := c.CompareServerVersion(ctx, "5.5.0")
50-
if err != nil {
51-
return fmt.Errorf("failed to get server version: %w", err)
52-
}
53-
if cmp < 0 {
54-
c.logger.Warn("cannot validate mysql prior to 5.5.0, uncharted territory")
55-
return nil
56-
}
57-
58-
// MySQL 5.6.6 introduced GTIDs so they may work, no need to enforce filepos for now
59-
60-
query := "SELECT @@binlog_format"
61-
checkBinlogRowImage := false
62-
cmp, err = c.CompareServerVersion(ctx, "5.6.2")
63-
if err != nil {
64-
return fmt.Errorf("failed to get server version: %w", err)
65-
}
66-
if cmp >= 0 {
67-
query += ", @@binlog_row_image"
68-
checkBinlogRowImage = true
69-
}
70-
71-
// binlog_expire_logs_seconds was introduced in 8.0 https://dev.mysql.com/worklog/task/?id=10924
72-
// since expire_logs_days has day granularity, all settings of it work for us so not checking
73-
rs, err := c.Execute(ctx, query)
74-
if err != nil {
75-
return fmt.Errorf("failed to retrieve settings <5.7: %w", err)
76-
}
77-
if len(rs.Values) == 0 {
78-
return errors.New("no value returned for settings <5.7")
79-
}
80-
row := rs.Values[0]
81-
82-
binlogFormat := shared.UnsafeFastReadOnlyBytesToString(row[0].AsString())
83-
if binlogFormat != "ROW" {
84-
return errors.New("binlog_format must be set to 'ROW', currently " + binlogFormat)
85-
}
86-
if checkBinlogRowImage {
87-
binlogRowImage := shared.UnsafeFastReadOnlyBytesToString(row[1].AsString())
88-
if binlogRowImage != "FULL" {
89-
return errors.New("binlog_row_image must be set to 'FULL', currently " + binlogRowImage)
90-
}
91-
}
92-
93-
return nil
94-
}
95-
96-
func (c *MySqlConnector) checkMariaDB_BinlogSettings(ctx context.Context, requireRowMetadata bool) error {
97-
query := "SELECT @@binlog_format, @@binlog_row_image, @@binlog_row_metadata"
98-
99-
checkBinlogExpiry := false
100-
cmp, err := c.CompareServerVersion(ctx, "10.6.1")
101-
if err != nil {
102-
return fmt.Errorf("failed to get server version: %w", err)
103-
}
104-
if cmp >= 0 {
105-
checkBinlogExpiry = true
106-
query += ", @@binlog_expire_logs_seconds"
107-
} else {
108-
c.logger.Warn("MariaDB version does not support binlog_expire_logs_seconds, skipping check")
109-
}
110-
111-
rs, err := c.Execute(ctx, query)
112-
if err != nil {
113-
return fmt.Errorf("failed to retrieve settings: %w", err)
114-
}
115-
if len(rs.Values) == 0 {
116-
return errors.New("no value returned for settings")
117-
}
118-
row := rs.Values[0]
119-
120-
binlogFormat := shared.UnsafeFastReadOnlyBytesToString(row[0].AsString())
121-
if binlogFormat != "ROW" {
122-
return errors.New("binlog_format must be set to 'ROW', currently " + binlogFormat)
123-
}
124-
125-
binlogRowImage := shared.UnsafeFastReadOnlyBytesToString(row[1].AsString())
126-
if binlogRowImage != "FULL" {
127-
return errors.New("binlog_row_image must be set to 'FULL', currently " + binlogRowImage)
128-
}
129-
130-
binlogRowMetadata := shared.UnsafeFastReadOnlyBytesToString(row[2].AsString())
131-
if binlogRowMetadata != "FULL" {
132-
if requireRowMetadata {
133-
return errors.New("binlog_row_metadata must be set to 'FULL' for column exclusion support, currently " + binlogRowMetadata)
134-
} else {
135-
c.logger.Warn("binlog_row_metadata should be set to 'FULL' for more reliable replication",
136-
slog.String("binlog_row_metadata", strings.Clone(binlogRowMetadata)))
137-
}
138-
}
139-
140-
if checkBinlogExpiry {
141-
binlogExpireLogsSeconds := row[3].AsUint64()
142-
if binlogExpireLogsSeconds < 86400 && binlogExpireLogsSeconds != 0 {
143-
c.logger.Warn("binlog_expire_logs_seconds should be at least 24 hours",
144-
slog.Uint64("binlog_expire_logs_seconds", binlogExpireLogsSeconds))
145-
}
146-
}
147-
148-
return nil
149-
}
150-
15142
func (c *MySqlConnector) CheckBinlogSettings(ctx context.Context, requireRowMetadata bool) error {
152-
if c.config.Flavor == protos.MySqlFlavor_MYSQL_MARIA {
153-
return c.checkMariaDB_BinlogSettings(ctx, requireRowMetadata)
154-
} else if c.config.Flavor == protos.MySqlFlavor_MYSQL_MYSQL {
155-
cmp, err := c.CompareServerVersion(ctx, "8.0.1")
43+
for conn, err := range c.withRetries(ctx) {
15644
if err != nil {
157-
return fmt.Errorf("failed to get server version: %w", err)
45+
return err
15846
}
159-
if cmp < 0 {
160-
if requireRowMetadata {
161-
return errors.New(
162-
"MySQL version too old for column exclusion support, " +
163-
"please disable it or upgrade to >8.0.1 (binlog_row_metadata needed)",
164-
)
165-
}
166-
c.logger.Warn("cannot validate mysql prior to 8.0.1, falling back to MySQL 5.7 check")
167-
return c.checkMySQL5_BinlogSettings(ctx)
168-
}
169-
}
170-
171-
query := "SELECT @@binlog_expire_logs_seconds, @@binlog_format, @@binlog_row_image, @@binlog_row_metadata"
172-
173-
checkRowValueOptions := false
174-
cmp, err := c.CompareServerVersion(ctx, "8.0.3")
175-
if err != nil {
176-
return fmt.Errorf("failed to get server version: %w", err)
177-
}
178-
if cmp >= 0 {
179-
checkRowValueOptions = true
180-
query += ", @@binlog_row_value_options"
181-
}
182-
183-
rs, err := c.Execute(ctx, query)
184-
if err != nil {
185-
return fmt.Errorf("failed to retrieve settings: %w", err)
186-
}
187-
if len(rs.Values) == 0 {
188-
return errors.New("no value returned for settings")
189-
}
190-
row := rs.Values[0]
191-
192-
binlogExpireLogsSeconds := row[0].AsUint64()
193-
if binlogExpireLogsSeconds < 86400 && binlogExpireLogsSeconds != 0 {
194-
c.logger.Warn("binlog_expire_logs_seconds should be at least 24 hours",
195-
slog.Uint64("binlog_expire_logs_seconds", binlogExpireLogsSeconds))
196-
}
197-
198-
binlogFormat := shared.UnsafeFastReadOnlyBytesToString(row[1].AsString())
199-
if binlogFormat != "ROW" {
200-
return errors.New("binlog_format must be set to 'ROW', currently " + binlogFormat)
201-
}
202-
203-
binlogRowImage := shared.UnsafeFastReadOnlyBytesToString(row[2].AsString())
204-
if binlogRowImage != "FULL" {
205-
c.logger.Warn("binlog_row_image should be set to 'FULL' to avoid missing data",
206-
slog.String("binlog_row_image", strings.Clone(binlogRowImage)))
207-
}
20847

209-
binlogRowMetadata := shared.UnsafeFastReadOnlyBytesToString(row[3].AsString())
210-
if binlogRowMetadata != "FULL" {
211-
if requireRowMetadata {
212-
return errors.New("binlog_row_metadata must be set to 'FULL' for column exclusion support, currently " + binlogRowMetadata)
213-
} else {
214-
c.logger.Warn("binlog_row_metadata should be set to 'FULL' for more reliable replication",
215-
slog.String("binlog_row_metadata", strings.Clone(binlogRowMetadata)))
216-
}
217-
}
218-
219-
if checkRowValueOptions {
220-
binlogRowValueOptions := shared.UnsafeFastReadOnlyBytesToString(row[4].AsString())
221-
if binlogRowValueOptions != "" {
222-
return errors.New("binlog_row_value_options must be disabled, currently " + binlogRowValueOptions)
223-
}
224-
}
225-
226-
return nil
227-
}
228-
229-
func (c *MySqlConnector) CheckRDSBinlogSettings(ctx context.Context) error {
230-
// AWS RDS/Aurora has its own binlog retention setting that we need to check, minimum 24h
231-
// check RDS/Aurora binlog retention setting
232-
if rs, err := c.Execute(ctx, "SELECT value FROM mysql.rds_configuration WHERE name='binlog retention hours'"); err != nil {
233-
var mErr *mysql.MyError
234-
if errors.As(err, &mErr) && mErr.Code == mysql.ER_NO_SUCH_TABLE || mErr.Code == mysql.ER_TABLEACCESS_DENIED_ERROR {
235-
// Table doesn't exist, which means this is not RDS/Aurora
236-
slog.Warn("mysql.rds_configuration table does not exist, skipping Aurora/RDS binlog retention check", slog.Any("error", err))
237-
return nil
238-
}
239-
return errors.New("failed to check RDS/Aurora binlog retention hours: " + err.Error())
240-
} else if len(rs.Values) > 0 {
241-
binlogRetentionHoursStr := shared.UnsafeFastReadOnlyBytesToString(rs.Values[0][0].AsString())
242-
if binlogRetentionHoursStr == "" {
243-
return errors.New("RDS/Aurora setting 'binlog retention hours' should be at least 24, currently unset")
244-
}
245-
slog.Info("binlog retention hours", "binlogRetentionHours", binlogRetentionHoursStr)
246-
if binlogRetentionHours, err := strconv.Atoi(binlogRetentionHoursStr); err != nil {
247-
return errors.New("failed to parse RDS/Aurora setting 'binlog retention hours': " + err.Error())
248-
} else if binlogRetentionHours < 24 {
249-
return errors.New("RDS/Aurora setting 'binlog retention hours' should be at least 24, currently " + binlogRetentionHoursStr)
48+
switch c.config.Flavor {
49+
case protos.MySqlFlavor_MYSQL_MARIA:
50+
return peerdb_mysql.CheckMariaDBBinlogSettings(conn, c.logger)
51+
case protos.MySqlFlavor_MYSQL_MYSQL:
52+
cmp, err := c.CompareServerVersion(ctx, "8.0.1")
53+
if err != nil {
54+
return fmt.Errorf("failed to get server version: %w", err)
55+
}
56+
if cmp < 0 {
57+
if requireRowMetadata {
58+
return errors.New(
59+
"MySQL version too old for column exclusion support, " +
60+
"please disable it or upgrade to >8.0.1 (binlog_row_metadata needed)",
61+
)
62+
}
63+
c.logger.Warn("cannot validate mysql prior to 8.0.1, falling back to MySQL 5.7 check")
64+
return peerdb_mysql.CheckMySQL5BinlogSettings(conn, c.logger)
65+
} else {
66+
return peerdb_mysql.CheckMySQL8BinlogSettings(conn, c.logger)
67+
}
68+
default:
69+
return fmt.Errorf("unsupported MySQL flavor: %s", c.config.Flavor.String())
25070
}
251-
} else {
252-
slog.Warn("binlog retention hours returned nothing, skipping Aurora/RDS binlog retention check")
25371
}
254-
255-
return nil
72+
return errors.New("failed to connect to MySQL server")
25673
}
25774

25875
func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
@@ -287,8 +104,13 @@ func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
287104
if err := c.CheckBinlogSettings(ctx, requireRowMetadata); err != nil {
288105
return fmt.Errorf("binlog configuration error: %w", err)
289106
}
290-
if err := c.CheckRDSBinlogSettings(ctx); err != nil {
291-
return fmt.Errorf("binlog configuration error: %w", err)
107+
for conn, err := range c.withRetries(ctx) {
108+
if err != nil {
109+
return err
110+
}
111+
if err := peerdb_mysql.CheckRDSBinlogSettings(conn, c.logger); err != nil {
112+
return fmt.Errorf("binlog configuration error: %w", err)
113+
}
292114
}
293115

294116
return nil
@@ -299,22 +121,11 @@ func (c *MySqlConnector) ValidateCheck(ctx context.Context) error {
299121
return errors.New("flavor is set to unknown")
300122
}
301123

302-
// MariaDB specific setting, introduced in MariaDB 10.0.3
303-
if rs, err := c.Execute(ctx, "select @@gtid_strict_mode"); err != nil {
304-
var mErr *mysql.MyError
305-
// seems to be MySQL
306-
if errors.As(err, &mErr) && mErr.Code == mysql.ER_UNKNOWN_SYSTEM_VARIABLE {
307-
if c.config.Flavor != protos.MySqlFlavor_MYSQL_MYSQL {
308-
return errors.New("server appears to be MySQL but flavor is not set to MySQL")
309-
}
310-
} else {
311-
return fmt.Errorf("failed to check GTID mode: %w", err)
312-
}
313-
} else if len(rs.Values) > 0 {
314-
if c.config.Flavor != protos.MySqlFlavor_MYSQL_MARIA {
315-
return errors.New("server appears to be MariaDB but flavor is not set to MariaDB")
124+
for conn, err := range c.withRetries(ctx) {
125+
if err != nil {
126+
return err
316127
}
128+
return peerdb_mysql.ValidateFlavor(conn, c.config.Flavor)
317129
}
318-
319-
return nil
130+
return errors.New("failed to connect to MySQL server")
320131
}

0 commit comments

Comments
 (0)