@@ -20,6 +20,7 @@ import (
2020 "context"
2121 "errors"
2222 "fmt"
23+ "path"
2324 "slices"
2425 "strings"
2526 "testing"
@@ -1014,6 +1015,210 @@ func TestMoveTablesNoRoutingRules(t *testing.T) {
10141015 require .Zerof (t , len (rr .Rules ), "routing rules should be empty, found %+v" , rr .Rules )
10151016}
10161017
1018+ func TestMoveTablesCreateShardedVSchemaRollback (t * testing.T ) {
1019+ ms := & vtctldatapb.MaterializeSettings {
1020+ Workflow : "workflow" ,
1021+ SourceKeyspace : "sourceks" ,
1022+ TargetKeyspace : "targetks" ,
1023+ TableSettings : []* vtctldatapb.TableMaterializeSettings {{
1024+ TargetTable : "t1" ,
1025+ SourceExpression : "select * from t1" ,
1026+ }},
1027+ }
1028+
1029+ ctx , cancel := context .WithCancel (context .Background ())
1030+ defer cancel ()
1031+ env := newTestMaterializerEnv (t , ctx , ms , []string {"-" }, []string {"-" })
1032+ defer env .close ()
1033+
1034+ targetVSchema := & vschemapb.Keyspace {
1035+ Sharded : true ,
1036+ Vindexes : map [string ]* vschemapb.Vindex {
1037+ "hash" : {
1038+ Type : "hash" ,
1039+ },
1040+ },
1041+ Tables : map [string ]* vschemapb.Table {
1042+ "t1" : {
1043+ ColumnVindexes : []* vschemapb.ColumnVindex {{
1044+ Name : "hash" ,
1045+ Column : "id" ,
1046+ }},
1047+ },
1048+ },
1049+ }
1050+ err := env .ws .ts .SaveVSchema (ctx , & topo.KeyspaceVSchemaInfo {
1051+ Name : ms .TargetKeyspace ,
1052+ Keyspace : targetVSchema ,
1053+ })
1054+ require .NoError (t , err )
1055+
1056+ env .tmc .expectFetchAsAllPrivsQuery (startingTargetTabletUID , getNonEmptyTable , & sqltypes.Result {})
1057+
1058+ sourceDeleteQuery := fmt .Sprintf (sqlDeleteWorkflow , encodeString ("vt_sourceks" ), encodeString (ReverseWorkflowName (ms .Workflow )))
1059+ targetDeleteQuery := fmt .Sprintf (sqlDeleteWorkflow , encodeString ("vt_targetks" ), encodeString (ms .Workflow ))
1060+ env .tmc .expectVRQuery (startingSourceTabletUID , sourceDeleteQuery , & sqltypes.Result {})
1061+ env .tmc .expectVRQuery (startingTargetTabletUID , targetDeleteQuery , & sqltypes.Result {})
1062+
1063+ readCalls := 0
1064+ env .tmc .readVReplicationWorkflow = func (ctx context.Context , tablet * topodatapb.Tablet , request * tabletmanagerdatapb.ReadVReplicationWorkflowRequest ) (* tabletmanagerdatapb.ReadVReplicationWorkflowResponse , error ) {
1065+ readCalls ++
1066+ if readCalls == 1 {
1067+ return & tabletmanagerdatapb.ReadVReplicationWorkflowResponse {
1068+ Workflow : request .Workflow ,
1069+ WorkflowType : binlogdatapb .VReplicationWorkflowType_MoveTables ,
1070+ Streams : []* tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream {
1071+ {
1072+ Id : 1 ,
1073+ Bls : & binlogdatapb.BinlogSource {
1074+ Keyspace : ms .SourceKeyspace ,
1075+ Shard : "-" ,
1076+ Filter : & binlogdatapb.Filter {
1077+ Rules : []* binlogdatapb.Rule {{
1078+ Match : "t1" ,
1079+ Filter : "select * from t1" ,
1080+ }},
1081+ },
1082+ },
1083+ },
1084+ },
1085+ }, nil
1086+ }
1087+ return nil , errors .New ("read vreplication failed" )
1088+ }
1089+
1090+ _ , err = env .ws .MoveTablesCreate (ctx , & vtctldatapb.MoveTablesCreateRequest {
1091+ Workflow : ms .Workflow ,
1092+ SourceKeyspace : ms .SourceKeyspace ,
1093+ TargetKeyspace : ms .TargetKeyspace ,
1094+ IncludeTables : []string {"t1" },
1095+ })
1096+ require .ErrorContains (t , err , "read vreplication failed" )
1097+
1098+ got , err := env .ws .ts .GetVSchema (ctx , ms .TargetKeyspace )
1099+ require .NoError (t , err )
1100+ require .True (t , proto .Equal (got .Keyspace , targetVSchema ), "got: %v, want: %v" , got .Keyspace , targetVSchema )
1101+ }
1102+
1103+ func TestMoveTablesCreateUnshardedVSchemaRollback (t * testing.T ) {
1104+ ms := & vtctldatapb.MaterializeSettings {
1105+ Workflow : "workflow" ,
1106+ SourceKeyspace : "sourceks" ,
1107+ TargetKeyspace : "targetks" ,
1108+ TableSettings : []* vtctldatapb.TableMaterializeSettings {{
1109+ TargetTable : "t1" ,
1110+ SourceExpression : "select * from t1" ,
1111+ }},
1112+ }
1113+
1114+ ctx , cancel := context .WithCancel (context .Background ())
1115+ defer cancel ()
1116+ env := newTestMaterializerEnv (t , ctx , ms , []string {"-" }, []string {"-" })
1117+ defer env .close ()
1118+
1119+ originalVSchema := & vschemapb.Keyspace {
1120+ Tables : map [string ]* vschemapb.Table {
1121+ "t0" : {},
1122+ },
1123+ }
1124+ err := env .ws .ts .SaveVSchema (ctx , & topo.KeyspaceVSchemaInfo {
1125+ Name : ms .TargetKeyspace ,
1126+ Keyspace : originalVSchema ,
1127+ })
1128+ require .NoError (t , err )
1129+
1130+ env .tmc .expectFetchAsAllPrivsQuery (startingTargetTabletUID , getNonEmptyTable , & sqltypes.Result {})
1131+
1132+ sourceDeleteQuery := fmt .Sprintf (sqlDeleteWorkflow , encodeString ("vt_sourceks" ), encodeString (ReverseWorkflowName (ms .Workflow )))
1133+ targetDeleteQuery := fmt .Sprintf (sqlDeleteWorkflow , encodeString ("vt_targetks" ), encodeString (ms .Workflow ))
1134+ env .tmc .expectVRQuery (startingSourceTabletUID , sourceDeleteQuery , & sqltypes.Result {})
1135+ env .tmc .expectVRQuery (startingTargetTabletUID , targetDeleteQuery , & sqltypes.Result {})
1136+
1137+ conn , err := env .ws .ts .ConnForCell (ctx , topo .GlobalCell )
1138+ require .NoError (t , err )
1139+ current , changes , err := conn .Watch (ctx , path .Join (topo .KeyspacesPath , ms .TargetKeyspace , topo .VSchemaFile ))
1140+ require .NoError (t , err )
1141+ initialVersion := ""
1142+ if current != nil && current .Version != nil {
1143+ initialVersion = current .Version .String ()
1144+ }
1145+
1146+ failCh := make (chan struct {})
1147+ readCalls := 0
1148+ env .tmc .readVReplicationWorkflow = func (ctx context.Context , tablet * topodatapb.Tablet , request * tabletmanagerdatapb.ReadVReplicationWorkflowRequest ) (* tabletmanagerdatapb.ReadVReplicationWorkflowResponse , error ) {
1149+ readCalls ++
1150+ if readCalls == 1 {
1151+ return & tabletmanagerdatapb.ReadVReplicationWorkflowResponse {
1152+ Workflow : request .Workflow ,
1153+ WorkflowType : binlogdatapb .VReplicationWorkflowType_MoveTables ,
1154+ Streams : []* tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream {
1155+ {
1156+ Id : 1 ,
1157+ Bls : & binlogdatapb.BinlogSource {
1158+ Keyspace : ms .SourceKeyspace ,
1159+ Shard : "-" ,
1160+ Filter : & binlogdatapb.Filter {
1161+ Rules : []* binlogdatapb.Rule {{
1162+ Match : "t1" ,
1163+ Filter : "select * from t1" ,
1164+ }},
1165+ },
1166+ },
1167+ },
1168+ },
1169+ }, nil
1170+ }
1171+ <- failCh
1172+ return nil , errors .New ("read vreplication failed" )
1173+ }
1174+
1175+ errCh := make (chan error , 1 )
1176+ go func () {
1177+ _ , err := env .ws .MoveTablesCreate (ctx , & vtctldatapb.MoveTablesCreateRequest {
1178+ Workflow : ms .Workflow ,
1179+ SourceKeyspace : ms .SourceKeyspace ,
1180+ TargetKeyspace : ms .TargetKeyspace ,
1181+ IncludeTables : []string {"t1" },
1182+ })
1183+ errCh <- err
1184+ }()
1185+
1186+ updatedVersion := ""
1187+ assert .Eventually (t , func () bool {
1188+ select {
1189+ case wd := <- changes :
1190+ if wd == nil || wd .Err != nil || wd .Contents == nil {
1191+ return false
1192+ }
1193+ ks := & vschemapb.Keyspace {}
1194+ if err := ks .UnmarshalVT (wd .Contents ); err != nil {
1195+ return false
1196+ }
1197+ if ks .Tables ["t1" ] == nil {
1198+ return false
1199+ }
1200+ if wd .Version != nil {
1201+ updatedVersion = wd .Version .String ()
1202+ }
1203+ return true
1204+ default :
1205+ return false
1206+ }
1207+ }, 5 * time .Second , 50 * time .Millisecond )
1208+ require .NotEmpty (t , updatedVersion )
1209+ if initialVersion != "" {
1210+ require .NotEqual (t , initialVersion , updatedVersion )
1211+ }
1212+
1213+ close (failCh )
1214+ err = <- errCh
1215+ require .ErrorContains (t , err , "read vreplication failed" )
1216+
1217+ got , err := env .ws .ts .GetVSchema (ctx , ms .TargetKeyspace )
1218+ require .NoError (t , err )
1219+ require .True (t , proto .Equal (got .Keyspace , originalVSchema ), "got: %v, want: %v" , got .Keyspace , originalVSchema )
1220+ }
1221+
10171222func TestCreateLookupVindexFull (t * testing.T ) {
10181223 ms := & vtctldatapb.MaterializeSettings {
10191224 Workflow : "lookup" ,
0 commit comments