Skip to content

Commit 875f213

Browse files
authored
Merge branch 'master' into cutover-gho-locked
2 parents de1ced3 + fb3d71d commit 875f213

File tree

7 files changed

+126
-86
lines changed

7 files changed

+126
-86
lines changed

go/logic/applier.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,7 @@ func (this *Applier) ReadMigrationRangeValues() error {
685685
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
686686
// no further chunk to work through, i.e. we're past the last chunk and are done with
687687
// iterating the range (and this done with copying row chunks)
688-
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
688+
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
689689
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
690690
if this.migrationContext.MigrationIterationRangeMinValues == nil {
691691
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
@@ -706,36 +706,32 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
706706
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
707707
)
708708
if err != nil {
709-
return hasFurtherRange, expectedRowCount, err
709+
return hasFurtherRange, err
710710
}
711711

712712
rows, err := this.db.Query(query, explodedArgs...)
713713
if err != nil {
714-
return hasFurtherRange, expectedRowCount, err
714+
return hasFurtherRange, err
715715
}
716716
defer rows.Close()
717717

718-
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
718+
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
719719
for rows.Next() {
720720
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
721-
return hasFurtherRange, expectedRowCount, err
721+
return hasFurtherRange, err
722722
}
723-
724-
expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
725-
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])
726-
727-
hasFurtherRange = expectedRowCount > 0
723+
hasFurtherRange = true
728724
}
729725
if err = rows.Err(); err != nil {
730-
return hasFurtherRange, expectedRowCount, err
726+
return hasFurtherRange, err
731727
}
732728
if hasFurtherRange {
733729
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
734-
return hasFurtherRange, expectedRowCount, nil
730+
return hasFurtherRange, nil
735731
}
736732
}
737733
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
738-
return hasFurtherRange, expectedRowCount, nil
734+
return hasFurtherRange, nil
739735
}
740736

741737
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where

go/logic/applier_test.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,10 +562,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
562562
err = applier.ReadMigrationRangeValues()
563563
suite.Require().NoError(err)
564564

565-
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
565+
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
566566
suite.Require().NoError(err)
567567
suite.Require().True(hasFurtherRange)
568-
suite.Require().Equal(int64(1), expectedRangeSize)
569568

570569
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
571570
suite.Require().NoError(err)
@@ -597,6 +596,67 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
597596
Equal(int64(0), migrationContext.RowsDeltaEstimate)
598597
}
599598

599+
func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFailsWithTruncationWarning() {
600+
ctx := context.Background()
601+
602+
var err error
603+
604+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id int not null, name varchar(20), primary key(id))")
605+
suite.Require().NoError(err)
606+
607+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, name varchar(20), primary key(id));")
608+
suite.Require().NoError(err)
609+
610+
_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, name) VALUES (1, 'this string is long')")
611+
suite.Require().NoError(err)
612+
613+
connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
614+
suite.Require().NoError(err)
615+
616+
migrationContext := base.NewMigrationContext()
617+
migrationContext.ApplierConnectionConfig = connectionConfig
618+
migrationContext.DatabaseName = "test"
619+
migrationContext.SkipPortValidation = true
620+
migrationContext.OriginalTableName = "testing"
621+
migrationContext.AlterStatementOptions = "modify column name varchar(10)"
622+
migrationContext.PanicOnWarnings = true
623+
migrationContext.SetConnectionConfig("innodb")
624+
625+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name"})
626+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name"})
627+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name"})
628+
migrationContext.UniqueKey = &sql.UniqueKey{
629+
Name: "PRIMARY",
630+
NameInGhostTable: "PRIMARY",
631+
Columns: *sql.NewColumnList([]string{"id"}),
632+
}
633+
applier := NewApplier(migrationContext)
634+
635+
err = applier.InitDBConnections()
636+
suite.Require().NoError(err)
637+
638+
err = applier.CreateChangelogTable()
639+
suite.Require().NoError(err)
640+
641+
err = applier.ReadMigrationRangeValues()
642+
suite.Require().NoError(err)
643+
644+
err = applier.AlterGhost()
645+
suite.Require().NoError(err)
646+
647+
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
648+
suite.Require().NoError(err)
649+
suite.Require().True(hasFurtherRange)
650+
651+
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
652+
suite.Equal(int64(1), rowsAffected)
653+
suite.Require().NoError(err)
654+
655+
// Verify the warning was recorded and will cause the migrator to panic
656+
suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings)
657+
suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1")
658+
}
659+
600660
func TestApplier(t *testing.T) {
601661
suite.Run(t, new(ApplierTestSuite))
602662
}

go/logic/migrator.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,9 +1247,8 @@ func (this *Migrator) iterateChunks() error {
12471247
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
12481248

12491249
hasFurtherRange := false
1250-
expectedRangeSize := int64(0)
12511250
if err := this.retryOperation(func() (e error) {
1252-
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
1251+
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
12531252
return e
12541253
}); err != nil {
12551254
return terminateRowIteration(err)
@@ -1281,10 +1280,8 @@ func (this *Migrator) iterateChunks() error {
12811280
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
12821281
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
12831282
}
1284-
if expectedRangeSize != rowsAffected {
1285-
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
1286-
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
1287-
}
1283+
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
1284+
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
12881285
}
12891286
}
12901287

