Skip to content

Commit af687be

Browse files
authored
Merge branch 'master' into remove_failover
2 parents 167a302 + 5acb569 commit af687be

File tree

11 files changed

+256
-67
lines changed

11 files changed

+256
-67
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ build:
99
${GO} build -o bin/go-mysqldump cmd/go-mysqldump/main.go
1010
${GO} build -o bin/go-canal cmd/go-canal/main.go
1111
${GO} build -o bin/go-binlogparser cmd/go-binlogparser/main.go
12+
${GO} build -o bin/go-mysqlserver cmd/go-mysqlserver/main.go
1213

1314
test:
1415
${GO} test --race -timeout 2m ./...

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,6 @@ func main() {
304304
}
305305
}
306306
}
307-
308307
```
309308

310309
Another shell

cmd/go-mysqlserver/main.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"net"
6+
7+
"github.com/go-mysql-org/go-mysql/server"
8+
)
9+
10+
func main() {
11+
// Listen for connections on localhost port 4000
12+
l, err := net.Listen("tcp", "127.0.0.1:4000")
13+
if err != nil {
14+
log.Fatal(err)
15+
}
16+
17+
log.Println("Listening on port 4000, connect with 'mysql -h 127.0.0.1 -P 4000 -u root'")
18+
19+
// Accept a new connection once
20+
c, err := l.Accept()
21+
if err != nil {
22+
log.Fatal(err)
23+
}
24+
25+
log.Println("Accepted connection")
26+
27+
// Create a connection with user root and an empty password.
28+
// You can use your own handler to handle command here.
29+
conn, err := server.NewConn(c, "root", "", server.EmptyHandler{})
30+
if err != nil {
31+
log.Fatal(err)
32+
}
33+
34+
log.Println("Registered the connection with the server")
35+
36+
// as long as the client keeps sending commands, keep handling them
37+
for {
38+
if err := conn.HandleCommand(); err != nil {
39+
log.Fatal(err)
40+
}
41+
}
42+
}

dump/dumper.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"os"
88
"os/exec"
9+
"regexp"
910
"strings"
1011

1112
. "github.com/go-mysql-org/go-mysql/mysql"
@@ -44,6 +45,9 @@ type Dumper struct {
4445

4546
// see detectColumnStatisticsParamSupported
4647
isColumnStatisticsParamSupported bool
48+
49+
mysqldumpVersion string
50+
sourceDataSupported bool
4751
}
4852

4953
func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
@@ -67,7 +71,14 @@ func NewDumper(executionPath string, addr string, user string, password string)
6771
d.IgnoreTables = make(map[string][]string)
6872
d.ExtraOptions = make([]string, 0, 5)
6973
d.masterDataSkipped = false
70-
d.isColumnStatisticsParamSupported = d.detectColumnStatisticsParamSupported()
74+
75+
out, err := exec.Command(d.ExecutionPath, `--help`).CombinedOutput()
76+
if err != nil {
77+
return d, err
78+
}
79+
d.isColumnStatisticsParamSupported = d.detectColumnStatisticsParamSupported(out)
80+
d.mysqldumpVersion = d.getMysqldumpVersion(out)
81+
d.sourceDataSupported = d.detectSourceDataSupported(d.mysqldumpVersion)
7182

7283
d.ErrOut = os.Stderr
7384

@@ -81,12 +92,47 @@ func NewDumper(executionPath string, addr string, user string, password string)
8192
// But this parameter exists only for versions >=8.0.2 (https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html).
8293
//
8394
// For environments where the version of mysql-server and mysqldump differs, we try to check this parameter and use it if available.
84-
func (d *Dumper) detectColumnStatisticsParamSupported() bool {
85-
out, err := exec.Command(d.ExecutionPath, `--help`).CombinedOutput()
86-
if err != nil {
95+
func (d *Dumper) detectColumnStatisticsParamSupported(helpOutput []byte) bool {
96+
return bytes.Contains(helpOutput, []byte(`--column-statistics`))
97+
}
98+
99+
// mysqldump Ver 10.19 Distrib 10.3.37-MariaDB, for linux-systemd (x86_64)`, `10.3.37-MariaDB
100+
// opt/mysql/11.0.0/bin/mysqldump from 11.0.0-preview-MariaDB, client 10.19 for linux-systemd (x86_64)
101+
func (d *Dumper) getMysqldumpVersion(helpOutput []byte) string {
102+
mysqldumpVersionRegexpNew := regexp.MustCompile(`mysqldump Ver ([0-9][^ ]*) for`)
103+
if m := mysqldumpVersionRegexpNew.FindSubmatch(helpOutput); m != nil {
104+
return string(m[1])
105+
}
106+
107+
mysqldumpVersionRegexpOld := regexp.MustCompile(`mysqldump Ver .* Distrib ([0-9][^ ]*),`)
108+
if m := mysqldumpVersionRegexpOld.FindSubmatch(helpOutput); m != nil {
109+
return string(m[1])
110+
}
111+
112+
mysqldumpVersionRegexpMaria := regexp.MustCompile(`mysqldump from ([0-9][^ ]*), `)
113+
if m := mysqldumpVersionRegexpMaria.FindSubmatch(helpOutput); m != nil {
114+
return string(m[1])
115+
}
116+
117+
return ""
118+
}
119+
120+
func (d *Dumper) detectSourceDataSupported(version string) bool {
121+
// Failed to detect mysqldump version
122+
if version == "" {
123+
return false
124+
}
125+
126+
// MySQL 5.x
127+
if version[0] == byte('5') {
128+
return false
129+
}
130+
131+
if strings.Contains(version, "MariaDB") {
87132
return false
88133
}
89-
return bytes.Contains(out, []byte(`--column-statistics`))
134+
135+
return true
90136
}
91137

92138
func (d *Dumper) SetCharset(charset string) {
@@ -169,7 +215,11 @@ func (d *Dumper) Dump(w io.Writer) error {
169215
passwordArgIndex := len(args) - 1
170216

171217
if !d.masterDataSkipped {
172-
args = append(args, "--master-data")
218+
if d.sourceDataSupported {
219+
args = append(args, "--source-data")
220+
} else {
221+
args = append(args, "--master-data")
222+
}
173223
}
174224

175225
if d.maxAllowedPacket > 0 {

dump/dumper_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package dump
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestGetMysqldumpVersion(t *testing.T) {
10+
versions := []struct {
11+
line string // mysqldump --help | head -1
12+
version string // 9.1.0
13+
}{
14+
// Oracle MySQL
15+
{`mysqldump Ver 10.13 Distrib 5.5.62, for linux-glibc2.12 (x86_64)`, `5.5.62`},
16+
{`mysqldump Ver 10.13 Distrib 5.6.44, for linux-glibc2.12 (x86_64)`, `5.6.44`},
17+
{`mysqldump Ver 10.13 Distrib 5.7.31, for linux-glibc2.12 (x86_64)`, `5.7.31`},
18+
{`mysqldump Ver 10.13 Distrib 5.7.36, for linux-glibc2.12 (x86_64)`, `5.7.36`},
19+
{`mysqldump Ver 8.0.11 for linux-glibc2.12 on x86_64 (MySQL Community Server - GPL)`, `8.0.11`},
20+
{`mysqldump Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.22`},
21+
{`mysqldump Ver 8.0.25 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.25`},
22+
{`mysqldump Ver 8.0.26 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.26`},
23+
{`mysqldump Ver 8.0.27 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.27`},
24+
{`mysqldump Ver 8.0.28 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.28`},
25+
{`mysqldump Ver 8.0.31 for Linux on x86_64 (Source distribution)`, `8.0.31`},
26+
{`mysqldump Ver 8.0.32 for Linux on x86_64 (MySQL Community Server - GPL)`, `8.0.32`},
27+
{`mysqldump Ver 8.4.2 for FreeBSD14.0 on amd64 (Source distribution)`, `8.4.2`},
28+
{`mysqldump Ver 9.1.0 for Linux on x86_64 (MySQL Community Server - GPL)`, `9.1.0`},
29+
30+
// MariaDB
31+
{`mysqldump Ver 10.19 Distrib 10.3.37-MariaDB, for linux-systemd (x86_64)`, `10.3.37-MariaDB`},
32+
{`mysqldump Ver 10.19 Distrib 10.6.11-MariaDB, for linux-systemd (x86_64)`, `10.6.11-MariaDB`},
33+
{`opt/mysql/11.0.0/bin/mysqldump from 11.0.0-preview-MariaDB, client 10.19 for linux-systemd (x86_64)`, `11.0.0-preview-MariaDB`},
34+
{`opt/mysql/11.2.2/bin/mysqldump from 11.2.2-MariaDB, client 10.19 for linux-systemd (x86_64)`, `11.2.2-MariaDB`},
35+
}
36+
37+
d := new(Dumper)
38+
for _, v := range versions {
39+
ver := d.getMysqldumpVersion([]byte(v.line))
40+
require.Equal(t, v.version, ver, v.line)
41+
}
42+
}
43+
44+
func TestDetectSourceDataSupported(t *testing.T) {
45+
versions := []struct {
46+
version string
47+
supported bool
48+
}{
49+
{`5.7.40`, false},
50+
{`8.0.11`, true},
51+
{`8.4.1`, true},
52+
{`9.1.0`, true},
53+
{``, false},
54+
{`10.3.37-MariaDB`, false},
55+
{`11.2.2-MariaDB`, false},
56+
}
57+
58+
d := new(Dumper)
59+
for _, v := range versions {
60+
require.Equal(t, v.supported, d.detectSourceDataSupported(v.version), v.version)
61+
}
62+
}

dump/parser.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var valuesExp *regexp.Regexp
2929
var gtidExp *regexp.Regexp
3030

3131
func init() {
32-
binlogExp = regexp.MustCompile(`^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\d+);`)
32+
binlogExp = regexp.MustCompile(`^CHANGE (MASTER|REPLICATION SOURCE) TO (MASTER_LOG_FILE|SOURCE_LOG_FILE)='(.+)', (MASTER_LOG_POS|SOURCE_LOG_POS)=(\d+);`)
3333
useExp = regexp.MustCompile("^USE `(.+)`;")
3434
valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$")
3535
// The pattern will only match MySQL GTID, as you know SET GLOBAL gtid_slave_pos='0-1-4' is used for MariaDB.
@@ -71,8 +71,8 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
7171
}
7272
}
7373
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
74-
name := m[0][1]
75-
pos, err := strconv.ParseUint(m[0][2], 10, 64)
74+
name := m[0][3]
75+
pos, err := strconv.ParseUint(m[0][5], 10, 64)
7676
if err != nil {
7777
return errors.Errorf("parse binlog %v err, invalid number", line)
7878
}

dump/parser_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,34 @@ import (
88
"github.com/stretchr/testify/require"
99
)
1010

11+
// This tests the binlogExp regexp that matches the line that mysqldump adds when called with --master-data or --source-data
12+
func TestBinlogExp(t *testing.T) {
13+
stmts := []struct {
14+
input string
15+
file string
16+
pos string
17+
}{
18+
{
19+
// MySQL 9.1.0
20+
`CHANGE REPLICATION SOURCE TO SOURCE_LOG_FILE='binlog.000002', SOURCE_LOG_POS=170923;`,
21+
`binlog.000002`,
22+
`170923`,
23+
},
24+
{
25+
`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`,
26+
`mysql-bin.008995`,
27+
`102052485`,
28+
},
29+
}
30+
31+
for _, stmt := range stmts {
32+
m := binlogExp.FindAllStringSubmatch(stmt.input, -1)
33+
require.NotNil(t, m)
34+
require.Equal(t, stmt.file, m[0][3])
35+
require.Equal(t, stmt.pos, m[0][5])
36+
}
37+
}
38+
1139
func TestParseGtidExp(t *testing.T) {
1240
// binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
1341
// gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)")

replication/binlogsyncer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func (b *BinlogSyncer) registerSlave() error {
356356

357357
serverUUID, err := uuid.NewUUID()
358358
if err != nil {
359-
b.cfg.Logger.Errorf("failed to get new uud %v", err)
359+
b.cfg.Logger.Errorf("failed to get new uuid %v", err)
360360
return errors.Trace(err)
361361
}
362362
if _, err = b.c.Execute(fmt.Sprintf("SET @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID)); err != nil {
@@ -404,6 +404,8 @@ func (b *BinlogSyncer) prepare() error {
404404
return errors.Trace(err)
405405
}
406406

407+
b.cfg.Logger.Infof("Connected to %s %s server", b.cfg.Flavor, b.c.GetServerVersion())
408+
407409
return nil
408410
}
409411

0 commit comments

Comments
 (0)