From c45a40502184c326a0a937f51012cd659bc13c88 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Thu, 10 Apr 2025 10:25:33 -0400 Subject: [PATCH 1/7] Notify waiting channels on completed transaction, not just the watermark. --- go/logic/coordinator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index f3ee4942f..8c9243fcd 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -520,9 +520,9 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize } channelsToNotify = make([]chan struct{}, 0) - // Schedule any jobs that were waiting for this job to complete + // Schedule any jobs that were waiting for this job to complete or for the low watermark for waitingForSequenceNumber, channels := range c.waitingJobs { - if waitingForSequenceNumber <= c.lowWaterMark { + if waitingForSequenceNumber <= c.lowWaterMark || waitingForSequenceNumber == sequenceNumber { channelsToNotify = append(channelsToNotify, channels...) delete(c.waitingJobs, waitingForSequenceNumber) } From 9cf1bfdd702533c6efdb688664dbf1160826f0c6 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Thu, 10 Apr 2025 10:35:16 -0400 Subject: [PATCH 2/7] Add checksum validation to coordinator test --- go/logic/coordinator_test.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index 6a5f6d8df..c11c13fbc 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -179,8 +179,8 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { return streamCtx.Err() != nil } go func() { - err = coord.StartStreaming(streamCtx, canStopStreaming) - suite.Require().Equal(context.Canceled, err) + streamErr := coord.StartStreaming(streamCtx, canStopStreaming) + suite.Require().Equal(context.Canceled, streamErr) }() // Give streamer some time to start @@ -199,6 +199,23 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { } fmt.Printf("Time taken: %s\n", time.Since(startAt)) + + result, err := suite.db.Exec(`SELECT * FROM ( + SELECT t1.id, + CRC32(CONCAT_WS(';',t1.id,t1.name)) + AS checksum1, + CRC32(CONCAT_WS(';',t2.id,t2.name)) + AS checksum2 + FROM test.gh_ost_test t1 + LEFT JOIN test._gh_ost_test_gho t2 + ON t1.id = t2.id +) AS checksums +WHERE checksums.checksum1 != checksums.checksum2`) + suite.Require().NoError(err) + + count, err := result.RowsAffected() + suite.Require().NoError(err) + suite.Require().Zero(count) } func TestCoordinator(t *testing.T) { From 011794dd1bad211530de8f794919d75d7a8fad7f Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Thu, 10 Apr 2025 10:46:48 -0400 Subject: [PATCH 3/7] Use errgroup to perform transactions concurrently in coordinator_test.go --- go/logic/coordinator.go | 2 +- go/logic/coordinator_test.go | 39 +++-- vendor/golang.org/x/sync/LICENSE | 27 ++++ vendor/golang.org/x/sync/PATENTS | 22 +++ vendor/golang.org/x/sync/errgroup/errgroup.go | 136 ++++++++++++++++++ vendor/golang.org/x/sync/errgroup/go120.go | 13 ++ .../golang.org/x/sync/errgroup/pre_go120.go | 14 ++ vendor/modules.txt | 1 + 8 files changed, 241 insertions(+), 13 deletions(-) create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go create mode 100644 vendor/golang.org/x/sync/errgroup/go120.go create mode 100644 vendor/golang.org/x/sync/errgroup/pre_go120.go diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index 8c9243fcd..f0702b92b 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -522,7 +522,7 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize // Schedule any jobs that were waiting for this job to complete or for the low watermark for waitingForSequenceNumber, channels := range c.waitingJobs { - if waitingForSequenceNumber <= c.lowWaterMark || waitingForSequenceNumber == sequenceNumber { + if waitingForSequenceNumber <= c.lowWaterMark { channelsToNotify = append(channelsToNotify, channels...) delete(c.waitingJobs, waitingForSequenceNumber) } diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index c11c13fbc..9db78f2a9 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -4,6 +4,7 @@ import ( "context" gosql "database/sql" "fmt" + "math/rand/v2" "os" "testing" "time" @@ -18,13 +19,15 @@ import ( "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" + "golang.org/x/sync/errgroup" ) type CoordinatorTestSuite struct { suite.Suite - mysqlContainer testcontainers.Container - db *gosql.DB + mysqlContainer testcontainers.Container + db *gosql.DB + concurrentTransactions int } func (suite *CoordinatorTestSuite) SetupSuite() { @@ -51,6 +54,10 @@ func (suite *CoordinatorTestSuite) SetupSuite() { suite.Require().NoError(err) suite.db = db + suite.concurrentTransactions = 500 + + _, err = db.Exec(fmt.Sprintf("SET GLOBAL max_connections = %d", suite.concurrentTransactions*2)) + suite.Require().NoError(err) } func (suite *CoordinatorTestSuite) SetupTest() { @@ -133,20 +140,28 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { err = applier.CreateChangelogTable() suite.Require().NoError(err) - // TODO: use errgroup - for i := 0; i < 100; i++ { - tx, err := suite.db.Begin() - suite.Require().NoError(err) + g, ctx := errgroup.WithContext(ctx) + for range suite.concurrentTransactions { + g.Go(func() error { + tx, txErr := suite.db.Begin() + if txErr != nil { + return txErr + } - for j := 0; j < 100; j++ { - _, err = tx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')") - suite.Require().NoError(err) - } + for range 100 { + _, txErr = tx.Exec(fmt.Sprintf("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')", rand.Int())) + if txErr != nil { + return txErr + } + } - err = tx.Commit() - suite.Require().NoError(err) + return tx.Commit() + }) } + err = g.Wait() + suite.Require().NoError(err) + _, err = suite.db.Exec("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1") suite.Require().NoError(err) diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 000000000..2a7cf70da --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 000000000..b8322598a --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,136 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +// +// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks +// returning errors. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func(error) + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := withCancelCause(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel(g.err) + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// A limit of zero will prevent any new goroutines from being added. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/vendor/golang.org/x/sync/errgroup/go120.go b/vendor/golang.org/x/sync/errgroup/go120.go new file mode 100644 index 000000000..f93c740b6 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/go120.go @@ -0,0 +1,13 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + return context.WithCancelCause(parent) +} diff --git a/vendor/golang.org/x/sync/errgroup/pre_go120.go b/vendor/golang.org/x/sync/errgroup/pre_go120.go new file mode 100644 index 000000000..88ce33434 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/pre_go120.go @@ -0,0 +1,14 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + ctx, cancel := context.WithCancel(parent) + return ctx, func(error) { cancel() } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 70e8758d2..d52603324 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -299,6 +299,7 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf golang.org/x/net/context # golang.org/x/sync v0.11.0 ## explicit; go 1.18 +golang.org/x/sync/errgroup # golang.org/x/sys v0.30.0 ## explicit; go 1.18 golang.org/x/sys/cpu From e4c36d0ea845d48cf2ad7233cb0684e6f6fb322f Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Thu, 10 Apr 2025 10:54:28 -0400 Subject: [PATCH 4/7] Configure concurrency separate from total number of transactions. --- go/logic/coordinator_test.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index 9db78f2a9..4e24ab013 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -28,6 +28,7 @@ type CoordinatorTestSuite struct { mysqlContainer testcontainers.Container db *gosql.DB concurrentTransactions int + transactionsPerWorker int } func (suite *CoordinatorTestSuite) SetupSuite() { @@ -54,10 +55,10 @@ func (suite *CoordinatorTestSuite) SetupSuite() { suite.Require().NoError(err) suite.db = db - suite.concurrentTransactions = 500 + suite.concurrentTransactions = 100 + suite.transactionsPerWorker = 100 - _, err = db.Exec(fmt.Sprintf("SET GLOBAL max_connections = %d", suite.concurrentTransactions*2)) - suite.Require().NoError(err) + db.SetMaxOpenConns(suite.concurrentTransactions) } func (suite *CoordinatorTestSuite) SetupTest() { @@ -68,6 +69,9 @@ func (suite *CoordinatorTestSuite) SetupTest() { _, err = suite.db.ExecContext(ctx, "SET @@GLOBAL.binlog_transaction_dependency_tracking = WRITESET") suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET @@GLOBAL.max_connections = %d", suite.concurrentTransactions*2)) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, "CREATE DATABASE test") suite.Require().NoError(err) } @@ -143,19 +147,26 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { g, ctx := errgroup.WithContext(ctx) for range suite.concurrentTransactions { g.Go(func() error { - tx, txErr := suite.db.Begin() - if txErr != nil { - return txErr - } + for range suite.transactionsPerWorker { + tx, txErr := suite.db.Begin() + if txErr != nil { + return txErr + } - for range 100 { - _, txErr = tx.Exec(fmt.Sprintf("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')", rand.Int())) + for range rand.IntN(100) { + _, txErr = tx.Exec(fmt.Sprintf("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')", rand.Int())) + if txErr != nil { + return txErr + } + } + + txErr = tx.Commit() if txErr != nil { return txErr } } - return tx.Commit() + return nil }) } From c9b574ac3442f8028c6f69527897a3a6850792c6 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Thu, 10 Apr 2025 11:02:51 -0400 Subject: [PATCH 5/7] Run similar number of txs to previous test and ignore context. --- go/logic/coordinator_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index 4e24ab013..adea326e6 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -56,7 +56,7 @@ func (suite *CoordinatorTestSuite) SetupSuite() { suite.db = db suite.concurrentTransactions = 100 - suite.transactionsPerWorker = 100 + suite.transactionsPerWorker = 1 db.SetMaxOpenConns(suite.concurrentTransactions) } @@ -144,7 +144,7 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { err = applier.CreateChangelogTable() suite.Require().NoError(err) - g, ctx := errgroup.WithContext(ctx) + g, _ := errgroup.WithContext(ctx) for range suite.concurrentTransactions { g.Go(func() error { for range suite.transactionsPerWorker { From 584ce4e82e9a737e60f82b3a8c74e002b8fb9a84 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Thu, 10 Apr 2025 11:08:15 -0400 Subject: [PATCH 6/7] Have at least 1 child in a transaction. --- go/logic/coordinator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index adea326e6..0d7f0a24c 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -153,7 +153,7 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { return txErr } - for range rand.IntN(100) { + for range rand.IntN(100) + 1 { _, txErr = tx.Exec(fmt.Sprintf("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')", rand.Int())) if txErr != nil { return txErr From d4cf67aa1f196948730b888f931f9f93a53229e7 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Thu, 10 Apr 2025 11:32:03 -0400 Subject: [PATCH 7/7] Notify waiting channels for the current sequence number. --- go/logic/coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index f0702b92b..8c9243fcd 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -522,7 +522,7 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize // Schedule any jobs that were waiting for this job to complete or for the low watermark for waitingForSequenceNumber, channels := range c.waitingJobs { - if waitingForSequenceNumber <= c.lowWaterMark { + if waitingForSequenceNumber <= c.lowWaterMark || waitingForSequenceNumber == sequenceNumber { channelsToNotify = append(channelsToNotify, channels...) delete(c.waitingJobs, waitingForSequenceNumber) }