Commit f9c10b9

Avoid terminating atomic copy workflows on error if they are out of copy phase (vitessio#18475)
Signed-off-by: Noble Mittal <[email protected]>
Parent: f4a735b

2 files changed (+46, -2)


go/test/endtoend/vreplication/fk_test.go

Lines changed: 44 additions & 0 deletions
@@ -30,6 +30,7 @@ import (
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/vt/log"
+	vttablet "vitess.io/vitess/go/vt/vttablet/common"
 
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
 )
@@ -102,6 +103,49 @@ func TestFKWorkflow(t *testing.T) {
 	waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
 	targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace]
 	targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet
+
+	// Stop the LoadSimulator while we test for workflow errors, so that the
+	// LoadSimulator itself does not error out when we shut down the source dbServer.
+	if withLoad {
+		cancel()
+		<-ch
+	}
+
+	sourceTab := vc.Cells[cellName].Keyspaces[sourceKeyspace].Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, 100)]
+
+	// Stop the source database server to simulate an error during the replication
+	// phase. This should cause recoverable errors that atomic workflows retry,
+	// since the workflow is already out of the copy phase.
+	err := sourceTab.DbServer.Stop()
+	require.NoError(t, err)
+
+	// Give the workflow some time to encounter errors and potentially retry.
+	time.Sleep(2 * vttablet.GetDefaultVReplicationConfig().RetryDelay)
+
+	// Verify the workflow is still running and has not terminated due to errors.
+	waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
+
+	// Restart the source database to allow the workflow to continue.
+	err = sourceTab.DbServer.StartProvideInit(false)
+	require.NoError(t, err)
+
+	err = vc.VtctldClient.ExecuteCommand("SetWritable", fmt.Sprintf("%s-%d", cellName, 100), "true")
+	require.NoError(t, err)
+
+	// Restart the LoadSimulator.
+	if withLoad {
+		ctx, cancel = context.WithCancel(context.Background())
+		ls = newFKLoadSimulator(t, ctx)
+		defer func() {
+			select {
+			case <-ctx.Done():
+			default:
+				cancel()
+			}
+		}()
+		go ls.simulateLoad()
+	}
+
 	require.NotNil(t, targetTab)
 	catchup(t, targetTab, workflowName, "MoveTables")
 	vdiff(t, targetKeyspace, workflowName, cellName, nil)
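
The test pauses the load simulator before injecting the fault and relaunches it afterwards, so the simulator never observes the downed source. Below is a minimal, self-contained sketch of that pause-and-resume shape; it is not part of the commit, and simulateLoad here is a stand-in for the test's fkLoadSimulator helper:

package main

import (
	"context"
	"fmt"
	"time"
)

// simulateLoad stands in for the test's load simulator: it issues work until
// its context is canceled, then signals that it has fully drained via done.
func simulateLoad(ctx context.Context, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(50 * time.Millisecond):
			// A real simulator would issue writes against the source here.
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go simulateLoad(ctx, done)

	// Pause the load before injecting the fault, so the simulator itself
	// does not fail while the source database server is down.
	cancel()
	<-done
	fmt.Println("load paused; stop source, assert workflow state, restart source")

	// Resume the load once the source is healthy again, just as the test
	// recreates its context and relaunches the simulator goroutine.
	ctx, cancel = context.WithCancel(context.Background())
	done = make(chan struct{})
	go simulateLoad(ctx, done)

	time.Sleep(200 * time.Millisecond)
	cancel() // context.CancelFunc is idempotent, so a second call is safe
	<-done
}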

go/vt/vttablet/tabletmanager/vreplication/controller.go

Lines changed: 2 additions & 2 deletions
@@ -305,8 +305,8 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
 		// it's a FAILED_PRECONDITION vterror, OR we cannot identify this as
 		// non-recoverable BUT it has persisted beyond the retry limit
 		// (maxTimeToRetryError). In addition, we cannot restart a workflow
-		// started with AtomicCopy which has _any_ error.
-		if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
+		// started with AtomicCopy which has _any_ error during copy phase.
+		if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy) && vr.state == binlogdatapb.VReplicationWorkflowState_Copying) ||
 			isUnrecoverableError(err) ||
 			!ct.lastWorkflowError.ShouldRetry() {
 			err = vterrors.Wrapf(err, TerminalErrorIndicator)
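
Seen in isolation, the behavioral change is easiest to read as a predicate: an error in an AtomicCopy workflow is terminal only while the workflow is still in the copy phase. The sketch below restates just that clause with simplified stand-in types; the real code uses the binlogdatapb proto enums and also treats unrecoverable errors and exhausted retries as terminal, so this is an illustration, not the controller's actual implementation:

package main

import "fmt"

// Simplified stand-ins for the proto enums the controller checks
// (binlogdatapb.VReplicationWorkflowSubType and VReplicationWorkflowState).
type workflowSubType int
type workflowState int

const (
	subTypeNone workflowSubType = iota
	subTypeAtomicCopy
)

const (
	stateCopying workflowState = iota
	stateRunning
)

// isTerminalAtomicCopyError restates the first clause of the terminal-error
// check after this commit: an AtomicCopy workflow error terminates the
// workflow only while it is still copying.
func isTerminalAtomicCopyError(err error, subType workflowSubType, state workflowState) bool {
	return err != nil && subType == subTypeAtomicCopy && state == stateCopying
}

func main() {
	err := fmt.Errorf("source tablet unreachable")
	// During the copy phase the error is still terminal, as before.
	fmt.Println(isTerminalAtomicCopyError(err, subTypeAtomicCopy, stateCopying)) // true
	// Out of the copy phase the error now falls through to the normal retry path.
	fmt.Println(isTerminalAtomicCopyError(err, subTypeAtomicCopy, stateRunning)) // false
}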
