@@ -393,7 +393,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() {
393
393
migrationContext .ChunkSize = chunkSize
394
394
395
395
// fill with some rows
396
- numRows := int64 (2222 )
396
+ numRows := int64 (3421 )
397
397
for i := range numRows {
398
398
_ , err = suite .db .ExecContext (ctx ,
399
399
fmt .Sprintf ("INSERT INTO %s (id, name, age) VALUES (%d, 'user-%d', %d)" , getTestTableName (), i , i , i % 99 ))
@@ -413,24 +413,93 @@ func (suite *MigratorTestSuite) TestCopierIntPK() {
413
413
atomic .StoreInt64 (& migrator .rowCopyCompleteFlag , 1 )
414
414
}()
415
415
416
- copiedRows := int64 (0 )
417
416
for {
418
417
if atomic .LoadInt64 (& migrator .rowCopyCompleteFlag ) == 1 {
418
+ suite .Assert ().Equal ((numRows / chunkSize )+ 1 , migrator .migrationContext .GetIteration ())
419
419
return
420
420
}
421
421
select {
422
422
case copyRowsFunc := <- migrator .copyRowsQueue :
423
423
{
424
424
suite .Require ().NoError (copyRowsFunc ())
425
- copiedRows += migrationContext .ChunkSize
426
- copiedRows = min (copiedRows , numRows )
427
425
428
426
// check ghost table has expected number of rows
429
427
var ghostRows int64
430
428
suite .db .QueryRowContext (ctx ,
431
429
fmt .Sprintf (`SELECT COUNT(*) FROM %s` , getTestGhostTableName ()),
432
430
).Scan (& ghostRows )
433
- suite .Assert ().Equal (copiedRows , ghostRows )
431
+ suite .Assert ().Equal (migrator .migrationContext .TotalRowsCopied , ghostRows )
432
+ }
433
+ default :
434
+ time .Sleep (time .Second )
435
+ }
436
+ }
437
+ }
438
+
439
+ func (suite * MigratorTestSuite ) TestCopierCompositePK () {
440
+ ctx := context .Background ()
441
+
442
+ _ , err := suite .db .ExecContext (ctx , fmt .Sprintf ("CREATE TABLE %s (id INT UNSIGNED, t CHAR(32), PRIMARY KEY (t, id));" , getTestTableName ()))
443
+ suite .Require ().NoError (err )
444
+
445
+ connectionConfig , err := getTestConnectionConfig (ctx , suite .mysqlContainer )
446
+ suite .Require ().NoError (err )
447
+
448
+ migrationContext := newTestMigrationContext ()
449
+ migrationContext .ApplierConnectionConfig = connectionConfig
450
+ migrationContext .InspectorConnectionConfig = connectionConfig
451
+ migrationContext .SetConnectionConfig ("innodb" )
452
+
453
+ migrationContext .AlterStatementOptions = "ENGINE=InnoDB"
454
+ migrationContext .OriginalTableColumns = sql .NewColumnList ([]string {"id" , "t" })
455
+ migrationContext .SharedColumns = sql .NewColumnList ([]string {"id" , "t" })
456
+ migrationContext .MappedSharedColumns = sql .NewColumnList ([]string {"id" , "t" })
457
+ migrationContext .UniqueKey = & sql.UniqueKey {
458
+ Name : "PRIMARY" ,
459
+ NameInGhostTable : "PRIMARY" ,
460
+ Columns : * sql .NewColumnList ([]string {"t" , "id" }),
461
+ }
462
+
463
+ chunkSize := int64 (100 )
464
+ migrationContext .ChunkSize = chunkSize
465
+
466
+ // fill with some rows
467
+ numRows := int64 (2049 )
468
+ for i := range numRows {
469
+ query := fmt .Sprintf (`INSERT INTO %s (id, t) VALUES (FLOOR(100000000 * RAND(%d)), MD5(RAND(%d)))` , getTestTableName (), i , i )
470
+ _ , err = suite .db .ExecContext (ctx , query )
471
+ suite .Require ().NoError (err )
472
+ }
473
+
474
+ migrator := NewMigrator (migrationContext , "0.0.0" )
475
+ suite .Require ().NoError (migrator .initiateApplier ())
476
+ suite .Require ().NoError (migrator .applier .prepareQueries ())
477
+ suite .Require ().NoError (migrator .applier .ReadMigrationRangeValues ())
478
+
479
+ go migrator .iterateChunks ()
480
+ go func () {
481
+ if err := <- migrator .rowCopyComplete ; err != nil {
482
+ migrator .migrationContext .PanicAbort <- err
483
+ }
484
+ atomic .StoreInt64 (& migrator .rowCopyCompleteFlag , 1 )
485
+ }()
486
+
487
+ for {
488
+ if atomic .LoadInt64 (& migrator .rowCopyCompleteFlag ) == 1 {
489
+ suite .Assert ().Equal ((numRows / chunkSize )+ 1 , migrator .migrationContext .GetIteration ())
490
+ return
491
+ }
492
+ select {
493
+ case copyRowsFunc := <- migrator .copyRowsQueue :
494
+ {
495
+ suite .Require ().NoError (copyRowsFunc ())
496
+
497
+ // check ghost table has expected number of rows
498
+ var ghostRows int64
499
+ suite .db .QueryRowContext (ctx ,
500
+ fmt .Sprintf (`SELECT COUNT(*) FROM %s` , getTestGhostTableName ()),
501
+ ).Scan (& ghostRows )
502
+ suite .Assert ().Equal (migrator .migrationContext .TotalRowsCopied , ghostRows )
434
503
}
435
504
default :
436
505
time .Sleep (time .Second )
0 commit comments