From 6f8b7746810d4ea70a4759a60bf4d78f20f1c37d Mon Sep 17 00:00:00 2001 From: sinomoe Date: Fri, 8 Nov 2024 15:59:21 +0800 Subject: [PATCH 1/5] feat(canal): support ipv6 address --- canal/canal.go | 20 ++++++++++++++------ dump/dumper.go | 23 +++++++++++++++++++---- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 1ba0c5165..50015403b 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -481,17 +481,25 @@ func (c *Canal) prepareSyncer() error { if strings.Contains(c.cfg.Addr, "/") { cfg.Host = c.cfg.Addr } else { - seps := strings.Split(c.cfg.Addr, ":") - if len(seps) != 2 { - return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr) + ipv6 := strings.Count(c.cfg.Addr, ":") > 1 + if ipv6 && !strings.ContainsAny(c.cfg.Addr, "[]") { + return errors.Errorf("invalid mysql ipv6 addr format %s, must [host]:port", c.cfg.Addr) } - - port, err := strconv.ParseUint(seps[1], 10, 16) + lastSep := strings.LastIndex(c.cfg.Addr, ":") + if !ipv6 && lastSep == -1 { + return errors.Errorf("invalid mysql ipv4 addr format %s, must host:port", c.cfg.Addr) + } + addr := strings.Trim(c.cfg.Addr[:lastSep], "[]") + ip := net.ParseIP(addr) + if ip == nil { + return errors.Errorf("invalid mysql ip format %s", addr) + } + port, err := strconv.ParseUint(c.cfg.Addr[lastSep+1:], 10, 16) if err != nil { return errors.Trace(err) } - cfg.Host = seps[0] + cfg.Host = addr cfg.Port = uint16(port) } diff --git a/dump/dumper.go b/dump/dumper.go index c7ea56db7..b5e0a4e38 100644 --- a/dump/dumper.go +++ b/dump/dumper.go @@ -212,10 +212,25 @@ func (d *Dumper) Dump(w io.Writer) error { if strings.Contains(d.Addr, "/") { args = append(args, fmt.Sprintf("--socket=%s", d.Addr)) } else { - seps := strings.SplitN(d.Addr, ":", 2) - args = append(args, fmt.Sprintf("--host=%s", seps[0])) - if len(seps) > 1 { - args = append(args, fmt.Sprintf("--port=%s", seps[1])) + ipv6 := strings.Count(d.Addr, ":") > 1 + lastSep := strings.LastIndex(d.Addr, ":") + var host, port string + // without port + host = d.Addr + // ipv6 with port + if ipv6 && strings.ContainsAny(d.Addr, "[]") { + host = strings.Trim(d.Addr[:lastSep], "[]") + port = d.Addr[lastSep+1:] + } + // ipv4 with port + if !ipv6 && lastSep != -1 { + host = d.Addr[:lastSep] + port = d.Addr[lastSep+1:] + } + + args = append(args, fmt.Sprintf("--host=%s", host)) + if port != "" { + args = append(args, fmt.Sprintf("--port=%s", port)) } } From 2c1adaaf153cfad97bad63ac5659a646e9d58a08 Mon Sep 17 00:00:00 2001 From: sinomoe Date: Fri, 8 Nov 2024 16:21:45 +0800 Subject: [PATCH 2/5] feat: use net.SplitHostPort to parse ip with ports --- canal/canal.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 50015403b..f4376796e 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -481,26 +481,17 @@ func (c *Canal) prepareSyncer() error { if strings.Contains(c.cfg.Addr, "/") { cfg.Host = c.cfg.Addr } else { - ipv6 := strings.Count(c.cfg.Addr, ":") > 1 - if ipv6 && !strings.ContainsAny(c.cfg.Addr, "[]") { - return errors.Errorf("invalid mysql ipv6 addr format %s, must [host]:port", c.cfg.Addr) - } - lastSep := strings.LastIndex(c.cfg.Addr, ":") - if !ipv6 && lastSep == -1 { - return errors.Errorf("invalid mysql ipv4 addr format %s, must host:port", c.cfg.Addr) - } - addr := strings.Trim(c.cfg.Addr[:lastSep], "[]") - ip := net.ParseIP(addr) - if ip == nil { - return errors.Errorf("invalid mysql ip format %s", addr) + host, port, err := net.SplitHostPort(c.cfg.Addr) + if err != nil { + return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr) } - port, err := strconv.ParseUint(c.cfg.Addr[lastSep+1:], 10, 16) + portNumber, err := strconv.ParseUint(port, 10, 16) if err != nil { return errors.Trace(err) } - cfg.Host = addr - cfg.Port = uint16(port) + cfg.Host = host + cfg.Port = uint16(portNumber) } c.syncer = replication.NewBinlogSyncer(cfg) From 31b7e523201a5a493ff4002e7ba1ca1ea93d3c5d Mon Sep 17 00:00:00 2001 From: sinomoe Date: Fri, 8 Nov 2024 16:42:23 +0800 Subject: [PATCH 3/5] chore: optimze validate error message --- canal/canal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canal/canal.go b/canal/canal.go index f4376796e..64a1c343e 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -483,7 +483,7 @@ func (c *Canal) prepareSyncer() error { } else { host, port, err := net.SplitHostPort(c.cfg.Addr) if err != nil { - return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr) + return errors.Errorf("invalid MySQL address format %s, must host:port", c.cfg.Addr) } portNumber, err := strconv.ParseUint(port, 10, 16) if err != nil { From 5459d61b511798a21f08a4193b53b23ed2ed19e2 Mon Sep 17 00:00:00 2001 From: sinomoe Date: Fri, 8 Nov 2024 18:54:33 +0800 Subject: [PATCH 4/5] feat(ci): complete test cases --- .github/workflows/ci.yml | 6 ++++++ canal/canal_test.go | 23 ++++++++++++++++++++++- mysql/const.go | 1 + 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe75cb5aa..6036731df 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,11 @@ jobs: echo -n "mysqldump -V: " ; mysqldump -V echo -e '[mysqld]\nserver-id=1\nlog-bin=mysql\nbinlog-format=row\ngtid-mode=ON\nenforce_gtid_consistency=ON\n' | sudo tee /etc/mysql/conf.d/replication.cnf + + # bind to :: for dual-stack listening + sudo sed -i 's/bind-address.*= 127.0.0.1/bind-address = ::/' /etc/mysql/mysql.conf.d/mysqld.cnf + sudo sed -i 's/mysqlx-bind-address.*= 127.0.0.1/mysqlx-bind-address = ::/' /etc/mysql/mysql.conf.d/mysqld.cnf + sudo service mysql start # apply this for mysql5 & mysql8 compatibility @@ -109,5 +114,6 @@ jobs: uses: actions/setup-go@v5 with: go-version: "1.22" + - name: Build on ${{ matrix.os }}/${{ matrix.arch }} run: GOARCH=${{ matrix.arch }} GOOS=${{ matrix.os }} go build ./... diff --git a/canal/canal_test.go b/canal/canal_test.go index 14a7056b9..444256076 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -16,12 +16,30 @@ import ( ) type canalTestSuite struct { + addr string suite.Suite c *Canal } +type canalTestSuiteOption func(c *canalTestSuite) + +func withAddr(addr string) canalTestSuiteOption { + return func(c *canalTestSuite) { + c.addr = addr + } +} + +func newCanalTestSuite(opts ...canalTestSuiteOption) *canalTestSuite { + c := new(canalTestSuite) + for _, opt := range opts { + opt(c) + } + return c +} + func TestCanalSuite(t *testing.T) { - suite.Run(t, new(canalTestSuite)) + suite.Run(t, newCanalTestSuite()) + suite.Run(t, newCanalTestSuite(withAddr(mysql.DEFAULT_IPV6_ADDR))) } const ( @@ -37,6 +55,9 @@ const ( func (s *canalTestSuite) SetupSuite() { cfg := NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%s", *test_util.MysqlHost, *test_util.MysqlPort) + if s.addr != "" { + cfg.Addr = s.addr + } cfg.User = "root" cfg.HeartbeatPeriod = 200 * time.Millisecond cfg.ReadTimeout = 300 * time.Millisecond diff --git a/mysql/const.go b/mysql/const.go index b6da2e736..a11b05370 100644 --- a/mysql/const.go +++ b/mysql/const.go @@ -172,6 +172,7 @@ const ( const ( DEFAULT_ADDR = "127.0.0.1:3306" + DEFAULT_IPV6_ADDR = "[::1]:3306" DEFAULT_USER = "root" DEFAULT_PASSWORD = "" DEFAULT_FLAVOR = "mysql" From 95942b09eab580a0bf8f3ddb47a38edf532746d4 Mon Sep 17 00:00:00 2001 From: sinomoe Date: Mon, 11 Nov 2024 11:24:12 +0800 Subject: [PATCH 5/5] feat(dumper): use net.SplitHostPort to parse address --- dump/dumper.go | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/dump/dumper.go b/dump/dumper.go index b5e0a4e38..b2b7e4377 100644 --- a/dump/dumper.go +++ b/dump/dumper.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "net" "os" "os/exec" "regexp" @@ -212,20 +213,9 @@ func (d *Dumper) Dump(w io.Writer) error { if strings.Contains(d.Addr, "/") { args = append(args, fmt.Sprintf("--socket=%s", d.Addr)) } else { - ipv6 := strings.Count(d.Addr, ":") > 1 - lastSep := strings.LastIndex(d.Addr, ":") - var host, port string - // without port - host = d.Addr - // ipv6 with port - if ipv6 && strings.ContainsAny(d.Addr, "[]") { - host = strings.Trim(d.Addr[:lastSep], "[]") - port = d.Addr[lastSep+1:] - } - // ipv4 with port - if !ipv6 && lastSep != -1 { - host = d.Addr[:lastSep] - port = d.Addr[lastSep+1:] + host, port, err := net.SplitHostPort(d.Addr) + if err != nil { + host = d.Addr } args = append(args, fmt.Sprintf("--host=%s", host))