Skip to content

Commit 9b5e2bc

Browse files
committed
fix Coordinator test
1 parent 569d035 commit 9b5e2bc

File tree

4 files changed

+89
-66
lines changed

4 files changed

+89
-66
lines changed

go/binlog/gomysql_reader.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,11 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
7676

7777
// StreamEvents reads binlog events and sends them to the given channel.
7878
// It is blocking and should be executed in a goroutine.
79-
func (this *GoMySQLReader) StreamEvents(ctx context.Context, eventChannel chan<- *replication.BinlogEvent) error {
79+
func (this *GoMySQLReader) StreamEvents(ctx context.Context, canStopStreaming func() bool, eventChannel chan<- *replication.BinlogEvent) error {
8080
for {
81+
if canStopStreaming() {
82+
return nil
83+
}
8184
if err := ctx.Err(); err != nil {
8285
return err
8386
}

go/logic/coordinator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,13 @@ func (c *Coordinator) StartStreaming(ctx context.Context, canStopStreaming func(
276276

277277
var retries int64
278278
for {
279-
if err := c.binlogReader.StreamEvents(ctx, c.events); err != nil {
279+
if err := ctx.Err(); err != nil {
280+
return err
281+
}
282+
if canStopStreaming() {
283+
return nil
284+
}
285+
if err := c.binlogReader.StreamEvents(ctx, canStopStreaming, c.events); err != nil {
280286
if errors.Is(err, context.Canceled) {
281287
return err
282288
}

go/logic/coordinator_test.go

Lines changed: 78 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -12,49 +12,81 @@ import (
1212
"github.com/github/gh-ost/go/binlog"
1313
"github.com/github/gh-ost/go/mysql"
1414
"github.com/github/gh-ost/go/sql"
15-
"github.com/stretchr/testify/require"
15+
"github.com/stretchr/testify/suite"
1616
"github.com/testcontainers/testcontainers-go"
1717
"github.com/testcontainers/testcontainers-go/wait"
1818
)
1919

20-
func TestCoordinator(t *testing.T) {
20+
type CoordinatorTestSuite struct {
21+
suite.Suite
22+
23+
mysqlContainer testcontainers.Container
24+
db *gosql.DB
25+
}
26+
27+
func (suite *CoordinatorTestSuite) SetupSuite() {
2128
ctx := context.Background()
2229
req := testcontainers.ContainerRequest{
23-
Image: "mysql:8.0",
24-
Env: map[string]string{"MYSQL_ROOT_PASSWORD": "root"},
25-
WaitingFor: wait.ForLog("port: 3306 MySQL Community Server - GPL"),
26-
ExposedPorts: []string{"3306"},
30+
Image: "mysql:8.0.40",
31+
Env: map[string]string{"MYSQL_ROOT_PASSWORD": "root-password"},
32+
WaitingFor: wait.ForListeningPort("3306/tcp"),
33+
ExposedPorts: []string{"3306/tcp"},
2734
}
2835

2936
mysqlContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
3037
ContainerRequest: req,
3138
Started: true,
3239
})
33-
require.NoError(t, err)
34-
t.Cleanup(func() {
35-
ctx := context.Background()
36-
require.NoError(t, mysqlContainer.Terminate(ctx))
37-
})
40+
suite.Require().NoError(err)
3841

39-
host, err := mysqlContainer.Host(ctx)
40-
require.NoError(t, err)
42+
suite.mysqlContainer = mysqlContainer
4143

42-
mappedPort, err := mysqlContainer.MappedPort(ctx, "3306")
43-
require.NoError(t, err)
44+
dsn, err := GetDSN(ctx, mysqlContainer)
45+
suite.Require().NoError(err)
4446

45-
db, err := gosql.Open("mysql", "root:root@tcp("+host+":"+mappedPort.Port()+")/")
46-
require.NoError(t, err)
47+
db, err := gosql.Open("mysql", dsn)
48+
suite.Require().NoError(err)
4749

48-
t.Cleanup(func() {
49-
require.NoError(t, db.Close())
50-
})
50+
suite.db = db
51+
}
52+
53+
func (suite *CoordinatorTestSuite) SetupTest() {
54+
ctx := context.Background()
55+
_, err := suite.db.ExecContext(ctx, "RESET MASTER")
56+
suite.Require().NoError(err)
57+
58+
_, err = suite.db.ExecContext(ctx, "SET @@GLOBAL.binlog_transaction_dependency_tracking = WRITESET")
59+
suite.Require().NoError(err)
60+
61+
_, err = suite.db.ExecContext(ctx, "CREATE DATABASE test")
62+
suite.Require().NoError(err)
63+
}
64+
65+
func (suite *CoordinatorTestSuite) TearDownTest() {
66+
ctx := context.Background()
67+
_, err := suite.db.ExecContext(ctx, "DROP DATABASE test")
68+
suite.Require().NoError(err)
69+
}
70+
71+
func (suite *CoordinatorTestSuite) TeardownSuite() {
72+
ctx := context.Background()
73+
74+
suite.Assert().NoError(suite.db.Close())
75+
suite.Assert().NoError(suite.mysqlContainer.Terminate(ctx))
76+
}
77+
78+
func (suite *CoordinatorTestSuite) TestApplyDML() {
79+
ctx := context.Background()
80+
81+
connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
5182

5283
_ = os.Remove("/tmp/gh-ost.sock")
5384

54-
//prepareDatabase(t, db)
85+
_, err = suite.db.Exec("CREATE TABLE test.gh_ost_test (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255)) ENGINE=InnoDB")
86+
suite.Require().NoError(err)
5587

56-
_, err = db.Exec("CREATE TABLE test._gh_ost_test_gho (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255))")
57-
require.NoError(t, err)
88+
_, err = suite.db.Exec("CREATE TABLE test._gh_ost_test_gho (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255))")
89+
suite.Require().NoError(err)
5890

5991
migrationContext := base.NewMigrationContext()
6092
migrationContext.DatabaseName = "test"
@@ -67,23 +99,8 @@ func TestCoordinator(t *testing.T) {
6799
migrationContext.ThrottleHTTPIntervalMillis = 100
68100
migrationContext.DMLBatchSize = 10
69101

70-
migrationContext.ApplierConnectionConfig = &mysql.ConnectionConfig{
71-
Key: mysql.InstanceKey{
72-
Hostname: host,
73-
Port: mappedPort.Int(),
74-
},
75-
User: "root",
76-
Password: "root",
77-
}
78-
79-
migrationContext.InspectorConnectionConfig = &mysql.ConnectionConfig{
80-
Key: mysql.InstanceKey{
81-
Hostname: host,
82-
Port: mappedPort.Int(),
83-
},
84-
User: "root",
85-
Password: "root",
86-
}
102+
migrationContext.ApplierConnectionConfig = connectionConfig
103+
migrationContext.InspectorConnectionConfig = connectionConfig
87104

88105
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name"})
89106
migrationContext.GhostTableColumns = sql.NewColumnList([]string{"id", "name"})
@@ -96,41 +113,41 @@ func TestCoordinator(t *testing.T) {
96113
}
97114

98115
migrationContext.SetConnectionConfig("innodb")
116+
migrationContext.SkipPortValidation = true
99117
migrationContext.NumWorkers = 4
100-
// HACK: so
101-
migrationContext.AzureMySQL = true
102118

103119
applier := NewApplier(migrationContext)
104120
err = applier.InitDBConnections(migrationContext.NumWorkers)
105-
require.NoError(t, err)
121+
suite.Require().NoError(err)
106122

107123
err = applier.prepareQueries()
108-
require.NoError(t, err)
124+
suite.Require().NoError(err)
109125

110126
err = applier.CreateChangelogTable()
111-
require.NoError(t, err)
127+
suite.Require().NoError(err)
112128

129+
// TODO: use errgroup
113130
for i := 0; i < 100; i++ {
114-
tx, err := db.Begin()
115-
require.NoError(t, err)
131+
tx, err := suite.db.Begin()
132+
suite.Require().NoError(err)
116133

117134
for j := 0; j < 100; j++ {
118135
_, err = tx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
119-
require.NoError(t, err)
136+
suite.Require().NoError(err)
120137
}
121138

122139
err = tx.Commit()
123-
require.NoError(t, err)
140+
suite.Require().NoError(err)
124141
}
125142

126-
_, err = db.Exec("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1")
127-
require.NoError(t, err)
143+
_, err = suite.db.Exec("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1")
144+
suite.Require().NoError(err)
128145

129-
_, err = db.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
130-
require.NoError(t, err)
146+
_, err = suite.db.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
147+
suite.Require().NoError(err)
131148

132149
_, err = applier.WriteChangelogState("completed")
133-
require.NoError(t, err)
150+
suite.Require().NoError(err)
134151

135152
ctx, cancel := context.WithCancel(context.Background())
136153

@@ -148,15 +165,15 @@ func TestCoordinator(t *testing.T) {
148165
LogFile: "binlog.000001",
149166
LogPos: int64(4),
150167
}
151-
coord.InitializeWorkers(8)
168+
coord.InitializeWorkers(4)
152169

153170
streamCtx, cancelStreaming := context.WithCancel(context.Background())
154171
canStopStreaming := func() bool {
155172
return streamCtx.Err() != nil
156173
}
157174
go func() {
158175
err = coord.StartStreaming(streamCtx, canStopStreaming)
159-
require.Equal(t, context.Canceled, err)
176+
suite.Require().Equal(context.Canceled, err)
160177
}()
161178

162179
// Give streamer some time to start
@@ -171,8 +188,12 @@ func TestCoordinator(t *testing.T) {
171188
}
172189

173190
err = coord.ProcessEventsUntilDrained()
174-
require.NoError(t, err)
191+
suite.Require().NoError(err)
175192
}
176193

177194
fmt.Printf("Time taken: %s\n", time.Since(startAt))
178195
}
196+
197+
func TestCoordinator(t *testing.T) {
198+
suite.Run(t, new(CoordinatorTestSuite))
199+
}

go/logic/migrator.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -410,13 +410,6 @@ waitForGhostTable:
410410
if err := this.countTableRows(); err != nil {
411411
return err
412412
}
413-
// if err := this.addDMLEventsListener(); err != nil {
414-
// return err
415-
// }
416-
417-
// if err := this.addTrxListener(); err != nil {
418-
// return err
419-
// }
420413

421414
if err := this.applier.ReadMigrationRangeValues(); err != nil {
422415
return err

0 commit comments

Comments
 (0)