Skip to content

Commit d5d6132

Browse files
craig[bot]stevendanna
andcommitted
Merge #154717
154717: kvserver: return error from IntentScannerConstructor r=stevendanna a=stevendanna I tried to do git archeology to determine why we didn't return an error here. I think it is generally a result of the previous processor architecture which would have been running this constructor in a different async task. I'll note that if we _are_ currently seeing an error on this path, then it would potentially lead to a panic, so it seems unlikely we are seeing errors here often. If we do see an error here, it is almost surely much better to abandon this register attempt and not attach the process to the replica. My ulterior motive here is that we very much would like to be able to ensure that all messages sent to the rangefeed client flow through the same path (registration -> stream -> sender) as this helps reason about the correctness of changes. Epic: none Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents f30a668 + 069619f commit d5d6132

File tree

6 files changed

+78
-23
lines changed

6 files changed

+78
-23
lines changed

pkg/kv/kvserver/rangefeed/processor.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,20 @@ type Processor interface {
135135
// It is ok to start registering streams before background initialization
136136
// completes.
137137
//
138-
// The provided iterator is used to initialize the rangefeed's resolved
139-
// timestamp. It must obey the contract of an iterator used for an
140-
// initResolvedTSScan. The Processor promises to clean up the iterator by
141-
// calling its Close method when it is finished.
138+
// The provided IntentScannerConstructor is used to construct a lock table
139+
// iterator which will be used to initialize the rangefeed's resolved
140+
// timestamp. It must be called under the same raftMu lock as first call to
141+
// Register to ensure that there are no missing events.
142142
//
143-
// Note that newRtsIter must be called under the same lock as first
144-
// registration to ensure that all there would be no missing events.
145-
// This is currently achieved by Register function synchronizing with
146-
// the work loop before the lock is released.
143+
// If IntentScannerConstructor returns a non-nil error, the processor will be
144+
// stopped. Otherwise, the intent scanner is successfully initialized. It must
145+
// obey the contract of an iterator used for an initResolvedTSScan. The
146+
// processor promises to clean up the iterator by calling its Close method
147+
// when it's finished.
147148
//
148-
// If the iterator is nil then no initialization scan will be performed and
149-
// the resolved timestamp will immediately be considered initialized.
149+
// If the IntentScannerConstructor is nil then no initialization scan will be
150+
// performed and the resolved timestamp will immediately be considered
151+
// initialized. This should only be the case in tests.
150152
Start(stopper *stop.Stopper, newRtsIter IntentScannerConstructor) error
151153
// Stop processor and close all registrations.
152154
//
@@ -306,4 +308,4 @@ type logicalOpMetadata struct {
306308
// IntentScannerConstructor is used to construct an IntentScanner. It
307309
// should be called from underneath a stopper task to ensure that the
308310
// engine has not been closed.
309-
type IntentScannerConstructor func() IntentScanner
311+
type IntentScannerConstructor func() (IntentScanner, error)

pkg/kv/kvserver/rangefeed/processor_helpers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@ func withMetrics(m *Metrics) option {
311311
func withRtsScanner(scanner IntentScanner) option {
312312
return func(config *testConfig) {
313313
if scanner != nil {
314-
config.isc = func() IntentScanner {
315-
return scanner
314+
config.isc = func() (IntentScanner, error) {
315+
return scanner, nil
316316
}
317317
}
318318
}

pkg/kv/kvserver/rangefeed/processor_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2828
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2929
"github.com/cockroachdb/cockroach/pkg/util/mon"
30+
"github.com/cockroachdb/cockroach/pkg/util/stop"
3031
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3132
"github.com/cockroachdb/errors"
3233
"github.com/stretchr/testify/assert"
@@ -1582,3 +1583,52 @@ func TestProcessorContextCancellation(t *testing.T) {
15821583
}
15831584
})
15841585
}
1586+
1587+
// TestIntentScannerOnError tests that when a processor is given with an intent
1588+
// scanner constructor that fails to create a scanner, the processor will fail
1589+
// to start gracefully.
1590+
func TestIntentScannerOnError(t *testing.T) {
1591+
defer leaktest.AfterTest(t)()
1592+
1593+
ctx := context.Background()
1594+
stopper := stop.NewStopper()
1595+
defer stopper.Stop(ctx)
1596+
1597+
cfg := testConfig{
1598+
Config: Config{
1599+
RangeID: 2,
1600+
Stopper: stopper,
1601+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1602+
Metrics: NewMetrics(),
1603+
Priority: true,
1604+
},
1605+
}
1606+
sch := NewScheduler(SchedulerConfig{
1607+
Workers: 1,
1608+
PriorityWorkers: 1,
1609+
Metrics: NewSchedulerMetrics(time.Second),
1610+
})
1611+
require.NoError(t, sch.Start(ctx, stopper))
1612+
cfg.Scheduler = sch
1613+
1614+
s := NewProcessor(cfg.Config)
1615+
erroringScanConstructor := func() (IntentScanner, error) {
1616+
return nil, errors.New("scanner error")
1617+
}
1618+
err := s.Start(stopper, erroringScanConstructor)
1619+
require.ErrorContains(t, err, "scanner error")
1620+
1621+
// The processor should be stopped eventually.
1622+
p := (s).(*ScheduledProcessor)
1623+
testutils.SucceedsSoon(t, func() error {
1624+
select {
1625+
case <-p.stoppedC:
1626+
_, ok := sch.shards[shardIndex(p.ID(), len(sch.shards), p.Priority)].procs[p.ID()]
1627+
require.False(t, ok)
1628+
require.False(t, sch.priorityIDs.Contains(p.ID()))
1629+
return nil
1630+
default:
1631+
return errors.New("processor not stopped")
1632+
}
1633+
})
1634+
}

pkg/kv/kvserver/rangefeed/scheduled_processor.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,17 +128,24 @@ func (p *ScheduledProcessor) Start(
128128
// Launch an async task to scan over the resolved timestamp iterator and
129129
// initialize the unresolvedIntentQueue.
130130
if rtsIterFunc != nil {
131-
rtsIter := rtsIterFunc()
131+
rtsIter, err := rtsIterFunc()
132+
if err != nil {
133+
// No need to close rtsIter if error is non-nil.
134+
p.scheduler.StopProcessor()
135+
return err
136+
}
137+
132138
initScan := newInitResolvedTSScan(p.Span, p, rtsIter)
133139
// TODO(oleg): we need to cap number of tasks that we can fire up across
134140
// all feeds as they could potentially generate O(n) tasks during start.
135-
err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run)
141+
err = stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run)
136142
if err != nil {
137143
initScan.Cancel()
138144
p.scheduler.StopProcessor()
139145
return err
140146
}
141147
} else {
148+
// This case should only be reached in tests.
142149
p.initResolvedTS(p.taskCtx, nil)
143150
}
144151

pkg/kv/kvserver/replica_rangefeed.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -514,19 +514,13 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
514514
p = rangefeed.NewProcessor(cfg)
515515

516516
// Start it with an iterator to initialize the resolved timestamp.
517-
rtsIter := func() rangefeed.IntentScanner {
517+
rtsIter := func() (rangefeed.IntentScanner, error) {
518518
// Assert that we still hold the raftMu when this is called to ensure
519519
// that the rtsIter reads from the current snapshot. The replica
520520
// synchronizes with the rangefeed Processor calling this function by
521521
// waiting for the Register call below to return.
522522
r.raftMu.AssertHeld()
523-
524-
scanner, err := rangefeed.NewSeparatedIntentScanner(streamCtx, r.store.TODOEngine(), desc.RSpan())
525-
if err != nil {
526-
stream.SendError(kvpb.NewError(err))
527-
return nil
528-
}
529-
return scanner
523+
return rangefeed.NewSeparatedIntentScanner(streamCtx, r.store.TODOEngine(), desc.RSpan())
530524
}
531525

532526
// NB: This only errors if the stopper is stopping, and we have to return here

pkg/server/node.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2308,6 +2308,8 @@ func (n *Node) muxRangeFeed(muxStream kvpb.RPCInternal_MuxRangeFeedStream) error
23082308
// stream manager. If rangefeed disconnects with an error after being
23092309
// successfully registered, it calls streamSink.SendError.
23102310
if disconnector, err := n.stores.RangeFeed(streamCtx, req, streamSink, limiter); err != nil {
2311+
// The rangefeed was not registered, so it should be safe to send this
2312+
// error directly to the stream rather than via the registration.
23112313
streamSink.SendError(kvpb.NewError(err))
23122314
} else {
23132315
sm.AddStream(req.StreamID, disconnector)

0 commit comments

Comments
 (0)