Skip to content

Commit 5aefdb0

Browse files
Feature(onlineddl): Add shard-specific completion to online ddl (vitessio#18331)
Signed-off-by: siddharth16396 <[email protected]> Signed-off-by: Shlomi Noach <[email protected]> Co-authored-by: Shlomi Noach <[email protected]>
1 parent efae039 commit 5aefdb0

File tree

7 files changed

+7865
-7754
lines changed

7 files changed

+7865
-7754
lines changed

go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,61 @@ func testScheduler(t *testing.T) {
605605
})
606606
})
607607

608+
t.Run("Postpone completion ALTER with shards", func(t *testing.T) {
609+
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait
610+
611+
t.Run("wait for t1 running", func(t *testing.T) {
612+
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
613+
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
614+
})
615+
616+
t.Run("wait for ready_to_complete", func(t *testing.T) {
617+
waitForReadyToComplete(t, t1uuid, true)
618+
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
619+
require.NotNil(t, rs)
620+
for _, row := range rs.Named().Rows {
621+
assert.True(t, row["shadow_analyzed_timestamp"].IsNull())
622+
}
623+
})
624+
625+
t.Run("check postpone_completion", func(t *testing.T) {
626+
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
627+
require.NotNil(t, rs)
628+
for _, row := range rs.Named().Rows {
629+
postponeCompletion := row.AsInt64("postpone_completion", 0)
630+
assert.Equal(t, int64(1), postponeCompletion)
631+
}
632+
})
633+
t.Run("complete with irrelevant shards", func(t *testing.T) {
634+
onlineddl.CheckCompleteMigrationShards(t, &vtParams, shards, t1uuid, "x,y,z", false)
635+
// Added an artificial sleep here just to ensure we're not missing a would-be completion./
636+
time.Sleep(2 * time.Second)
637+
// Migration should still be in running state
638+
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
639+
// postpone_completion should still be set
640+
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
641+
require.NotNil(t, rs)
642+
for _, row := range rs.Named().Rows {
643+
postponeCompletion := row.AsInt64("postpone_completion", 0)
644+
assert.Equal(t, int64(1), postponeCompletion)
645+
}
646+
})
647+
t.Run("complete with relevant shards", func(t *testing.T) {
648+
onlineddl.CheckCompleteMigrationShards(t, &vtParams, shards, t1uuid, "x, y, 1", true)
649+
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
650+
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
651+
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
652+
})
653+
t.Run("check no postpone_completion", func(t *testing.T) {
654+
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
655+
require.NotNil(t, rs)
656+
for _, row := range rs.Named().Rows {
657+
postponeCompletion := row.AsInt64("postpone_completion", 0)
658+
assert.Equal(t, int64(0), postponeCompletion)
659+
}
660+
})
661+
})
662+
608663
t.Run("Delayed postpone completion ALTER", func(t *testing.T) {
609664
onlineddl.ThrottleAllMigrations(t, &vtParams)
610665
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)

go/test/endtoend/onlineddl/vtgate_util.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,22 @@ func CheckCompleteMigration(t *testing.T, vtParams *mysql.ConnParams, shards []c
179179
}
180180
}
181181

182+
// CheckCompleteMigrationShards attempts to complete a migration for specific shards, and expects success by counting affected rows
183+
func CheckCompleteMigrationShards(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, completeShards string, expectCompletePossible bool) {
184+
query, err := sqlparser.ParseAndBind("alter vitess_migration %a complete vitess_shards %a",
185+
sqltypes.StringBindVariable(uuid),
186+
sqltypes.StringBindVariable(completeShards),
187+
)
188+
require.NoError(t, err)
189+
r := VtgateExecQuery(t, vtParams, query, "")
190+
191+
if expectCompletePossible {
192+
assert.Equal(t, len(shards), int(r.RowsAffected))
193+
} else {
194+
assert.Equal(t, int(0), int(r.RowsAffected))
195+
}
196+
}
197+
182198
// CheckPostponeCompleteMigration attempts to postpone an existing migration, and expects success by counting affected rows
183199
func CheckPostponeCompleteMigration(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectPotponePossible bool) {
184200
query, err := sqlparser.ParseAndBind("alter vitess_migration %a postpone complete",

go/vt/sqlparser/parse_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2583,6 +2583,10 @@ var (
25832583
input: "alter vitess_migration launch all",
25842584
}, {
25852585
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' complete",
2586+
}, {
2587+
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' complete vitess_shards '-40'",
2588+
}, {
2589+
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' complete vitess_shards '-40,40-80'",
25862590
}, {
25872591
input: "alter vitess_migration complete all",
25882592
}, {

0 commit comments

Comments
 (0)