Skip to content

Commit b23d20f

Browse files
authored
Notify waiting channels on completed transaction, not just the watermark (#1531)
* Notify waiting channels on completed transaction, not just the watermark. * Add checksum validation to coordinator test * Use errgroup to perform transactions concurrently in coordinator_test.go * Configure concurrency separate from total number of transactions. * Run similar number of txs to previous test and ignore context. * Have at least 1 child in a transaction. * Notify waiting channels for the current sequence number.
1 parent b1f4061 commit b23d20f

File tree

8 files changed

+273
-17
lines changed

8 files changed

+273
-17
lines changed

go/logic/coordinator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,9 +520,9 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
520520
}
521521
channelsToNotify = make([]chan struct{}, 0)
522522

523-
// Schedule any jobs that were waiting for this job to complete
523+
// Schedule any jobs that were waiting for this job to complete or for the low watermark
524524
for waitingForSequenceNumber, channels := range c.waitingJobs {
525-
if waitingForSequenceNumber <= c.lowWaterMark {
525+
if waitingForSequenceNumber <= c.lowWaterMark || waitingForSequenceNumber == sequenceNumber {
526526
channelsToNotify = append(channelsToNotify, channels...)
527527
delete(c.waitingJobs, waitingForSequenceNumber)
528528
}

go/logic/coordinator_test.go

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
gosql "database/sql"
66
"fmt"
7+
"math/rand/v2"
78
"os"
89
"testing"
910
"time"
@@ -18,13 +19,16 @@ import (
1819
"github.com/stretchr/testify/suite"
1920
"github.com/testcontainers/testcontainers-go"
2021
"github.com/testcontainers/testcontainers-go/wait"
22+
"golang.org/x/sync/errgroup"
2123
)
2224

2325
type CoordinatorTestSuite struct {
2426
suite.Suite
2527

26-
mysqlContainer testcontainers.Container
27-
db *gosql.DB
28+
mysqlContainer testcontainers.Container
29+
db *gosql.DB
30+
concurrentTransactions int
31+
transactionsPerWorker int
2832
}
2933

3034
func (suite *CoordinatorTestSuite) SetupSuite() {
@@ -51,6 +55,10 @@ func (suite *CoordinatorTestSuite) SetupSuite() {
5155
suite.Require().NoError(err)
5256

5357
suite.db = db
58+
suite.concurrentTransactions = 100
59+
suite.transactionsPerWorker = 1
60+
61+
db.SetMaxOpenConns(suite.concurrentTransactions)
5462
}
5563

5664
func (suite *CoordinatorTestSuite) SetupTest() {
@@ -61,6 +69,9 @@ func (suite *CoordinatorTestSuite) SetupTest() {
6169
_, err = suite.db.ExecContext(ctx, "SET @@GLOBAL.binlog_transaction_dependency_tracking = WRITESET")
6270
suite.Require().NoError(err)
6371

72+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET @@GLOBAL.max_connections = %d", suite.concurrentTransactions*2))
73+
suite.Require().NoError(err)
74+
6475
_, err = suite.db.ExecContext(ctx, "CREATE DATABASE test")
6576
suite.Require().NoError(err)
6677
}
@@ -133,20 +144,35 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
133144
err = applier.CreateChangelogTable()
134145
suite.Require().NoError(err)
135146

136-
// TODO: use errgroup
137-
for i := 0; i < 100; i++ {
138-
tx, err := suite.db.Begin()
139-
suite.Require().NoError(err)
140-
141-
for j := 0; j < 100; j++ {
142-
_, err = tx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
143-
suite.Require().NoError(err)
144-
}
147+
g, _ := errgroup.WithContext(ctx)
148+
for range suite.concurrentTransactions {
149+
g.Go(func() error {
150+
for range suite.transactionsPerWorker {
151+
tx, txErr := suite.db.Begin()
152+
if txErr != nil {
153+
return txErr
154+
}
155+
156+
for range rand.IntN(100) + 1 {
157+
_, txErr = tx.Exec(fmt.Sprintf("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')", rand.Int()))
158+
if txErr != nil {
159+
return txErr
160+
}
161+
}
162+
163+
txErr = tx.Commit()
164+
if txErr != nil {
165+
return txErr
166+
}
167+
}
145168

146-
err = tx.Commit()
147-
suite.Require().NoError(err)
169+
return nil
170+
})
148171
}
149172

173+
err = g.Wait()
174+
suite.Require().NoError(err)
175+
150176
_, err = suite.db.Exec("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1")
151177
suite.Require().NoError(err)
152178

@@ -179,8 +205,8 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
179205
return streamCtx.Err() != nil
180206
}
181207
go func() {
182-
err = coord.StartStreaming(streamCtx, canStopStreaming)
183-
suite.Require().Equal(context.Canceled, err)
208+
streamErr := coord.StartStreaming(streamCtx, canStopStreaming)
209+
suite.Require().Equal(context.Canceled, streamErr)
184210
}()
185211

186212
// Give streamer some time to start
@@ -199,6 +225,23 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
199225
}
200226

201227
fmt.Printf("Time taken: %s\n", time.Since(startAt))
228+
229+
result, err := suite.db.Exec(`SELECT * FROM (
230+
SELECT t1.id,
231+
CRC32(CONCAT_WS(';',t1.id,t1.name))
232+
AS checksum1,
233+
CRC32(CONCAT_WS(';',t2.id,t2.name))
234+
AS checksum2
235+
FROM test.gh_ost_test t1
236+
LEFT JOIN test._gh_ost_test_gho t2
237+
ON t1.id = t2.id
238+
) AS checksums
239+
WHERE checksums.checksum1 != checksums.checksum2`)
240+
suite.Require().NoError(err)
241+
242+
count, err := result.RowsAffected()
243+
suite.Require().NoError(err)
244+
suite.Require().Zero(count)
202245
}
203246

204247
func TestCoordinator(t *testing.T) {

vendor/golang.org/x/sync/LICENSE

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/PATENTS

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/errgroup/errgroup.go

Lines changed: 136 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/errgroup/go120.go

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/errgroup/pre_go120.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/modules.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf
299299
golang.org/x/net/context
300300
# golang.org/x/sync v0.11.0
301301
## explicit; go 1.18
302+
golang.org/x/sync/errgroup
302303
# golang.org/x/sys v0.30.0
303304
## explicit; go 1.18
304305
golang.org/x/sys/cpu

0 commit comments

Comments
 (0)