Skip to content

Commit 8868a33

Browse files
[release-22.0] VStream API: Reset stopPos in catchup (#18119) (#18123)
Signed-off-by: Noble Mittal <[email protected]> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
1 parent db2c533 commit 8868a33

File tree

3 files changed

+30
-2
lines changed

3 files changed

+30
-2
lines changed

go/vt/vttablet/tabletserver/vstreamer/copy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error {
7373

7474
errch := make(chan error, 1)
7575
go func() {
76+
uvs.stopPos = replication.Position{} // reset stopPos which was potentially set during fastforward
7677
startPos := replication.EncodePosition(uvs.pos)
7778
vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send2, "catchup", uvs.vse, nil)
7879
uvs.setVs(vs)

go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,11 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se
112112
ev.Keyspace = vse.keyspace
113113
ev.Shard = vse.shard
114114
}
115-
return send(evs)
115+
err := send(evs)
116+
if err != nil {
117+
log.Infof("uvstreamer replicate send() returned with err %v", err)
118+
}
119+
return err
116120
}
117121
uvs := &uvstreamer{
118122
ctx: ctx,
@@ -327,17 +331,20 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
327331
}
328332
err := uvs.send(evs2)
329333
if err != nil && err != io.EOF {
334+
log.Infof("uvstreamer catchup/fastforward send() returning with send error %v", err)
330335
return err
331336
}
332337
for _, ev := range evs2 {
333338
if ev.Type == binlogdatapb.VEventType_GTID {
334339
uvs.pos, _ = replication.DecodePosition(ev.Gtid)
335340
if !uvs.stopPos.IsZero() && uvs.pos.AtLeast(uvs.stopPos) {
341+
log.Infof("Reached stop position %v, returning io.EOF", uvs.stopPos)
336342
err = io.EOF
337343
}
338344
}
339345
}
340346
if err != nil {
347+
log.Infof("uvstreamer catchup/fastforward returning with EOF error %v", err)
341348
uvs.vse.errorCounts.Add("Send", 1)
342349
}
343350
return err

go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
255255

256256
numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
257257
numCopyEvents += 2 /* GTID + Event after all copy is done */
258-
numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
258+
numCatchupEvents := 4 * 5 /* 3 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
259259
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
260260
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
261261
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */
@@ -398,6 +398,21 @@ func initTables(t *testing.T, tables []string) {
398398
}
399399
}
400400
}
401+
callbacks["LASTPK.*t2.*complete"] = func() {
402+
ctx := context.Background()
403+
idx := 1
404+
id := numInitialRows + 100
405+
table := "t1"
406+
query1 := fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10)
407+
queries := []string{
408+
"begin",
409+
query1,
410+
"commit",
411+
}
412+
env.Mysqld.ExecuteSuperQueryList(ctx, queries)
413+
log.Infof("Position after insert into t1 and t2 after t2 complete: %s", primaryPosition(t))
414+
415+
}
401416
positions["afterInitialInsert"] = primaryPosition(t)
402417
}
403418

@@ -528,6 +543,11 @@ var expectedEvents = []string{
528543
"type:BEGIN",
529544
"type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\"} completed:true}",
530545
"type:COMMIT",
546+
"type:BEGIN",
547+
"type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"} enum_set_string_values:true}",
548+
"type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:3 lengths:4 values:\"1101100\"}}}",
549+
"type:GTID",
550+
"type:COMMIT",
531551
fmt.Sprintf("type:OTHER gtid:\"%s t3\"", copyPhaseStart),
532552
"type:BEGIN",
533553
"type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"} enum_set_string_values:true}",

0 commit comments

Comments
 (0)