Skip to content

Commit 660cbf9

Browse files
Fix CalculateNextIterationRangeEndValues order by.
The split between Temptable and Offset query builders was originally introduced in #471. Then, both query builders have been changed to calculate actual chunk sizes in #1500. The updated implementation of the Offset query introduced a bug, where a missing `order by` clause in `select_osc_chunk` can result in end values potentially surpassing the chunk_size. This wasn't detected during our initial testing where the query just returned rows in their original order, but may happen in real-world scenarios in case the db returns data in an undefined order. An obvious fix would be to just add an `order by` to the Offset builder subquery, however since both builders use temptables now, it makes more sense to simplify and use only one of them. Alternatively, the builder could only use the less performant count query variants when `--panic-on-warnings` is enabled and otherwise use the simpler ones from pull/471. We decided to not follow this path for now, hoping `--panic-on-warnings` becomes an updated and safer default in the future. Co-authored-by: Bastian Bartmann <[email protected]>
1 parent 5c88f54 commit 660cbf9

File tree

3 files changed

+34
-113
lines changed

3 files changed

+34
-113
lines changed

go/logic/applier.go

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -668,49 +668,44 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
668668
if this.migrationContext.MigrationIterationRangeMinValues == nil {
669669
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
670670
}
671-
for i := 0; i < 2; i++ {
672-
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
673-
if i == 1 {
674-
buildFunc = sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable
675-
}
676-
query, explodedArgs, err := buildFunc(
677-
this.migrationContext.DatabaseName,
678-
this.migrationContext.OriginalTableName,
679-
&this.migrationContext.UniqueKey.Columns,
680-
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
681-
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
682-
atomic.LoadInt64(&this.migrationContext.ChunkSize),
683-
this.migrationContext.GetIteration() == 0,
684-
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
685-
)
686-
if err != nil {
687-
return hasFurtherRange, expectedRowCount, err
688-
}
689-
690-
rows, err := this.db.Query(query, explodedArgs...)
691-
if err != nil {
692-
return hasFurtherRange, expectedRowCount, err
693-
}
694-
defer rows.Close()
695671

696-
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
697-
for rows.Next() {
698-
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
699-
return hasFurtherRange, expectedRowCount, err
700-
}
672+
query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable(
673+
this.migrationContext.DatabaseName,
674+
this.migrationContext.OriginalTableName,
675+
&this.migrationContext.UniqueKey.Columns,
676+
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
677+
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
678+
atomic.LoadInt64(&this.migrationContext.ChunkSize),
679+
this.migrationContext.GetIteration() == 0,
680+
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
681+
)
682+
if err != nil {
683+
return hasFurtherRange, expectedRowCount, err
684+
}
701685

702-
expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
703-
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])
686+
rows, err := this.db.Query(query, explodedArgs...)
687+
if err != nil {
688+
return hasFurtherRange, expectedRowCount, err
689+
}
690+
defer rows.Close()
704691

705-
hasFurtherRange = expectedRowCount > 0
706-
}
707-
if err = rows.Err(); err != nil {
692+
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
693+
for rows.Next() {
694+
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
708695
return hasFurtherRange, expectedRowCount, err
709696
}
710-
if hasFurtherRange {
711-
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
712-
return hasFurtherRange, expectedRowCount, nil
713-
}
697+
698+
expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
699+
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])
700+
701+
hasFurtherRange = expectedRowCount > 0
702+
}
703+
if err = rows.Err(); err != nil {
704+
return hasFurtherRange, expectedRowCount, err
705+
}
706+
if hasFurtherRange {
707+
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
708+
return hasFurtherRange, expectedRowCount, nil
714709
}
715710
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
716711
return hasFurtherRange, expectedRowCount, nil

go/sql/builder.go

Lines changed: 0 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -251,80 +251,6 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
251251
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait)
252252
}
253253

254-
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
255-
if uniqueKeyColumns.Len() == 0 {
256-
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
257-
}
258-
databaseName = EscapeName(databaseName)
259-
tableName = EscapeName(tableName)
260-
261-
var startRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
262-
if includeRangeStartValues {
263-
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
264-
}
265-
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
266-
if err != nil {
267-
return "", explodedArgs, err
268-
}
269-
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
270-
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeEndArgs, LessThanOrEqualsComparisonSign)
271-
if err != nil {
272-
return "", explodedArgs, err
273-
}
274-
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
275-
276-
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
277-
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames))
278-
for i, column := range uniqueKeyColumns.Columns() {
279-
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
280-
if column.Type == EnumColumnType {
281-
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
282-
} else {
283-
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
284-
}
285-
}
286-
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
287-
result = fmt.Sprintf(`
288-
select /* gh-ost %s.%s %s */
289-
%s,
290-
(select count(*) from (
291-
select
292-
%s
293-
from
294-
%s.%s
295-
where
296-
%s and %s
297-
limit
298-
%d
299-
) select_osc_chunk)
300-
from (
301-
select
302-
%s
303-
from
304-
%s.%s
305-
where
306-
%s and %s
307-
limit
308-
%d
309-
) select_osc_chunk
310-
order by
311-
%s
312-
limit 1
313-
offset %d`,
314-
databaseName, tableName, hint,
315-
joinedColumnNames, joinedColumnNames,
316-
databaseName, tableName,
317-
rangeStartComparison, rangeEndComparison, chunkSize,
318-
joinedColumnNames,
319-
databaseName, tableName,
320-
rangeStartComparison, rangeEndComparison, chunkSize,
321-
strings.Join(uniqueKeyColumnAscending, ", "),
322-
(chunkSize - 1),
323-
)
324-
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
325-
return result, append(explodedArgs, explodedArgs...), nil
326-
}
327-
328254
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
329255
if uniqueKeyColumns.Len() == 0 {
330256
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")

go/sql/builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
325325
}
326326
}
327327

328-
func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
328+
func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
329329
databaseName := "mydb"
330330
originalTableName := "tbl"
331331
var chunkSize int64 = 500

0 commit comments

Comments
 (0)