Skip to content

Commit f0e9a2d

Browse files
authored
VReplication: Add reference-tables to existing materialize workflow (vitessio#17804)
Signed-off-by: Noble Mittal <noblemittal@outlook.com>
1 parent 8dbe553 commit f0e9a2d

File tree

25 files changed

+5077
-3076
lines changed

25 files changed

+5077
-3076
lines changed

go/cmd/vtctldclient/command/vreplication/materialize/materialize.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,22 @@ package materialize
1818

1919
import (
2020
"fmt"
21+
"strings"
2122

2223
"github.com/spf13/cobra"
2324

2425
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
2526
"vitess.io/vitess/go/mysql/config"
2627
"vitess.io/vitess/go/vt/topo/topoproto"
28+
29+
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
2730
)
2831

2932
var (
33+
updateOptions = struct {
34+
AddReferenceTables []string
35+
}{}
36+
3037
// base is the base command for all actions related to Materialize.
3138
base = &cobra.Command{
3239
Use: "Materialize --workflow <workflow> --target-keyspace <keyspace> [command] [command-flags]",
@@ -35,8 +42,42 @@ var (
3542
Aliases: []string{"materialize"},
3643
Args: cobra.ExactArgs(1),
3744
}
45+
46+
// update is the command for updating existing materialize workflow.
47+
// This can be helpful if we plan to add other actions as well such as
48+
// removing tables from workflow.
49+
update = &cobra.Command{
50+
Use: "update --add-tables='table1,table2'",
51+
Short: "Update existing materialize workflow.",
52+
Aliases: []string{"Update"},
53+
Args: cobra.NoArgs,
54+
RunE: commandUpdate,
55+
}
3856
)
3957

58+
func commandUpdate(cmd *cobra.Command, args []string) error {
59+
tableSettings := []*vtctldatapb.TableMaterializeSettings{}
60+
for _, table := range updateOptions.AddReferenceTables {
61+
tableSettings = append(tableSettings, &vtctldatapb.TableMaterializeSettings{
62+
TargetTable: table,
63+
})
64+
}
65+
66+
_, err := common.GetClient().WorkflowAddTables(common.GetCommandCtx(), &vtctldatapb.WorkflowAddTablesRequest{
67+
Workflow: common.BaseOptions.Workflow,
68+
Keyspace: common.BaseOptions.TargetKeyspace,
69+
TableSettings: tableSettings,
70+
MaterializationIntent: vtctldatapb.MaterializationIntent_REFERENCE,
71+
})
72+
73+
if err != nil {
74+
return err
75+
}
76+
fmt.Printf("Table(s) %s added to the workflow %s. Use show to view the status.\n",
77+
strings.Join(updateOptions.AddReferenceTables, ", "), common.BaseOptions.Workflow)
78+
return nil
79+
}
80+
4081
func registerCommands(root *cobra.Command) {
4182
common.AddCommonFlags(base)
4283
root.AddCommand(base)
@@ -54,6 +95,10 @@ func registerCommands(root *cobra.Command) {
5495
create.Flags().StringSliceVarP(&common.CreateOptions.ReferenceTables, "reference-tables", "r", nil, "Used to specify the reference tables to materialize on every target shard.")
5596
base.AddCommand(create)
5697

98+
update.Flags().StringSliceVar(&updateOptions.AddReferenceTables, "add-reference-tables", nil, "Used to specify the reference tables to be added to the existing workflow")
99+
update.MarkFlagRequired("add-reference-tables")
100+
base.AddCommand(update)
101+
57102
// Generic workflow commands.
58103
opts := &common.SubCommandsOpts{
59104
SubCommand: "Materialize",

go/test/endtoend/vreplication/materialize_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ const (
238238
id2 bigint not null,
239239
primary key (id)
240240
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
241+
create table ref3 (
242+
id bigint not null,
243+
id2 bigint not null,
244+
primary key (id)
245+
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
246+
create table ref4 (
247+
id bigint not null,
248+
id2 bigint not null,
249+
primary key (id)
250+
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
241251
`
242252
refSourceVSchema = `
243253
{
@@ -246,6 +256,12 @@ const (
246256
"type": "reference"
247257
},
248258
"ref2": {
259+
"type": "reference"
260+
},
261+
"ref3": {
262+
"type": "reference"
263+
},
264+
"ref4": {
249265
"type": "reference"
250266
}
251267
}
@@ -261,12 +277,22 @@ const (
261277
"ref2": {
262278
"type": "reference",
263279
"source": "ks1.ref2"
280+
},
281+
"ref3": {
282+
"type": "reference",
283+
"source": "ks1.ref3"
284+
},
285+
"ref4": {
286+
"type": "reference",
287+
"source": "ks1.ref4"
264288
}
265289
}
266290
}
267291
`
268292
initRef1DataQuery = `insert into ks1.ref1(id, val) values (1, 'abc'), (2, 'def'), (3, 'ghi')`
269293
initRef2DataQuery = `insert into ks1.ref2(id, id2) values (1, 1), (2, 2), (3, 3)`
294+
initRef3DataQuery = `insert into ks1.ref3(id, id2) values (1, 1), (2, 2), (3, 3), (4, 4)`
295+
initRef4DataQuery = `insert into ks1.ref4(id, id2) values (1, 1), (2, 2), (3, 3)`
270296
)
271297

272298
// TestReferenceTableMaterialize tests materializing reference tables.
@@ -287,6 +313,10 @@ func TestReferenceTableMaterialize(t *testing.T) {
287313
require.NoError(t, err)
288314
_, err = vtgateConn.ExecuteFetch(initRef2DataQuery, 0, false)
289315
require.NoError(t, err)
316+
_, err = vtgateConn.ExecuteFetch(initRef3DataQuery, 0, false)
317+
require.NoError(t, err)
318+
_, err = vtgateConn.ExecuteFetch(initRef4DataQuery, 0, false)
319+
require.NoError(t, err)
290320

291321
err = vc.VtctldClient.ExecuteCommand("Materialize", "--target-keyspace", "ks2", "--workflow", "wf1", "create",
292322
"--source-keyspace", "ks1", "--reference-tables", "ref1,ref2")
@@ -322,4 +352,41 @@ func TestReferenceTableMaterialize(t *testing.T) {
322352
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref2", 4)
323353
}
324354
vdiff(t, "ks2", "wf1", defaultCellName, nil)
355+
356+
// Testing update with --add-reference-tables.
357+
err = vc.VtctldClient.ExecuteCommand("Materialize", "--target-keyspace", "ks2", "--workflow", "wf1", "update",
358+
"--add-reference-tables", "ref3,ref4")
359+
require.NoError(t, err, "MaterializeAddTables")
360+
361+
for _, shard := range shards {
362+
tab := vc.getPrimaryTablet(t, "ks2", shard)
363+
catchup(t, tab, "wf1", "Materialize")
364+
}
365+
366+
for _, shard := range shards {
367+
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref3", 4)
368+
waitForQueryResult(t, vtgateConn, "ks2:"+shard, "select id, id2 from ref3",
369+
`[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)] [INT64(4) INT64(4)]]`)
370+
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref4", 3)
371+
waitForQueryResult(t, vtgateConn, "ks2:"+shard, "select id, id2 from ref4",
372+
`[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)]]`)
373+
}
374+
vdiff(t, "ks2", "wf1", defaultCellName, nil)
375+
376+
queries = []string{
377+
"update ks1.ref3 set id2=3 where id=2",
378+
"update ks1.ref4 set id2=3 where id=2",
379+
"delete from ks1.ref3 where id2=3",
380+
"delete from ks1.ref4 where id2=3",
381+
"insert into ks1.ref3(id, id2) values (3, 3)",
382+
"insert into ks1.ref4(id, id2) values (3, 3), (4, 4)",
383+
}
384+
for _, query := range queries {
385+
execVtgateQuery(t, vtgateConn, "ks1", query)
386+
}
387+
for _, shard := range shards {
388+
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref3", 3)
389+
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref4", 3)
390+
}
391+
vdiff(t, "ks2", "wf1", defaultCellName, nil)
325392
}

0 commit comments

Comments
 (0)