@@ -29,6 +29,7 @@ type CoordinatorTestSuite struct {
29
29
db * gosql.DB
30
30
concurrentTransactions int
31
31
transactionsPerWorker int
32
+ transactionSize int
32
33
}
33
34
34
35
func (suite * CoordinatorTestSuite ) SetupSuite () {
@@ -55,8 +56,9 @@ func (suite *CoordinatorTestSuite) SetupSuite() {
55
56
suite .Require ().NoError (err )
56
57
57
58
suite .db = db
58
- suite .concurrentTransactions = 100
59
- suite .transactionsPerWorker = 1
59
+ suite .concurrentTransactions = 8
60
+ suite .transactionsPerWorker = 1000
61
+ suite .transactionSize = 10
60
62
61
63
db .SetMaxOpenConns (suite .concurrentTransactions )
62
64
}
@@ -145,40 +147,50 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
145
147
suite .Require ().NoError (err )
146
148
147
149
g , _ := errgroup .WithContext (ctx )
148
- for range suite .concurrentTransactions {
150
+ for i := range suite .concurrentTransactions {
149
151
g .Go (func () error {
152
+ r := rand .New (rand .NewPCG (uint64 (0 ), uint64 (i )))
153
+ maxID := int64 (1 )
150
154
for range suite .transactionsPerWorker {
151
155
tx , txErr := suite .db .Begin ()
152
156
if txErr != nil {
153
157
return txErr
154
158
}
155
159
156
- for range rand .IntN (100 ) + 1 {
157
- _ , txErr = tx .Exec (fmt .Sprintf ("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')" , rand .Int ()))
158
- if txErr != nil {
159
- return txErr
160
+ // generate random write queries
161
+ for range r .IntN (suite .transactionSize ) + 1 {
162
+ switch r .IntN (5 ) {
163
+ case 0 :
164
+ _ , txErr = tx .Exec (fmt .Sprintf ("DELETE FROM test.gh_ost_test WHERE id=%d" , r .Int64N (maxID )))
165
+ if txErr != nil {
166
+ return txErr
167
+ }
168
+ case 1 , 2 :
169
+ _ , txErr = tx .Exec (fmt .Sprintf ("UPDATE test.gh_ost_test SET name='test-%d' WHERE id=%d" , r .Int (), r .Int64N (maxID )))
170
+ if txErr != nil {
171
+ return txErr
172
+ }
173
+ default :
174
+ res , txErr := tx .Exec (fmt .Sprintf ("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')" , r .Int ()))
175
+ if txErr != nil {
176
+ return txErr
177
+ }
178
+ lastID , err := res .LastInsertId ()
179
+ if err != nil {
180
+ return err
181
+ }
182
+ maxID = lastID + 1
160
183
}
161
184
}
162
-
163
185
txErr = tx .Commit ()
164
186
if txErr != nil {
165
187
return txErr
166
188
}
167
189
}
168
-
169
190
return nil
170
191
})
171
192
}
172
193
173
- err = g .Wait ()
174
- suite .Require ().NoError (err )
175
-
176
- _ , err = suite .db .Exec ("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1" )
177
- suite .Require ().NoError (err )
178
-
179
- _ , err = suite .db .Exec ("INSERT INTO test.gh_ost_test (name) VALUES ('test')" )
180
- suite .Require ().NoError (err )
181
-
182
194
_ , err = applier .WriteChangelogState ("completed" )
183
195
suite .Require ().NoError (err )
184
196
@@ -224,14 +236,16 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
224
236
suite .Require ().NoError (err )
225
237
}
226
238
239
+ //err = g.Wait()
240
+ //suite.Require().NoError(err)
241
+ g .Wait () // there will be deadlock errors
242
+
227
243
fmt .Printf ("Time taken: %s\n " , time .Since (startAt ))
228
244
229
245
result , err := suite .db .Exec (`SELECT * FROM (
230
246
SELECT t1.id,
231
- CRC32(CONCAT_WS(';',t1.id,t1.name))
232
- AS checksum1,
233
- CRC32(CONCAT_WS(';',t2.id,t2.name))
234
- AS checksum2
247
+ CRC32(CONCAT_WS(';',t1.id,t1.name)) AS checksum1,
248
+ CRC32(CONCAT_WS(';',t2.id,t2.name)) AS checksum2
235
249
FROM test.gh_ost_test t1
236
250
LEFT JOIN test._gh_ost_test_gho t2
237
251
ON t1.id = t2.id
0 commit comments