Skip to content

Commit a205cd8

Browse files
committed
vtcombo: enable VReplication and OnlineDDL support
vtcombo stubs out VReplicationExec and VReplicationWaitForPos, which prevents OnlineDDL workflows that rely on VReplication. Additionally, when multiple shards share a single MySQL instance (as in vtcombo), the shared _vt sidecar database causes collisions in schema_migrations and vreplication tables. This commit: - Implements VReplicationExec and VReplicationWaitForPos in vtcombo by initializing a VREngine on primary tablets - Adds --per-shard-sidecar flag that gives each shard its own sidecar database (e.g. _vt_commerce_80) via the keyspace SidecarDbName field - Captures the sidecar DB name at init time in the vreplication engine, OnlineDDL executor, and schema engine to avoid races when multiple tablets share a process - Adds NewDBClientWithSidecarName for explicit sidecar name injection instead of reading the process-global value - Removes redundant qualifier baking in sidecardb.validateSchemaDefinition so that cached DDLs work with any sidecar database name - Simplifies controller_plan qualifier validation to only accept _vt, since the DBClient layer handles rewriting to the actual sidecar name - Adds --migration-check-interval passthrough from vttestserver - Adds e2e test proving OnlineDDL works on a sharded vtcombo keyspace Signed-off-by: Armand Parajon <armand@squareup.com>
1 parent a47ce93 commit a205cd8

File tree

17 files changed

+567
-74
lines changed

17 files changed

+567
-74
lines changed

