Skip to content

Commit adc7049

Browse files
committed
Fix builder's chunk order and drop panic-on-warnings row count check
- Removed count subqueries from range builders to restore the more performant approach from PR #471 - Modified panic-on-warnings logic to trigger errors based solely on SQL warnings, not on row count mismatches - This addresses potential race conditions where row count comparisons could produce false positives due to concurrent table modifications
1 parent 660cbf9 commit adc7049

File tree

5 files changed

+189
-79
lines changed

5 files changed

+189
-79
lines changed

go/logic/applier.go

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -663,52 +663,53 @@ func (this *Applier) ReadMigrationRangeValues() error {
663663
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
664664
// no further chunk to work through, i.e. we're past the last chunk and are done with
665665
// iterating the range (and this done with copying row chunks)
666-
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
666+
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
667667
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
668668
if this.migrationContext.MigrationIterationRangeMinValues == nil {
669669
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
670670
}
671-
672-
query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable(
673-
this.migrationContext.DatabaseName,
674-
this.migrationContext.OriginalTableName,
675-
&this.migrationContext.UniqueKey.Columns,
676-
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
677-
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
678-
atomic.LoadInt64(&this.migrationContext.ChunkSize),
679-
this.migrationContext.GetIteration() == 0,
680-
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
681-
)
682-
if err != nil {
683-
return hasFurtherRange, expectedRowCount, err
684-
}
685-
686-
rows, err := this.db.Query(query, explodedArgs...)
687-
if err != nil {
688-
return hasFurtherRange, expectedRowCount, err
689-
}
690-
defer rows.Close()
691-
692-
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
693-
for rows.Next() {
694-
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
695-
return hasFurtherRange, expectedRowCount, err
671+
for i := 0; i < 2; i++ {
672+
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
673+
if i == 1 {
674+
buildFunc = sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable
675+
}
676+
query, explodedArgs, err := buildFunc(
677+
this.migrationContext.DatabaseName,
678+
this.migrationContext.OriginalTableName,
679+
&this.migrationContext.UniqueKey.Columns,
680+
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
681+
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
682+
atomic.LoadInt64(&this.migrationContext.ChunkSize),
683+
this.migrationContext.GetIteration() == 0,
684+
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
685+
)
686+
if err != nil {
687+
return hasFurtherRange, err
696688
}
697689

698-
expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
699-
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])
690+
rows, err := this.db.Query(query, explodedArgs...)
691+
if err != nil {
692+
return hasFurtherRange, err
693+
}
694+
defer rows.Close()
700695

701-
hasFurtherRange = expectedRowCount > 0
702-
}
703-
if err = rows.Err(); err != nil {
704-
return hasFurtherRange, expectedRowCount, err
705-
}
706-
if hasFurtherRange {
707-
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
708-
return hasFurtherRange, expectedRowCount, nil
696+
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
697+
for rows.Next() {
698+
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
699+
return hasFurtherRange, err
700+
}
701+
hasFurtherRange = true
702+
}
703+
if err = rows.Err(); err != nil {
704+
return hasFurtherRange, err
705+
}
706+
if hasFurtherRange {
707+
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
708+
return hasFurtherRange, nil
709+
}
709710
}
710711
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
711-
return hasFurtherRange, expectedRowCount, nil
712+
return hasFurtherRange, nil
712713
}
713714

714715
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where

go/logic/applier_test.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,10 +562,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
562562
err = applier.ReadMigrationRangeValues()
563563
suite.Require().NoError(err)
564564

565-
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
565+
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
566566
suite.Require().NoError(err)
567567
suite.Require().True(hasFurtherRange)
568-
suite.Require().Equal(int64(1), expectedRangeSize)
569568

570569
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
571570
suite.Require().NoError(err)
@@ -597,6 +596,67 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
597596
Equal(int64(0), migrationContext.RowsDeltaEstimate)
598597
}
599598

599+
func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFailsWithTruncationWarning() {
600+
ctx := context.Background()
601+
602+
var err error
603+
604+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id int not null, name varchar(20), primary key(id))")
605+
suite.Require().NoError(err)
606+
607+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, name varchar(20), primary key(id));")
608+
suite.Require().NoError(err)
609+
610+
_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, name) VALUES (1, 'this string is long')")
611+
suite.Require().NoError(err)
612+
613+
connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
614+
suite.Require().NoError(err)
615+
616+
migrationContext := base.NewMigrationContext()
617+
migrationContext.ApplierConnectionConfig = connectionConfig
618+
migrationContext.DatabaseName = "test"
619+
migrationContext.SkipPortValidation = true
620+
migrationContext.OriginalTableName = "testing"
621+
migrationContext.AlterStatementOptions = "modify column name varchar(10)"
622+
migrationContext.PanicOnWarnings = true
623+
migrationContext.SetConnectionConfig("innodb")
624+
625+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name"})
626+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name"})
627+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name"})
628+
migrationContext.UniqueKey = &sql.UniqueKey{
629+
Name: "PRIMARY",
630+
NameInGhostTable: "PRIMARY",
631+
Columns: *sql.NewColumnList([]string{"id"}),
632+
}
633+
applier := NewApplier(migrationContext)
634+
635+
err = applier.InitDBConnections()
636+
suite.Require().NoError(err)
637+
638+
err = applier.CreateChangelogTable()
639+
suite.Require().NoError(err)
640+
641+
err = applier.ReadMigrationRangeValues()
642+
suite.Require().NoError(err)
643+
644+
err = applier.AlterGhost()
645+
suite.Require().NoError(err)
646+
647+
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
648+
suite.Require().NoError(err)
649+
suite.Require().True(hasFurtherRange)
650+
651+
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
652+
suite.Equal(int64(1), rowsAffected)
653+
suite.Require().NoError(err)
654+
655+
// Verify the warning was recorded and will cause the migrator to panic
656+
suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings)
657+
suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1")
658+
}
659+
600660
func TestApplier(t *testing.T) {
601661
suite.Run(t, new(ApplierTestSuite))
602662
}

