Skip to content

Commit a83eeee

Browse files
[release-18.0] VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (vitessio#16217) (vitessio#16221)
Signed-off-by: Rohit Nayak <[email protected]> Co-authored-by: Rohit Nayak <[email protected]>
1 parent d45b70b commit a83eeee

File tree

2 files changed

+62
-3
lines changed

2 files changed

+62
-3
lines changed

go/vt/vttablet/tabletmanager/rpc_vreplication.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package tabletmanager
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"strings"
2223

2324
"google.golang.org/protobuf/encoding/prototext"
@@ -49,6 +50,8 @@ const (
4950
sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a"
5051
// Update the configuration values for a workflow's vreplication stream.
5152
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
53+
// Check if workflow is still copying.
54+
sqlGetVReplicationCopyStatus = "select distinct vrepl_id from %s.copy_state where vrepl_id = %d"
5255
)
5356

5457
func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
@@ -227,6 +230,18 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl
227230
return resp, nil
228231
}
229232

233+
func isStreamCopying(tm *TabletManager, id int64) (bool, error) {
234+
query := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), id)
235+
res, err := tm.VREngine.Exec(query)
236+
if err != nil {
237+
return false, err
238+
}
239+
if res != nil && len(res.Rows) > 0 {
240+
return true, nil
241+
}
242+
return false, nil
243+
}
244+
230245
// UpdateVReplicationWorkflow updates the sidecar databases's vreplication
231246
// record(s) for this tablet's vreplication workflow stream(s). If there
232247
// are no streams for the given workflow on the tablet then a nil result
@@ -302,6 +317,17 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
302317
if !textutil.ValueIsSimulatedNull(req.State) {
303318
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
304319
}
320+
if state == binlogdatapb.VReplicationWorkflowState_Running.String() {
321+
// `Workflow Start` sets the new state to Running. However, if stream is still copying tables, we should set
322+
// the state as Copying.
323+
isCopying, err := isStreamCopying(tm, id)
324+
if err != nil {
325+
return nil, err
326+
}
327+
if isCopying {
328+
state = binlogdatapb.VReplicationWorkflowState_Copying.String()
329+
}
330+
}
305331
bindVars = map[string]*querypb.BindVariable{
306332
"st": sqltypes.StringBindVariable(state),
307333
"sc": sqltypes.StringBindVariable(string(source)),

go/vt/vttablet/tabletmanager/rpc_vreplication_test.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -489,10 +489,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
489489
fmt.Sprintf("%d", vreplID),
490490
)
491491

492+
getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), int64(vreplID))
493+
copyStatusFields := sqltypes.MakeTestFields(
494+
"id",
495+
"int64",
496+
)
497+
notCopying := sqltypes.MakeTestResult(copyStatusFields)
498+
copying := sqltypes.MakeTestResult(copyStatusFields, "1")
499+
492500
tests := []struct {
493-
name string
494-
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
495-
query string
501+
name string
502+
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
503+
query string
504+
isCopying bool
496505
}{
497506
{
498507
name: "update cells",
@@ -572,6 +581,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
572581
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
573582
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
574583
},
584+
{
585+
name: "update to running while copying",
586+
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
587+
Workflow: workflow,
588+
State: binlogdatapb.VReplicationWorkflowState_Running,
589+
Cells: textutil.SimulatedNullStringSlice,
590+
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
591+
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
592+
},
593+
isCopying: true,
594+
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
595+
keyspace, shard, cells[0], tabletTypes[0], vreplID),
596+
},
575597
}
576598

577599
for _, tt := range tests {
@@ -590,6 +612,17 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
590612
// These are the same for each RPC call.
591613
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
592614
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
615+
616+
if tt.request.State == binlogdatapb.VReplicationWorkflowState_Running ||
617+
tt.request.State == binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt) {
618+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
619+
if tt.isCopying {
620+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, copying, nil)
621+
} else {
622+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, notCopying, nil)
623+
624+
}
625+
}
593626
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
594627
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)
595628

0 commit comments

Comments
 (0)