go/sql/builder.go

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -283,46 +283,25 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
283283
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
284284
}
285285
}
286-
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
287286
result = fmt.Sprintf(`
288287
select /* gh-ost %s.%s %s */
289-
%s,
290-
(select count(*) from (
291-
select
292-
%s
293-
from
294-
%s.%s
295-
where
296-
%s and %s
297-
limit
298-
%d
299-
) select_osc_chunk)
300-
from (
301-
select
302-
%s
303-
from
304-
%s.%s
305-
where
306-
%s and %s
307-
limit
308-
%d
309-
) select_osc_chunk
288+
%s
289+
from
290+
%s.%s
291+
where
292+
%s and %s
310293
order by
311294
%s
312295
limit 1
313296
offset %d`,
314297
databaseName, tableName, hint,
315-
joinedColumnNames, joinedColumnNames,
316-
databaseName, tableName,
317-
rangeStartComparison, rangeEndComparison, chunkSize,
318-
joinedColumnNames,
298+
strings.Join(uniqueKeyColumnNames, ", "),
319299
databaseName, tableName,
320-
rangeStartComparison, rangeEndComparison, chunkSize,
300+
rangeStartComparison, rangeEndComparison,
321301
strings.Join(uniqueKeyColumnAscending, ", "),
322302
(chunkSize - 1),
323303
)
324-
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
325-
return result, append(explodedArgs, explodedArgs...), nil
304+
return result, explodedArgs, nil
326305
}
327306

328307
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
@@ -360,22 +339,8 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
360339
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
361340
}
362341
}
363-
364-
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
365342
result = fmt.Sprintf(`
366-
select /* gh-ost %s.%s %s */
367-
%s,
368-
(select count(*) from (
369-
select
370-
%s
371-
from
372-
%s.%s
373-
where
374-
%s and %s
375-
order by
376-
%s
377-
limit %d
378-
) select_osc_chunk)
343+
select /* gh-ost %s.%s %s */ %s
379344
from (
380345
select
381346
%s
@@ -390,17 +355,13 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
390355
order by
391356
%s
392357
limit 1`,
393-
databaseName, tableName, hint, joinedColumnNames,
394-
joinedColumnNames, databaseName, tableName,
395-
rangeStartComparison, rangeEndComparison,
396-
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
397-
joinedColumnNames, databaseName, tableName,
358+
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
359+
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
398360
rangeStartComparison, rangeEndComparison,
399361
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
400362
strings.Join(uniqueKeyColumnDescending, ", "),
401363
)
402-
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
403-
return result, append(explodedArgs, explodedArgs...), nil
364+
return result, explodedArgs, nil
404365
}
405366

406367
func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {

go/sql/builder_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,34 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
325325
}
326326
}
327327

328-
func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
328+
func TestBuildUniqueKeyRangeEndPreparedQueryViaOffset(t *testing.T) {
329+
databaseName := "mydb"
330+
originalTableName := "tbl"
331+
var chunkSize int64 = 500
332+
{
333+
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
334+
rangeStartArgs := []interface{}{3, 17}
335+
rangeEndArgs := []interface{}{103, 117}
336+
337+
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
338+
require.NoError(t, err)
339+
expected := `
340+
select /* gh-ost mydb.tbl test */
341+
name, position
342+
from
343+
mydb.tbl
344+
where
345+
((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
346+
order by
347+
name asc, position asc
348+
limit 1
349+
offset 499`
350+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
351+
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
352+
}
353+
}
354+
355+
func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
329356
databaseName := "mydb"
330357
originalTableName := "tbl"
331358
var chunkSize int64 = 500
@@ -338,17 +365,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
338365
require.NoError(t, err)
339366
expected := `
340367
select /* gh-ost mydb.tbl test */
341-
name, position,
342-
(select count(*) from (
343-
select
344-
name, position
345-
from
346-
mydb.tbl
347-
where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
348-
order by
349-
name asc, position asc
350-
limit 500
351-
) select_osc_chunk)
368+
name, position
352369
from (
353370
select
354371
name, position
@@ -363,7 +380,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
363380
name desc, position desc
364381
limit 1`
365382
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
366-
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117, 3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
383+
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
367384
}
368385
}
369386

go/sql/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func (this *ColumnValues) AbstractValues() []interface{} {
325325
func (this *ColumnValues) StringColumn(index int) string {
326326
val := this.AbstractValues()[index]
327327
if ints, ok := val.([]uint8); ok {
328-
return string(ints)
328+
return fmt.Sprintf("%x", ints)
329329
}
330330
return fmt.Sprintf("%+v", val)
331331
}

go/sql/types_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,12 @@ func TestGetColumn(t *testing.T) {
4040
require.Nil(t, column)
4141
}
4242
}
43+
44+
func TestBinaryToString(t *testing.T) {
45+
id := []uint8{0x1b, 0x99}
46+
col := make([]interface{}, 1)
47+
col[0] = id
48+
cv := ToColumnValues(col)
49+
50+
require.Equal(t, "1b99", cv.StringColumn(0))
51+
}

0 commit comments

Comments
 (0)