Skip to content

Commit 77d31e5

Browse files
[release-20.0] [Bugfix] Broken Heartbeat system in Row Streamer (vitessio#18390) (vitessio#18396)
Signed-off-by: siddharth16396 <[email protected]> Signed-off-by: Shlomi Noach <[email protected]> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Shlomi Noach <[email protected]>
1 parent 343e7eb commit 77d31e5

File tree

2 files changed

+105
-5
lines changed

2 files changed

+105
-5
lines changed

go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,13 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
362362
heartbeatTicker := time.NewTicker(rowStreamertHeartbeatInterval)
363363
defer heartbeatTicker.Stop()
364364
go func() {
365-
select {
366-
case <-rs.ctx.Done():
367-
return
368-
case <-heartbeatTicker.C:
369-
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
365+
for {
366+
select {
367+
case <-rs.ctx.Done():
368+
return
369+
case <-heartbeatTicker.C:
370+
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
371+
}
370372
}
371373
}()
372374

go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@ package vstreamer
1919
import (
2020
"context"
2121
"fmt"
22+
"os"
2223
"regexp"
2324
"testing"
25+
"time"
2426

27+
"github.com/spf13/pflag"
2528
"github.com/stretchr/testify/require"
2629

2730
"vitess.io/vitess/go/mysql"
2831
"vitess.io/vitess/go/mysql/collations"
2932
"vitess.io/vitess/go/sqltypes"
3033
"vitess.io/vitess/go/vt/log"
34+
"vitess.io/vitess/go/vt/servenv"
3135

3236
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
3337
)
@@ -432,6 +436,100 @@ func TestStreamRowsCancel(t *testing.T) {
432436
}
433437
}
434438

439+
// setFlag() sets a flag for a test in a non-racy way:
440+
// - it registers the flag using a different flagset scope
441+
// - clears other flags by passing a dummy os.Args() while parsing this flagset
442+
// - sets the specific flag, if it has not already been defined
443+
// - resets the os.Args() so that the remaining flagsets can be parsed correctly
444+
func setFlag(flagName, flagValue string) {
445+
flagSetName := "vttablet"
446+
var tmp []string
447+
tmp, os.Args = os.Args[:], []string{flagSetName}
448+
defer func() { os.Args = tmp }()
449+
450+
servenv.OnParseFor(flagSetName, func(fs *pflag.FlagSet) {
451+
if fs.Lookup(flagName) != nil {
452+
fmt.Printf("found %s: %+v", flagName, fs.Lookup(flagName).Value)
453+
return
454+
}
455+
})
456+
servenv.ParseFlags(flagSetName)
457+
458+
if err := pflag.Set(flagName, flagValue); err != nil {
459+
msg := "failed to set flag %q to %q: %v"
460+
log.Errorf(msg, flagName, flagValue, err)
461+
}
462+
}
463+
464+
func TestStreamRowsHeartbeat(t *testing.T) {
465+
if testing.Short() {
466+
t.Skip()
467+
}
468+
setFlag("vstream_packet_size", "10")
469+
defer setFlag("vstream_packet_size", "10000")
470+
471+
// Save original heartbeat interval and restore it after test
472+
originalInterval := rowStreamertHeartbeatInterval
473+
defer func() {
474+
rowStreamertHeartbeatInterval = originalInterval
475+
}()
476+
477+
// Set a very short heartbeat interval for testing (100ms)
478+
rowStreamertHeartbeatInterval = 10 * time.Millisecond
479+
480+
execStatements(t, []string{
481+
"create table t1(id int, val varchar(128), primary key(id))",
482+
"insert into t1 values (1, 'test1')",
483+
"insert into t1 values (2, 'test2')",
484+
"insert into t1 values (3, 'test3')",
485+
"insert into t1 values (4, 'test4')",
486+
"insert into t1 values (5, 'test5')",
487+
})
488+
489+
defer execStatements(t, []string{
490+
"drop table t1",
491+
})
492+
493+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
494+
defer cancel()
495+
496+
heartbeatCount := 0
497+
dataReceived := false
498+
499+
err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error {
500+
if rows.Heartbeat {
501+
heartbeatCount++
502+
// After receiving at least 3 heartbeats, we can be confident the fix is working
503+
if heartbeatCount >= 3 {
504+
cancel()
505+
return nil
506+
}
507+
} else if len(rows.Rows) > 0 {
508+
dataReceived = true
509+
}
510+
// Add a small delay to allow heartbeats to be sent
511+
time.Sleep(50 * time.Millisecond)
512+
return nil
513+
})
514+
515+
// We expect context canceled error since we cancel after receiving heartbeats
516+
if err != nil && err.Error() != "stream ended: context canceled" {
517+
t.Errorf("unexpected error: %v", err)
518+
}
519+
520+
// Verify we received data
521+
if !dataReceived {
522+
t.Error("expected to receive data rows")
523+
}
524+
525+
// This is the critical test: we should receive multiple heartbeats
526+
// Without the fix (missing for loop), we would only get 1 heartbeat
527+
// With the fix, we should get at least 3 heartbeats
528+
if heartbeatCount < 3 {
529+
t.Errorf("expected at least 3 heartbeats, got %d. This indicates the heartbeat goroutine is not running continuously", heartbeatCount)
530+
}
531+
}
532+
435533
func checkStream(t *testing.T, query string, lastpk []sqltypes.Value, wantQuery string, wantStream []string) {
436534
t.Helper()
437535

0 commit comments

Comments
 (0)