go/cmd/vtcombo/cli/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ func init() {
123123

124124
utils.SetFlagVar(Main.Flags(), (*topoproto.TabletTypeListFlag)(&tabletTypesToWait), "tablet-types-to-wait", "Wait till connected for specified tablet types during Gateway initialization. Should be provided as a comma-separated set of tablet types.")
125125

126+
utils.SetFlagBoolVar(Main.Flags(), &vtcombo.PerShardSidecar, "per-shard-sidecar", false, "Give each shard its own sidecar database instead of sharing _vt. Required when multiple shards share a single MySQL instance (e.g. vtcombo) to avoid conflicts in schema_migrations and vreplication tables.")
127+
126128
// We're going to force the value later, so don't even bother letting the
127129
// user know about this flag.
128130
Main.Flags().MarkHidden("tablet-protocol")

go/cmd/vttestserver/cli/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ func New() (cmd *cobra.Command) {
227227
cmd.Flags().BoolVar(&doCreateTCPUser, "initialize-with-vt-dba-tcp", false, "If this flag is enabled, MySQL will be initialized with an additional user named vt_dba_tcp, who will have access via TCP/IP connection.")
228228

229229
utils.SetFlagBoolVar(cmd.Flags(), &config.NoScatter, "no-scatter", false, "when set to true, the planner will fail instead of producing a plan that includes scatter queries")
230+
231+
utils.SetFlagDurationVar(cmd.Flags(), &config.MigrationCheckInterval, "migration-check-interval", 0, "How often the online DDL executor checks for pending migrations (passed to vtcombo). 0 uses vtcombo default (1m).")
232+
233+
utils.SetFlagBoolVar(cmd.Flags(), &config.PerShardSidecar, "per-shard-sidecar", false, "Give each shard its own sidecar database instead of sharing _vt. Required when multiple shards share a single MySQL instance to avoid conflicts in schema_migrations and vreplication tables.")
234+
230235
acl.RegisterFlags(cmd.Flags())
231236

232237
return cmd
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
Copyright 2026 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package onlineddl
18+
19+
import (
20+
"context"
21+
"database/sql"
22+
"flag"
23+
"fmt"
24+
"os"
25+
"strings"
26+
"testing"
27+
"time"
28+
29+
gosqldriver "github.com/go-sql-driver/mysql"
30+
"github.com/stretchr/testify/assert"
31+
"github.com/stretchr/testify/require"
32+
33+
"vitess.io/vitess/go/vt/log"
34+
"vitess.io/vitess/go/vt/vttest"
35+
36+
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
37+
vttestpb "vitess.io/vitess/go/vt/proto/vttest"
38+
)
39+
40+
var (
41+
localCluster *vttest.LocalCluster
42+
mysqlAddress string
43+
)
44+
45+
const keyspaceName = "ks"
46+
47+
const tableSchema = `
48+
CREATE TABLE test_table (
49+
id BIGINT UNSIGNED NOT NULL,
50+
msg VARCHAR(64),
51+
keyspace_id BIGINT UNSIGNED NOT NULL,
52+
PRIMARY KEY (id)
53+
) ENGINE=InnoDB
54+
`
55+
56+
func TestMain(m *testing.M) {
57+
flag.Parse()
58+
59+
exitcode, err := func() (int, error) {
60+
topology := &vttestpb.VTTestTopology{
61+
Keyspaces: []*vttestpb.Keyspace{
62+
{
63+
Name: keyspaceName,
64+
Shards: []*vttestpb.Shard{
65+
{Name: "-80"},
66+
{Name: "80-"},
67+
},
68+
},
69+
},
70+
}
71+
72+
vschema := &vschemapb.Keyspace{
73+
Sharded: true,
74+
Vindexes: map[string]*vschemapb.Vindex{
75+
"hash": {Type: "hash"},
76+
},
77+
Tables: map[string]*vschemapb.Table{
78+
"test_table": {
79+
ColumnVindexes: []*vschemapb.ColumnVindex{
80+
{Column: "keyspace_id", Name: "hash"},
81+
},
82+
},
83+
},
84+
}
85+
86+
var cfg vttest.Config
87+
cfg.Topology = topology
88+
cfg.EnableOnlineDDL = true
89+
cfg.PerShardSidecar = true
90+
cfg.MigrationCheckInterval = 5 * time.Second
91+
92+
if err := cfg.InitSchemas(keyspaceName, tableSchema, vschema); err != nil {
93+
return 1, err
94+
}
95+
defer os.RemoveAll(cfg.SchemaDir)
96+
97+
localCluster = &vttest.LocalCluster{Config: cfg}
98+
if err := localCluster.Setup(); err != nil {
99+
return 1, err
100+
}
101+
defer localCluster.TearDown()
102+
103+
mysqlAddress = fmt.Sprintf("localhost:%d", localCluster.Env.PortForProtocol("vtcombo_mysql_port", ""))
104+
105+
return m.Run(), nil
106+
}()
107+
if err != nil {
108+
log.Errorf("top level error: %v\n", err)
109+
os.Exit(1)
110+
}
111+
os.Exit(exitcode)
112+
}
113+
114+
func mysqlConn(t *testing.T) *sql.DB {
115+
t.Helper()
116+
cfg := gosqldriver.NewConfig()
117+
cfg.Net = "tcp"
118+
cfg.Addr = mysqlAddress
119+
cfg.DBName = keyspaceName + "@primary"
120+
db, err := sql.Open("mysql", cfg.FormatDSN())
121+
require.NoError(t, err)
122+
t.Cleanup(func() { db.Close() })
123+
return db
124+
}
125+
126+
func TestOnlineDDLSharded(t *testing.T) {
127+
ctx := context.Background()
128+
db := mysqlConn(t)
129+
130+
// Insert a row to prove the table is working across shards.
131+
_, err := db.ExecContext(ctx, "INSERT INTO test_table (id, msg, keyspace_id) VALUES (1, 'hello', 1)")
132+
require.NoError(t, err)
133+
_, err = db.ExecContext(ctx, "INSERT INTO test_table (id, msg, keyspace_id) VALUES (2, 'world', 2)")
134+
require.NoError(t, err)
135+
136+
// Submit an online DDL to add a column.
137+
_, err = db.ExecContext(ctx, "SET @@ddl_strategy='online'")
138+
require.NoError(t, err)
139+
140+
result, err := db.QueryContext(ctx, "ALTER TABLE test_table ADD COLUMN extra VARCHAR(64)")
141+
require.NoError(t, err)
142+
defer result.Close()
143+
144+
// The ALTER returns a UUID for the migration.
145+
require.True(t, result.Next())
146+
var uuid string
147+
require.NoError(t, result.Scan(&uuid))
148+
t.Logf("migration UUID: %s", uuid)
149+
require.NotEmpty(t, uuid)
150+
151+
// Wait for the migration to complete on all shards.
152+
require.Eventually(t, func() bool {
153+
rows, err := db.QueryContext(ctx, fmt.Sprintf("SHOW VITESS_MIGRATIONS LIKE '%s'", uuid))
154+
if err != nil {
155+
return false
156+
}
157+
defer rows.Close()
158+
159+
cols, _ := rows.Columns()
160+
statusIdx := -1
161+
for i, c := range cols {
162+
if c == "migration_status" {
163+
statusIdx = i
164+
break
165+
}
166+
}
167+
if statusIdx < 0 {
168+
return false
169+
}
170+
171+
allComplete := true
172+
count := 0
173+
for rows.Next() {
174+
vals := make([]any, len(cols))
175+
ptrs := make([]any, len(cols))
176+
for i := range vals {
177+
ptrs[i] = &vals[i]
178+
}
179+
if err := rows.Scan(ptrs...); err != nil {
180+
return false
181+
}
182+
count++
183+
status := fmt.Sprintf("%s", vals[statusIdx])
184+
if !strings.EqualFold(status, "complete") {
185+
allComplete = false
186+
}
187+
}
188+
// Expect one migration row per shard.
189+
return count == 2 && allComplete
190+
}, 2*time.Minute, 2*time.Second, "migration %s did not complete on all shards", uuid)
191+
192+
// Verify the column was added by inserting a row that uses it.
193+
_, err = db.ExecContext(ctx, "INSERT INTO test_table (id, msg, keyspace_id, extra) VALUES (3, 'test', 3, 'new_col_value')")
194+
require.NoError(t, err)
195+
196+
// Verify we can read back the new column.
197+
var extra string
198+
err = db.QueryRowContext(ctx, "SELECT extra FROM test_table WHERE id = 3").Scan(&extra)
199+
require.NoError(t, err)
200+
assert.Equal(t, "new_col_value", extra)
201+
}

go/vt/binlog/binlogplayer/dbclient.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,22 @@ type dbClientImpl struct {
5959
// sidecar database qualifiers with the actual one in use on the tablet.
6060
type dbClientImplWithSidecarDBReplacement struct {
6161
dbClientImpl
62+
sidecarDBName string
6263
}
6364

64-
// NewDBClient creates a DBClient instance
65+
// NewDBClient creates a DBClient instance using the current global sidecar name.
6566
func NewDBClient(params dbconfigs.Connector, parser *sqlparser.Parser) DBClient {
66-
if sidecar.GetName() != sidecar.DefaultName {
67+
return NewDBClientWithSidecarName(params, parser, sidecar.GetName())
68+
}
69+
70+
// NewDBClientWithSidecarName creates a DBClient instance with an explicit sidecar
71+
// database name. Use this when the global sidecar.GetName() may not reflect the
72+
// correct per-instance value (e.g., in vtcombo where multiple tablets share a process).
73+
func NewDBClientWithSidecarName(params dbconfigs.Connector, parser *sqlparser.Parser, sidecarDBName string) DBClient {
74+
if sidecarDBName != sidecar.DefaultName {
6775
return &dbClientImplWithSidecarDBReplacement{
68-
dbClientImpl{dbConfig: params, parser: parser},
76+
dbClientImpl: dbClientImpl{dbConfig: params, parser: parser},
77+
sidecarDBName: sidecarDBName,
6978
}
7079
}
7180
return &dbClientImpl{
@@ -181,7 +190,7 @@ func (dc *dbClientImpl) ExecuteFetchMulti(query string, maxrows int) ([]*sqltype
181190

182191
func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
183192
// Replace any provided sidecar database qualifiers with the correct one.
184-
uq, err := dcr.parser.ReplaceTableQualifiers(query, sidecar.DefaultName, sidecar.GetName())
193+
uq, err := dcr.parser.ReplaceTableQualifiers(query, sidecar.DefaultName, dcr.sidecarDBName)
185194
if err != nil {
186195
return nil, err
187196
}
@@ -195,7 +204,7 @@ func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetchMulti(query string,
195204
return nil, err
196205
}
197206
for i, qp := range qps {
198-
uq, err := dcr.parser.ReplaceTableQualifiers(qp, sidecar.DefaultName, sidecar.GetName())
207+
uq, err := dcr.parser.ReplaceTableQualifiers(qp, sidecar.DefaultName, dcr.sidecarDBName)
199208
if err != nil {
200209
return nil, err
201210
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
Copyright 2026 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package binlogplayer
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
24+
"vitess.io/vitess/go/constants/sidecar"
25+
"vitess.io/vitess/go/vt/dbconfigs"
26+
"vitess.io/vitess/go/vt/sqlparser"
27+
)
28+
29+
func TestNewDBClientWithSidecarName(t *testing.T) {
30+
parser := sqlparser.NewTestParser()
31+
var params dbconfigs.Connector
32+
33+
t.Run("default name returns dbClientImpl", func(t *testing.T) {
34+
client := NewDBClientWithSidecarName(params, parser, sidecar.DefaultName)
35+
_, ok := client.(*dbClientImpl)
36+
assert.True(t, ok, "expected *dbClientImpl for default sidecar name")
37+
})
38+
39+
t.Run("custom name returns dbClientImplWithSidecarDBReplacement", func(t *testing.T) {
40+
client := NewDBClientWithSidecarName(params, parser, "_vt_custom_0")
41+
impl, ok := client.(*dbClientImplWithSidecarDBReplacement)
42+
assert.True(t, ok, "expected *dbClientImplWithSidecarDBReplacement for custom sidecar name")
43+
if ok {
44+
assert.Equal(t, "_vt_custom_0", impl.sidecarDBName)
45+
}
46+
})
47+
}

go/vt/sidecardb/sidecardb.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ func init() {
123123

124124
func validateSchemaDefinition(name, schema string, parser *sqlparser.Parser) (string, error) {
125125
stmt, err := parser.ParseStrictDDL(schema)
126-
127126
if err != nil {
128127
return "", err
129128
}
@@ -137,7 +136,6 @@ func validateSchemaDefinition(name, schema string, parser *sqlparser.Parser) (st
137136
if qualifier != "" {
138137
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "database qualifier of %s specified for the %s table when there should not be one", qualifier, name)
139138
}
140-
createTable.Table.Qualifier = sqlparser.NewIdentifierCS(sidecar.GetName())
141139
if !strings.EqualFold(tableName, name) {
142140
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table name of %s does not match the table name specified within the file: %s", name, tableName)
143141
}
@@ -568,6 +566,13 @@ func AddSchemaInitQueries(db *fakesqldb.DB, populateTables bool, parser *sqlpars
568566
sdbe, _ := sqlparser.ParseAndBind(sidecarDBExistsQuery, sqltypes.StringBindVariable(sidecar.GetName()))
569567
db.AddQuery(sdbe, result)
570568
for _, table := range sidecarTables {
569+
// Accept unqualified DDLs for this table. Schema definitions are
570+
// stored without a database qualifier so that sidecardb.Init can
571+
// target any sidecar database name via USE.
572+
quotedName := regexp.QuoteMeta(table.name)
573+
db.AddQueryPattern(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `?%s`? .*", quotedName), result)
574+
db.AddQueryPattern(fmt.Sprintf("ALTER TABLE `?%s`? .*", quotedName), result)
575+
571576
result = &sqltypes.Result{}
572577
if populateTables {
573578
result = sqltypes.MakeTestResult(sqltypes.MakeTestFields(
@@ -628,6 +633,14 @@ func MatchesInitQuery(query string) bool {
628633
return true
629634
}
630635
}
636+
// Also match unqualified DDLs targeting known sidecar tables.
637+
for _, table := range sidecarTables {
638+
quotedName := regexp.QuoteMeta(table.name)
639+
pattern := fmt.Sprintf("(?i)(CREATE TABLE IF NOT EXISTS|ALTER TABLE) `?%s`?", quotedName)
640+
if match, _ := regexp.MatchString(pattern, query); match {
641+
return true
642+
}
643+
}
631644
return false
632645
}
633646

0 commit comments

Comments
 (0)