Skip to content

Commit 1f76649

Browse files
committed
kvserver: log receiver snapshot trace on context errors
In cockroachdb#106363, the receiver side of snapshots was modified to log the trace of the current context when context errors (such as timeouts or client-side context cancellation) occur. This commit refactors that change to log these traces whenever any context-based error occurs during receiver-side handling of the MultiRaft/RaftSnapshot streaming RPC. This includes errors while receiving requests or sending responses on the server side of a snapshot, i.e. any time the traces cannot be collected and returned to the client. Tests for these scenarios have also been added. Part of: cockroachdb#105820 Release note: None
1 parent a6dbdee commit 1f76649

File tree

8 files changed

+243
-76
lines changed

8 files changed

+243
-76
lines changed

pkg/kv/kvserver/client_merge_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4813,7 +4813,7 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
48134813
1: {
48144814
Knobs: base.TestingKnobs{
48154815
Store: &kvserver.StoreTestingKnobs{
4816-
ReceiveSnapshot: func(header *kvserverpb.SnapshotRequest_Header) error {
4816+
ReceiveSnapshot: func(_ context.Context, header *kvserverpb.SnapshotRequest_Header) error {
48174817
val := delaySnapshotTrap.Load()
48184818
if val != nil {
48194819
fn := val.(func() error)

pkg/kv/kvserver/client_migration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
139139
blockSnapshotsCh := make(chan struct{})
140140
knobs, ltk := makeReplicationTestKnobs()
141141
ltk.storeKnobs.DisableRaftSnapshotQueue = true // we'll control it ourselves
142-
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
142+
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
143143
// We'll want a signal for when the snapshot was received by the sender.
144144
once.Do(func() { close(blockUntilSnapshotCh) })
145145

pkg/kv/kvserver/client_raft_test.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"math"
1818
"math/rand"
1919
"reflect"
20+
"regexp"
2021
"strconv"
2122
"strings"
2223
"sync"
@@ -58,10 +59,13 @@ import (
5859
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
5960
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
6061
"github.com/cockroachdb/cockroach/pkg/util/log"
62+
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
6163
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
6264
"github.com/cockroachdb/cockroach/pkg/util/randutil"
6365
"github.com/cockroachdb/cockroach/pkg/util/stop"
6466
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
67+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
68+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
6569
"github.com/cockroachdb/cockroach/pkg/util/uuid"
6670
"github.com/cockroachdb/errors"
6771
"github.com/stretchr/testify/assert"
@@ -1460,6 +1464,187 @@ func (c fakeSnapshotStream) Send(request *kvserverpb.SnapshotResponse) error {
14601464
return nil
14611465
}
14621466

1467+
type snapshotTestSignals struct {
1468+
// Receiver-side wait channels.
1469+
receiveErrCh chan error
1470+
batchReceiveReadyCh chan struct{}
1471+
1472+
// Sender-side wait channels.
1473+
svrContextDone <-chan struct{}
1474+
receiveStartedCh chan struct{}
1475+
batchReceiveStartedCh chan struct{}
1476+
receiverDoneCh chan struct{}
1477+
}
1478+
1479+
// TestReceiveSnapshotLogging tests that a snapshot receiver properly captures
1480+
// the collected tracing spans in the last response, or logs the span if the
1481+
// context is cancelled from the client side.
1482+
func TestReceiveSnapshotLogging(t *testing.T) {
1483+
defer leaktest.AfterTest(t)()
1484+
defer log.Scope(t).Close(t)
1485+
1486+
const senderNodeIdx = 0
1487+
const receiverNodeIdx = 1
1488+
const dummyEventMsg = "test receive snapshot logging - dummy event"
1489+
1490+
setupTest := func(t *testing.T) (context.Context, *testcluster.TestCluster, *roachpb.RangeDescriptor, *snapshotTestSignals) {
1491+
ctx := context.Background()
1492+
1493+
signals := &snapshotTestSignals{
1494+
receiveErrCh: make(chan error),
1495+
batchReceiveReadyCh: make(chan struct{}),
1496+
1497+
svrContextDone: nil,
1498+
receiveStartedCh: make(chan struct{}),
1499+
batchReceiveStartedCh: make(chan struct{}),
1500+
receiverDoneCh: make(chan struct{}, 1),
1501+
}
1502+
1503+
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
1504+
ServerArgs: base.TestServerArgs{
1505+
Knobs: base.TestingKnobs{
1506+
Store: &kvserver.StoreTestingKnobs{
1507+
DisableRaftSnapshotQueue: true,
1508+
},
1509+
},
1510+
},
1511+
ReplicationMode: base.ReplicationManual,
1512+
ServerArgsPerNode: map[int]base.TestServerArgs{
1513+
receiverNodeIdx: {
1514+
Knobs: base.TestingKnobs{
1515+
Store: &kvserver.StoreTestingKnobs{
1516+
DisableRaftSnapshotQueue: true,
1517+
ThrottleEmptySnapshots: true,
1518+
ReceiveSnapshot: func(ctx context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
1519+
t.Logf("incoming snapshot on n2")
1520+
log.Event(ctx, dummyEventMsg)
1521+
signals.svrContextDone = ctx.Done()
1522+
close(signals.receiveStartedCh)
1523+
return <-signals.receiveErrCh
1524+
},
1525+
BeforeRecvAcceptedSnapshot: func() {
1526+
t.Logf("receiving on n2")
1527+
signals.batchReceiveStartedCh <- struct{}{}
1528+
<-signals.batchReceiveReadyCh
1529+
},
1530+
HandleSnapshotDone: func() {
1531+
t.Logf("receiver on n2 completed")
1532+
signals.receiverDoneCh <- struct{}{}
1533+
},
1534+
},
1535+
},
1536+
},
1537+
},
1538+
})
1539+
1540+
_, scratchRange, err := tc.Servers[0].ScratchRangeEx()
1541+
require.NoError(t, err)
1542+
1543+
return ctx, tc, &scratchRange, signals
1544+
}
1545+
1546+
snapshotAndValidateLogs := func(t *testing.T, ctx context.Context, tc *testcluster.TestCluster, rngDesc *roachpb.RangeDescriptor, signals *snapshotTestSignals, expectTraceOnSender bool) error {
1547+
t.Helper()
1548+
1549+
repl := tc.GetFirstStoreFromServer(t, senderNodeIdx).LookupReplica(rngDesc.StartKey)
1550+
chgs := kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(receiverNodeIdx))
1551+
1552+
testStartTs := timeutil.Now()
1553+
_, pErr := repl.ChangeReplicas(ctx, rngDesc, kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs)
1554+
1555+
// When ready, flush logs and check messages from store_raft.go since
1556+
// call to repl.ChangeReplicas(..).
1557+
<-signals.receiverDoneCh
1558+
log.Flush()
1559+
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
1560+
math.MaxInt64, 100, regexp.MustCompile(`store_raft\.go`), log.WithMarkedSensitiveData)
1561+
require.NoError(t, err)
1562+
1563+
errRegexp, err := regexp.Compile(`incoming snapshot stream failed with error`)
1564+
require.NoError(t, err)
1565+
foundEntry := false
1566+
var entry logpb.Entry
1567+
for _, entry = range entries {
1568+
if errRegexp.MatchString(entry.Message) {
1569+
foundEntry = true
1570+
break
1571+
}
1572+
}
1573+
expectTraceOnReceiver := !expectTraceOnSender
1574+
require.Equal(t, expectTraceOnReceiver, foundEntry)
1575+
if expectTraceOnReceiver {
1576+
require.Contains(t, entry.Message, dummyEventMsg)
1577+
}
1578+
1579+
// Check that receiver traces were imported in sender's context on success.
1580+
clientTraces := tracing.SpanFromContext(ctx).GetConfiguredRecording()
1581+
_, receiverTraceFound := clientTraces.FindLogMessage(dummyEventMsg)
1582+
require.Equal(t, expectTraceOnSender, receiverTraceFound)
1583+
1584+
return pErr
1585+
}
1586+
1587+
t.Run("cancel on header", func(t *testing.T) {
1588+
ctx, tc, scratchRange, signals := setupTest(t)
1589+
defer tc.Stopper().Stop(ctx)
1590+
1591+
ctx, sp := tracing.EnsureChildSpan(ctx, tc.GetFirstStoreFromServer(t, senderNodeIdx).GetStoreConfig().Tracer(),
1592+
t.Name(), tracing.WithRecording(tracingpb.RecordingVerbose))
1593+
defer sp.Finish()
1594+
1595+
ctx, cancel := context.WithCancel(ctx)
1596+
go func() {
1597+
<-signals.receiveStartedCh
1598+
cancel()
1599+
<-signals.svrContextDone
1600+
time.Sleep(10 * time.Millisecond)
1601+
signals.receiveErrCh <- errors.Errorf("header is bad")
1602+
}()
1603+
err := snapshotAndValidateLogs(t, ctx, tc, scratchRange, signals, false /* expectTraceOnSender */)
1604+
require.Error(t, err)
1605+
})
1606+
t.Run("cancel during receive", func(t *testing.T) {
1607+
ctx, tc, scratchRange, signals := setupTest(t)
1608+
defer tc.Stopper().Stop(ctx)
1609+
1610+
ctx, sp := tracing.EnsureChildSpan(ctx, tc.GetFirstStoreFromServer(t, senderNodeIdx).GetStoreConfig().Tracer(),
1611+
t.Name(), tracing.WithRecording(tracingpb.RecordingVerbose))
1612+
defer sp.Finish()
1613+
1614+
ctx, cancel := context.WithCancel(ctx)
1615+
close(signals.receiveErrCh)
1616+
go func() {
1617+
<-signals.receiveStartedCh
1618+
<-signals.batchReceiveStartedCh
1619+
cancel()
1620+
<-signals.svrContextDone
1621+
time.Sleep(10 * time.Millisecond)
1622+
close(signals.batchReceiveReadyCh)
1623+
}()
1624+
err := snapshotAndValidateLogs(t, ctx, tc, scratchRange, signals, false /* expectTraceOnSender */)
1625+
require.Error(t, err)
1626+
})
1627+
t.Run("successful send", func(t *testing.T) {
1628+
ctx, tc, scratchRange, signals := setupTest(t)
1629+
defer tc.Stopper().Stop(ctx)
1630+
1631+
ctx, sp := tracing.EnsureChildSpan(ctx, tc.GetFirstStoreFromServer(t, senderNodeIdx).GetStoreConfig().Tracer(),
1632+
t.Name(), tracing.WithRecording(tracingpb.RecordingVerbose))
1633+
defer sp.Finish()
1634+
1635+
ctx, cancel := context.WithCancel(ctx)
1636+
defer cancel()
1637+
close(signals.receiveErrCh)
1638+
close(signals.batchReceiveReadyCh)
1639+
go func() {
1640+
<-signals.receiveStartedCh
1641+
<-signals.batchReceiveStartedCh
1642+
}()
1643+
err := snapshotAndValidateLogs(t, ctx, tc, scratchRange, signals, true /* expectTraceOnSender */)
1644+
require.NoError(t, err)
1645+
})
1646+
}
1647+
14631648
// TestFailedSnapshotFillsReservation tests that failing to finish applying an
14641649
// incoming snapshot still cleans up the outstanding reservation that was made.
14651650
func TestFailedSnapshotFillsReservation(t *testing.T) {

pkg/kv/kvserver/replica_learner_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestAddReplicaViaLearner(t *testing.T) {
142142
var receivedSnap int64
143143
blockSnapshotsCh := make(chan struct{})
144144
knobs, ltk := makeReplicationTestKnobs()
145-
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
145+
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
146146
if atomic.CompareAndSwapInt64(&receivedSnap, 0, 1) {
147147
close(blockUntilSnapshotCh)
148148
} else {
@@ -238,7 +238,7 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
238238
activateBlocking := int64(1)
239239
var count int64
240240
knobs, ltk := makeReplicationTestKnobs()
241-
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
241+
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
242242
if atomic.LoadInt64(&activateBlocking) > 0 {
243243
// Signal waitForRebalanceToBlockCh to indicate the testing knob was hit.
244244
close(waitForRebalanceToBlockCh)
@@ -250,7 +250,7 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
250250
ltk.storeKnobs.BeforeSendSnapshotThrottle = func() {
251251
atomic.AddInt64(&count, 1)
252252
}
253-
ltk.storeKnobs.AfterSendSnapshotThrottle = func() {
253+
ltk.storeKnobs.AfterSnapshotThrottle = func() {
254254
atomic.AddInt64(&count, -1)
255255
}
256256
ctx := context.Background()
@@ -492,7 +492,7 @@ func TestDelegateSnapshotFails(t *testing.T) {
492492
}
493493

494494
setupFn := func(t *testing.T,
495-
receiveFunc func(*kvserverpb.SnapshotRequest_Header) error,
495+
receiveFunc func(context.Context, *kvserverpb.SnapshotRequest_Header) error,
496496
sendFunc func(*kvserverpb.DelegateSendSnapshotRequest),
497497
processRaft func(roachpb.StoreID) bool,
498498
) (
@@ -618,7 +618,7 @@ func TestDelegateSnapshotFails(t *testing.T) {
618618
var block atomic.Int32
619619
tc, scratchKey := setupFn(
620620
t,
621-
func(h *kvserverpb.SnapshotRequest_Header) error {
621+
func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
622622
// TODO(abaptist): Remove this check once #96841 is fixed.
623623
if h.SenderQueueName == kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE {
624624
return nil
@@ -862,7 +862,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
862862
runTest := func(t *testing.T, replicaType roachpb.ReplicaType) {
863863
var rejectSnapshotErr atomic.Value // error
864864
knobs, ltk := makeReplicationTestKnobs()
865-
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
865+
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
866866
if err := rejectSnapshotErr.Load().(error); err != nil {
867867
return err
868868
}
@@ -1374,7 +1374,7 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
13741374
blockSnapshotsCh := make(chan struct{})
13751375
knobs, ltk := makeReplicationTestKnobs()
13761376
ltk.storeKnobs.DisableRaftSnapshotQueue = true
1377-
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
1377+
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
13781378
select {
13791379
case <-blockSnapshotsCh:
13801380
case <-time.After(10 * time.Second):
@@ -1438,7 +1438,7 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
14381438
blockUntilSnapshotCh := make(chan struct{}, 2)
14391439
blockSnapshotsCh := make(chan struct{})
14401440
knobs, ltk := makeReplicationTestKnobs()
1441-
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
1441+
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
14421442
blockUntilSnapshotCh <- struct{}{}
14431443
<-blockSnapshotsCh
14441444
return nil
@@ -1991,7 +1991,7 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
19911991
// Disable load-based splitting, so that the absence of sufficient
19921992
// QPS measurements do not prevent ranges from merging.
19931993
DisableLoadBasedSplitting: true,
1994-
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
1994+
ReceiveSnapshot: func(_ context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
19951995
if atomic.LoadInt64(&activateSnapshotTestingKnob) == 1 {
19961996
// While the snapshot RPC should only happen once given
19971997
// that the cluster is running under manual replication,

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ func TestReplicateQueueTracingOnError(t *testing.T) {
840840
t, 4, base.TestClusterArgs{
841841
ReplicationMode: base.ReplicationManual,
842842
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
843-
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
843+
ReceiveSnapshot: func(_ context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
844844
if atomic.LoadInt64(&rejectSnapshots) == 1 {
845845
return errors.Newf("boom")
846846
}
@@ -967,7 +967,7 @@ func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
967967
t, 4, base.TestClusterArgs{
968968
ReplicationMode: base.ReplicationManual,
969969
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
970-
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
970+
ReceiveSnapshot: func(_ context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
971971
if atomic.LoadInt64(&rejectSnapshots) == 1 {
972972
return errors.Newf("boom")
973973
}

pkg/kv/kvserver/store_raft.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,12 +203,22 @@ func (s *Store) HandleDelegatedSnapshot(
203203
func (s *Store) HandleSnapshot(
204204
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream SnapshotResponseStream,
205205
) error {
206+
if fn := s.cfg.TestingKnobs.HandleSnapshotDone; fn != nil {
207+
defer fn()
208+
}
206209
ctx = s.AnnotateCtx(ctx)
207210
const name = "storage.Store: handle snapshot"
208211
return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error {
209212
s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1)
210213

211-
return s.receiveSnapshot(ctx, header, stream)
214+
err := s.receiveSnapshot(ctx, header, stream)
215+
if err != nil && ctx.Err() != nil {
216+
// Log trace of incoming snapshot on context cancellation (e.g.
217+
// times out or caller goes away).
218+
log.Infof(ctx, "incoming snapshot stream failed with error: %v\ntrace:\n%v",
219+
err, tracing.SpanFromContext(ctx).GetConfiguredRecording())
220+
}
221+
return err
212222
})
213223
}
214224

0 commit comments

Comments
 (0)