go/logic/migrator.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,9 +1241,8 @@ func (this *Migrator) iterateChunks() error {
12411241
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
12421242

12431243
hasFurtherRange := false
1244-
expectedRangeSize := int64(0)
12451244
if err := this.retryOperation(func() (e error) {
1246-
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
1245+
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
12471246
return e
12481247
}); err != nil {
12491248
return terminateRowIteration(err)
@@ -1275,10 +1274,8 @@ func (this *Migrator) iterateChunks() error {
12751274
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
12761275
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
12771276
}
1278-
if expectedRangeSize != rowsAffected {
1279-
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
1280-
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
1281-
}
1277+
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
1278+
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
12821279
}
12831280
}
12841281

go/sql/builder.go

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,59 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
251251
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait)
252252
}
253253

254+
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
255+
if uniqueKeyColumns.Len() == 0 {
256+
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
257+
}
258+
databaseName = EscapeName(databaseName)
259+
tableName = EscapeName(tableName)
260+
261+
var startRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
262+
if includeRangeStartValues {
263+
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
264+
}
265+
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
266+
if err != nil {
267+
return "", explodedArgs, err
268+
}
269+
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
270+
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeEndArgs, LessThanOrEqualsComparisonSign)
271+
if err != nil {
272+
return "", explodedArgs, err
273+
}
274+
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
275+
276+
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
277+
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames))
278+
for i, column := range uniqueKeyColumns.Columns() {
279+
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
280+
if column.Type == EnumColumnType {
281+
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
282+
} else {
283+
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
284+
}
285+
}
286+
result = fmt.Sprintf(`
287+
select /* gh-ost %s.%s %s */
288+
%s
289+
from
290+
%s.%s
291+
where
292+
%s and %s
293+
order by
294+
%s
295+
limit 1
296+
offset %d`,
297+
databaseName, tableName, hint,
298+
strings.Join(uniqueKeyColumnNames, ", "),
299+
databaseName, tableName,
300+
rangeStartComparison, rangeEndComparison,
301+
strings.Join(uniqueKeyColumnAscending, ", "),
302+
(chunkSize - 1),
303+
)
304+
return result, explodedArgs, nil
305+
}
306+
254307
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
255308
if uniqueKeyColumns.Len() == 0 {
256309
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
@@ -286,22 +339,8 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
286339
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
287340
}
288341
}
289-
290-
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
291342
result = fmt.Sprintf(`
292-
select /* gh-ost %s.%s %s */
293-
%s,
294-
(select count(*) from (
295-
select
296-
%s
297-
from
298-
%s.%s
299-
where
300-
%s and %s
301-
order by
302-
%s
303-
limit %d
304-
) select_osc_chunk)
343+
select /* gh-ost %s.%s %s */ %s
305344
from (
306345
select
307346
%s
@@ -316,17 +355,13 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
316355
order by
317356
%s
318357
limit 1`,
319-
databaseName, tableName, hint, joinedColumnNames,
320-
joinedColumnNames, databaseName, tableName,
321-
rangeStartComparison, rangeEndComparison,
322-
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
323-
joinedColumnNames, databaseName, tableName,
358+
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
359+
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
324360
rangeStartComparison, rangeEndComparison,
325361
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
326362
strings.Join(uniqueKeyColumnDescending, ", "),
327363
)
328-
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
329-
return result, append(explodedArgs, explodedArgs...), nil
364+
return result, explodedArgs, nil
330365
}
331366

332367
func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {

go/sql/builder_test.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,33 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
325325
}
326326
}
327327

328+
func TestBuildUniqueKeyRangeEndPreparedQueryViaOffset(t *testing.T) {
329+
databaseName := "mydb"
330+
originalTableName := "tbl"
331+
var chunkSize int64 = 500
332+
{
333+
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
334+
rangeStartArgs := []interface{}{3, 17}
335+
rangeEndArgs := []interface{}{103, 117}
336+
337+
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
338+
require.NoError(t, err)
339+
expected := `
340+
select /* gh-ost mydb.tbl test */
341+
name, position
342+
from
343+
mydb.tbl
344+
where
345+
((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
346+
order by
347+
name asc, position asc
348+
limit 1
349+
offset 499`
350+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
351+
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
352+
}
353+
}
354+
328355
func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
329356
databaseName := "mydb"
330357
originalTableName := "tbl"
@@ -338,17 +365,7 @@ func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
338365
require.NoError(t, err)
339366
expected := `
340367
select /* gh-ost mydb.tbl test */
341-
name, position,
342-
(select count(*) from (
343-
select
344-
name, position
345-
from
346-
mydb.tbl
347-
where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
348-
order by
349-
name asc, position asc
350-
limit 500
351-
) select_osc_chunk)
368+
name, position
352369
from (
353370
select
354371
name, position
@@ -363,7 +380,7 @@ func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
363380
name desc, position desc
364381
limit 1`
365382
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
366-
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117, 3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
383+
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
367384
}
368385
}
369386

0 commit comments

Comments
 (0)