Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/logic/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,9 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
}
channelsToNotify = make([]chan struct{}, 0)

// Schedule any jobs that were waiting for this job to complete
// Schedule any jobs that were waiting for this job to complete or for the low watermark
for waitingForSequenceNumber, channels := range c.waitingJobs {
if waitingForSequenceNumber <= c.lowWaterMark {
if waitingForSequenceNumber <= c.lowWaterMark || waitingForSequenceNumber == sequenceNumber {
channelsToNotify = append(channelsToNotify, channels...)
delete(c.waitingJobs, waitingForSequenceNumber)
}
Expand Down
73 changes: 58 additions & 15 deletions go/logic/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"math/rand/v2"
"os"
"testing"
"time"
Expand All @@ -18,13 +19,16 @@ import (
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"golang.org/x/sync/errgroup"
)

type CoordinatorTestSuite struct {
suite.Suite

mysqlContainer testcontainers.Container
db *gosql.DB
mysqlContainer testcontainers.Container
db *gosql.DB
concurrentTransactions int
transactionsPerWorker int
}

func (suite *CoordinatorTestSuite) SetupSuite() {
Expand All @@ -51,6 +55,10 @@ func (suite *CoordinatorTestSuite) SetupSuite() {
suite.Require().NoError(err)

suite.db = db
suite.concurrentTransactions = 100
suite.transactionsPerWorker = 1

db.SetMaxOpenConns(suite.concurrentTransactions)
}

func (suite *CoordinatorTestSuite) SetupTest() {
Expand All @@ -61,6 +69,9 @@ func (suite *CoordinatorTestSuite) SetupTest() {
_, err = suite.db.ExecContext(ctx, "SET @@GLOBAL.binlog_transaction_dependency_tracking = WRITESET")
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET @@GLOBAL.max_connections = %d", suite.concurrentTransactions*2))
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "CREATE DATABASE test")
suite.Require().NoError(err)
}
Expand Down Expand Up @@ -133,20 +144,35 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
err = applier.CreateChangelogTable()
suite.Require().NoError(err)

// TODO: use errgroup
for i := 0; i < 100; i++ {
tx, err := suite.db.Begin()
suite.Require().NoError(err)

for j := 0; j < 100; j++ {
_, err = tx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
suite.Require().NoError(err)
}
g, _ := errgroup.WithContext(ctx)
for range suite.concurrentTransactions {
g.Go(func() error {
for range suite.transactionsPerWorker {
tx, txErr := suite.db.Begin()
if txErr != nil {
return txErr
}

for range rand.IntN(100) + 1 {
_, txErr = tx.Exec(fmt.Sprintf("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')", rand.Int()))
if txErr != nil {
return txErr
}
}

txErr = tx.Commit()
if txErr != nil {
return txErr
}
}

err = tx.Commit()
suite.Require().NoError(err)
return nil
})
}

err = g.Wait()
suite.Require().NoError(err)

_, err = suite.db.Exec("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1")
suite.Require().NoError(err)

Expand Down Expand Up @@ -179,8 +205,8 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
return streamCtx.Err() != nil
}
go func() {
err = coord.StartStreaming(streamCtx, canStopStreaming)
suite.Require().Equal(context.Canceled, err)
streamErr := coord.StartStreaming(streamCtx, canStopStreaming)
suite.Require().Equal(context.Canceled, streamErr)
}()

// Give streamer some time to start
Expand All @@ -199,6 +225,23 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
}

fmt.Printf("Time taken: %s\n", time.Since(startAt))

result, err := suite.db.Exec(`SELECT * FROM (
SELECT t1.id,
CRC32(CONCAT_WS(';',t1.id,t1.name))
AS checksum1,
CRC32(CONCAT_WS(';',t2.id,t2.name))
AS checksum2
FROM test.gh_ost_test t1
LEFT JOIN test._gh_ost_test_gho t2
ON t1.id = t2.id
) AS checksums
WHERE checksums.checksum1 != checksums.checksum2`)
suite.Require().NoError(err)

count, err := result.RowsAffected()
suite.Require().NoError(err)
suite.Require().Zero(count)
}

func TestCoordinator(t *testing.T) {
Expand Down
27 changes: 27 additions & 0 deletions vendor/golang.org/x/sync/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/golang.org/x/sync/PATENTS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

136 changes: 136 additions & 0 deletions vendor/golang.org/x/sync/errgroup/errgroup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions vendor/golang.org/x/sync/errgroup/go120.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/golang.org/x/sync/errgroup/pre_go120.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf
golang.org/x/net/context
# golang.org/x/sync v0.11.0
## explicit; go 1.18
golang.org/x/sync/errgroup
# golang.org/x/sys v0.30.0
## explicit; go 1.18
golang.org/x/sys/cpu
Expand Down
Loading