Skip to content

Commit 7c18055

Browse files
Remove counts from CalculateNextIterationRangeEndValues that caused problems with --panic-on-warnings (#1557)
* Fix CalculateNextIterationRangeEndValues order by. The split between Temptable and Offset query builders was originally introduced in #471. Then, both query builders have been changed to calculate actual chunk sizes in #1500. The updated implementation of the Offset query introduced a bug, where a missing `order by` clause in `select_osc_chunk` can result in end values potentially surpassing the chunk_size. This wasn't detected during our initial testing where the query just returned rows in their original order, but may happen in real-world scenarios in case the db returns data in an undefined order. An obvious fix would be to just add an `order by` to the Offset builder subquery, however since both builders use temptables now, it makes more sense to simplify and use only one of them. Alternatively, the builder could only use the less performant count query variants when `--panic-on-warnings` is enabled and otherwise use the simpler ones from pull/471. We decided to not follow this path for now, hoping `--panic-on-warnings` becomes an updated and safer default in the future. Co-authored-by: Bastian Bartmann <[email protected]> * Fix builder's chunk order and drop panic-on-warnings row count check - Removed count subqueries from range builders to restore the more performant approach from PR #471 - Modified panic-on-warnings logic to trigger errors based solely on SQL warnings, not on row count mismatches - This addresses potential race conditions where row count comparisons could produce false positives due to concurrent table modifications --------- Co-authored-by: Bastian Bartmann <[email protected]>
1 parent 5c88f54 commit 7c18055

File tree

5 files changed

+116
-85
lines changed

5 files changed

+116
-85
lines changed

go/logic/applier.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ func (this *Applier) ReadMigrationRangeValues() error {
663663
// which will be used for copying the next chunk of rows. It returns "false" if there is
664664
// no further chunk to work through, i.e. we're past the last chunk and are done with
665665
// iterating the range (and thus done with copying row chunks)
666-
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
666+
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
667667
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
668668
if this.migrationContext.MigrationIterationRangeMinValues == nil {
669669
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
@@ -684,36 +684,32 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
684684
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
685685
)
686686
if err != nil {
687-
return hasFurtherRange, expectedRowCount, err
687+
return hasFurtherRange, err
688688
}
689689

690690
rows, err := this.db.Query(query, explodedArgs...)
691691
if err != nil {
692-
return hasFurtherRange, expectedRowCount, err
692+
return hasFurtherRange, err
693693
}
694694
defer rows.Close()
695695

696-
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
696+
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
697697
for rows.Next() {
698698
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
699-
return hasFurtherRange, expectedRowCount, err
699+
return hasFurtherRange, err
700700
}
701-
702-
expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
703-
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])
704-
705-
hasFurtherRange = expectedRowCount > 0
701+
hasFurtherRange = true
706702
}
707703
if err = rows.Err(); err != nil {
708-
return hasFurtherRange, expectedRowCount, err
704+
return hasFurtherRange, err
709705
}
710706
if hasFurtherRange {
711707
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
712-
return hasFurtherRange, expectedRowCount, nil
708+
return hasFurtherRange, nil
713709
}
714710
}
715711
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
716-
return hasFurtherRange, expectedRowCount, nil
712+
return hasFurtherRange, nil
717713
}
718714

719715
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where

go/logic/applier_test.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,10 +562,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
562562
err = applier.ReadMigrationRangeValues()
563563
suite.Require().NoError(err)
564564

565-
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
565+
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
566566
suite.Require().NoError(err)
567567
suite.Require().True(hasFurtherRange)
568-
suite.Require().Equal(int64(1), expectedRangeSize)
569568

570569
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
571570
suite.Require().NoError(err)
@@ -597,6 +596,67 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
597596
Equal(int64(0), migrationContext.RowsDeltaEstimate)
598597
}
599598

599+
func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFailsWithTruncationWarning() {
600+
ctx := context.Background()
601+
602+
var err error
603+
604+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id int not null, name varchar(20), primary key(id))")
605+
suite.Require().NoError(err)
606+
607+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, name varchar(20), primary key(id));")
608+
suite.Require().NoError(err)
609+
610+
_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, name) VALUES (1, 'this string is long')")
611+
suite.Require().NoError(err)
612+
613+
connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
614+
suite.Require().NoError(err)
615+
616+
migrationContext := base.NewMigrationContext()
617+
migrationContext.ApplierConnectionConfig = connectionConfig
618+
migrationContext.DatabaseName = "test"
619+
migrationContext.SkipPortValidation = true
620+
migrationContext.OriginalTableName = "testing"
621+
migrationContext.AlterStatementOptions = "modify column name varchar(10)"
622+
migrationContext.PanicOnWarnings = true
623+
migrationContext.SetConnectionConfig("innodb")
624+
625+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name"})
626+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name"})
627+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name"})
628+
migrationContext.UniqueKey = &sql.UniqueKey{
629+
Name: "PRIMARY",
630+
NameInGhostTable: "PRIMARY",
631+
Columns: *sql.NewColumnList([]string{"id"}),
632+
}
633+
applier := NewApplier(migrationContext)
634+
635+
err = applier.InitDBConnections()
636+
suite.Require().NoError(err)
637+
638+
err = applier.CreateChangelogTable()
639+
suite.Require().NoError(err)
640+
641+
err = applier.ReadMigrationRangeValues()
642+
suite.Require().NoError(err)
643+
644+
err = applier.AlterGhost()
645+
suite.Require().NoError(err)
646+
647+
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
648+
suite.Require().NoError(err)
649+
suite.Require().True(hasFurtherRange)
650+
651+
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
652+
suite.Equal(int64(1), rowsAffected)
653+
suite.Require().NoError(err)
654+
655+
// Verify the warning was recorded and will cause the migrator to panic
656+
suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings)
657+
suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1")
658+
}
659+
600660
func TestApplier(t *testing.T) {
601661
suite.Run(t, new(ApplierTestSuite))
602662
}

