Skip to content

Commit 03a9bfe

Browse files
authored
Query buffering, terminating blocking transactions for INSTANT DDL and other "special plans" (vitessio#17945)
Signed-off-by: Shlomi Noach <[email protected]>
1 parent 5601849 commit 03a9bfe

File tree

2 files changed

+124
-4
lines changed

2 files changed

+124
-4
lines changed

go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,10 @@ func testScheduler(t *testing.T) {
378378
ALTER TABLE t2_test ENGINE=InnoDB;
379379
`
380380
instantAlterT1Statement = `
381-
ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0;
381+
ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0
382+
`
383+
instantUndoAlterT1Statement = `
384+
ALTER TABLE t1_test DROP COLUMN i0
382385
`
383386
dropT1Statement = `
384387
DROP TABLE IF EXISTS t1_test
@@ -399,7 +402,7 @@ func testScheduler(t *testing.T) {
399402
ALTER TABLE nonexistent FORCE
400403
`
401404
populateT1Statement = `
402-
insert into t1_test values (1, 'new_row')
405+
insert ignore into t1_test values (1, 'new_row')
403406
`
404407
)
405408

@@ -792,6 +795,64 @@ func testScheduler(t *testing.T) {
792795
})
793796
})
794797
}
798+
799+
if forceCutoverCapable {
800+
t.Run("force_cutover_instant", func(t *testing.T) {
801+
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*5)
802+
defer cancel()
803+
804+
t.Run("populate t1_test", func(t *testing.T) {
805+
onlineddl.VtgateExecQuery(t, &vtParams, populateT1Statement, "")
806+
})
807+
808+
commitTransactionChan := make(chan any)
809+
transactionErrorChan := make(chan error)
810+
t.Run("locking table rows", func(t *testing.T) {
811+
go runInTransaction(t, ctx, primaryTablet, "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
812+
})
813+
814+
t.Run("execute migration", func(t *testing.T) {
815+
t1uuid = testOnlineDDLStatement(t, createParams(instantAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --force-cut-over-after=1ms", "vtgate", "", "", true)) // skip wait
816+
})
817+
t.Run("expect completion", func(t *testing.T) {
818+
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
819+
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
820+
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
821+
})
822+
t.Run("check special_plan", func(t *testing.T) {
823+
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
824+
require.NotNil(t, rs)
825+
for _, row := range rs.Named().Rows {
826+
specialPlan := row.AsString("special_plan", "")
827+
assert.Contains(t, specialPlan, "instant-ddl")
828+
}
829+
})
830+
t.Run("expect transaction failure", func(t *testing.T) {
831+
select {
832+
case commitTransactionChan <- true: // good
833+
case <-ctx.Done():
834+
assert.Fail(t, ctx.Err().Error())
835+
}
836+
// Transaction will now attempt to commit. But we expect our "force_cutover" to have terminated
837+
// the transaction's connection.
838+
select {
839+
case err := <-transactionErrorChan:
840+
assert.ErrorContains(t, err, "broken pipe")
841+
case <-ctx.Done():
842+
assert.Fail(t, ctx.Err().Error())
843+
}
844+
})
845+
t.Run("cleanup: undo migration", func(t *testing.T) {
846+
t1uuid = testOnlineDDLStatement(t, createParams(instantUndoAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --force-cut-over-after=1ms", "vtgate", "", "", true)) // skip wait
847+
})
848+
t.Run("cleanup: expect completion", func(t *testing.T) {
849+
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
850+
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
851+
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
852+
})
853+
})
854+
}
855+
795856
t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
796857
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
797858
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait

go/vt/vttablet/onlineddl/executor.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2482,6 +2482,65 @@ func (e *Executor) executeAlterViewOnline(ctx context.Context, onlineDDL *schema
24822482
return nil
24832483
}
24842484

2485+
// executeSpecialAlterDirectDDLActionMigration executes a special plan using a direct ALTER TABLE statement.
// It turns on query buffering for onlineDDL.Table, pauses briefly, optionally (when --force-cut-over-after
// is nonzero) checks the prepared pool and kills the table's lock holders and accessors, and then runs the
// statement via executeDirectly. Buffering is turned off again by a deferred call on every return path
// past the point where it was enabled.
// It returns an error if the force-cut-over setting cannot be read, if either of the force-cut-over steps
// fails, or if executeDirectly fails.
2486+
func (e *Executor) executeSpecialAlterDirectDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (err error) {
2487+
2488+
forceCutOverAfter, err := onlineDDL.StrategySetting().ForceCutOverAfter()
2489+
if err != nil {
2490+
return err
2491+
}
2492+
2493+
// The buffering calls get their own cancelable context, derived from ctx, so that it can be
// canceled independently of ctx when buffering is turned off.
bufferingCtx, bufferingContextCancel := context.WithCancel(ctx)
2494+
defer bufferingContextCancel()
2495+
2496+
// Buffer queries while issuing the ALTER TABLE statement (we assume this ALTER is going to be quick,
2497+
// as in ALGORITHM=INSTANT or a quick partition operation)
2498+
toggleBuffering := func(bufferQueries bool) {
2499+
log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
2500+
// NOTE(review): the timeout handed to the buffering call is the cut-over threshold plus
// qrBufferExtraTimeout; presumably an upper bound on how long queries stay buffered -- confirm
// against toggleBufferTableFunc.
timeout := onlineDDL.CutOverThreshold + qrBufferExtraTimeout
2501+
2502+
e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, timeout, bufferQueries)
2503+
if !bufferQueries {
2504+
// unbuffer existing queries:
2505+
bufferingContextCancel()
2506+
}
2507+
log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
2508+
}
2509+
// Register the un-buffering before buffering is enabled, so that it runs on every return path below,
// including the early error returns. Deferred calls run last-in-first-out, so this runs ahead of the
// deferred bufferingContextCancel above, which then merely repeats the cancel.
defer toggleBuffering(false)
2510+
toggleBuffering(true)
2511+
2512+
// Give a fraction of a second for a scenario where a query is in
2513+
// query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries:
2514+
// they will be able to complete before the ALTER.
2515+
e.updateMigrationStage(ctx, onlineDDL.UUID, "graceful wait for buffering")
2516+
// Unconditional pause: this sleep does not observe cancellation of ctx.
time.Sleep(100 * time.Millisecond)
2517+
2518+
if forceCutOverAfter > 0 {
2519+
// Irrespective of the --force-cut-over-after flag value, as long as it's nonzero, we now terminate
2520+
// connections and transactions on the migrated table.
2521+
// --force-cut-over-after was designed to work with `vitess` migrations, that could cut-over multiple times,
2522+
// and was meant to set a limit to the overall duration of the attempts, for example 1 hour.
2523+
// With INSTANT DDL or other quick operations, this becomes meaningless. Once we begin the operation, there
2524+
// is no going back. We submit it to MySQL, and it takes however long it takes.
2525+
// In this particular function, we expect *very quick* operation.
2526+
// So we take --force-cut-over-after as a hint that we should force terminate connections and transactions.
2527+
//
2528+
// We should only proceed with forceful cut over if there is no pending atomic transaction for the table.
2529+
// This will help in keeping the atomicity guarantee of a prepared transaction.
2530+
if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
2531+
return vterrors.Wrapf(err, "checking prepared pool for table")
2532+
}
2533+
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
2534+
return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
2535+
}
2536+
}
2537+
2538+
// Run the ALTER itself. The first return value of executeDirectly is not needed here and is discarded.
if _, err := e.executeDirectly(ctx, onlineDDL); err != nil {
2539+
return err
2540+
}
2541+
return nil
2542+
}
2543+
24852544
// executeSpecialAlterDDLActionMigrationIfApplicable sees if the given migration can be executed via special execution path, that isn't a full blown online schema change process.
24862545
func (e *Executor) executeSpecialAlterDDLActionMigrationIfApplicable(ctx context.Context, onlineDDL *schema.OnlineDDL) (specialMigrationExecuted bool, err error) {
24872546
// Before we jump on to strategies... Some ALTERs can be optimized without having to run through
@@ -2505,11 +2564,11 @@ func (e *Executor) executeSpecialAlterDDLActionMigrationIfApplicable(ctx context
25052564
case instantDDLSpecialOperation:
25062565
schemadiff.AddInstantAlgorithm(specialPlan.alterTable)
25072566
onlineDDL.SQL = sqlparser.CanonicalString(specialPlan.alterTable)
2508-
if _, err := e.executeDirectly(ctx, onlineDDL); err != nil {
2567+
if err := e.executeSpecialAlterDirectDDLActionMigration(ctx, onlineDDL); err != nil {
25092568
return false, err
25102569
}
25112570
case rangePartitionSpecialOperation:
2512-
if _, err := e.executeDirectly(ctx, onlineDDL); err != nil {
2571+
if err := e.executeSpecialAlterDirectDDLActionMigration(ctx, onlineDDL); err != nil {
25132572
return false, err
25142573
}
25152574
default:

0 commit comments

Comments
 (0)