go/logic/migrator.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,9 +1241,8 @@ func (this *Migrator) iterateChunks() error {
12411241
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hang forever
12421242

12431243
hasFurtherRange := false
1244-
expectedRangeSize := int64(0)
12451244
if err := this.retryOperation(func() (e error) {
1246-
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
1245+
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
12471246
return e
12481247
}); err != nil {
12491248
return terminateRowIteration(err)
@@ -1275,10 +1274,8 @@ func (this *Migrator) iterateChunks() error {
12751274
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
12761275
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
12771276
}
1278-
if expectedRangeSize != rowsAffected {
1279-
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
1280-
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
1281-
}
1277+
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
1278+
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
12821279
}
12831280
}
12841281

go/sql/builder.go

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -283,46 +283,25 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
283283
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
284284
}
285285
}
286-
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
287286
result = fmt.Sprintf(`
288287
select /* gh-ost %s.%s %s */
289-
%s,
290-
(select count(*) from (
291-
select
292-
%s
293-
from
294-
%s.%s
295-
where
296-
%s and %s
297-
limit
298-
%d
299-
) select_osc_chunk)
300-
from (
301-
select
302-
%s
303-
from
304-
%s.%s
305-
where
306-
%s and %s
307-
limit
308-
%d
309-
) select_osc_chunk
288+
%s
289+
from
290+
%s.%s
291+
where
292+
%s and %s
310293
order by
311294
%s
312295
limit 1
313296
offset %d`,
314297
databaseName, tableName, hint,
315-
joinedColumnNames, joinedColumnNames,
316-
databaseName, tableName,
317-
rangeStartComparison, rangeEndComparison, chunkSize,
318-
joinedColumnNames,
298+
strings.Join(uniqueKeyColumnNames, ", "),
319299
databaseName, tableName,
320-
rangeStartComparison, rangeEndComparison, chunkSize,
300+
rangeStartComparison, rangeEndComparison,
321301
strings.Join(uniqueKeyColumnAscending, ", "),
322302
(chunkSize - 1),
323303
)
324-
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
325-
return result, append(explodedArgs, explodedArgs...), nil
304+
return result, explodedArgs, nil
326305
}
327306

328307
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
@@ -360,22 +339,8 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
360339
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
361340
}
362341
}
363-
364-
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
365342
result = fmt.Sprintf(`
366-
select /* gh-ost %s.%s %s */
367-
%s,
368-
(select count(*) from (
369-
select
370-
%s
371-
from
372-
%s.%s
373-
where
374-
%s and %s
375-
order by
376-
%s
377-
limit %d
378-
) select_osc_chunk)
343+
select /* gh-ost %s.%s %s */ %s
379344
from (
380345
select
381346
%s
@@ -390,17 +355,13 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
390355
order by
391356
%s
392357
limit 1`,
393-
databaseName, tableName, hint, joinedColumnNames,
394-
joinedColumnNames, databaseName, tableName,
395-
rangeStartComparison, rangeEndComparison,
396-
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
397-
joinedColumnNames, databaseName, tableName,
358+
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
359+
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
398360
rangeStartComparison, rangeEndComparison,
399361
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
400362
strings.Join(uniqueKeyColumnDescending, ", "),
401363
)
402-
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
403-
return result, append(explodedArgs, explodedArgs...), nil
364+
return result, explodedArgs, nil
404365
}
405366

406367
func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {

go/sql/builder_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,34 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
325325
}
326326
}
327327

328-
func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
328+
func TestBuildUniqueKeyRangeEndPreparedQueryViaOffset(t *testing.T) {
329+
databaseName := "mydb"
330+
originalTableName := "tbl"
331+
var chunkSize int64 = 500
332+
{
333+
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
334+
rangeStartArgs := []interface{}{3, 17}
335+
rangeEndArgs := []interface{}{103, 117}
336+
337+
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
338+
require.NoError(t, err)
339+
expected := `
340+
select /* gh-ost mydb.tbl test */
341+
name, position
342+
from
343+
mydb.tbl
344+
where
345+
((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
346+
order by
347+
name asc, position asc
348+
limit 1
349+
offset 499`
350+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
351+
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
352+
}
353+
}
354+
355+
func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
329356
databaseName := "mydb"
330357
originalTableName := "tbl"
331358
var chunkSize int64 = 500
@@ -338,17 +365,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
338365
require.NoError(t, err)
339366
expected := `
340367
select /* gh-ost mydb.tbl test */
341-
name, position,
342-
(select count(*) from (
343-
select
344-
name, position
345-
from
346-
mydb.tbl
347-
where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
348-
order by
349-
name asc, position asc
350-
limit 500
351-
) select_osc_chunk)
368+
name, position
352369
from (
353370
select
354371
name, position
@@ -363,7 +380,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
363380
name desc, position desc
364381
limit 1`
365382
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
366-
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117, 3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
383+
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
367384
}
368385
}
369386

0 commit comments

Comments
 (0)