Skip to content

Commit 1f1b397

Browse files
mw5h and claude committed
rowexec: fix deadlock when processors panic during execution
Before this change, when the sampleAggregator or sampler processors panicked during row processing, cleanup code (ConsumerClosed() and ProducerDone()) was never executed. This left producer goroutines blocked indefinitely on channel sends, which prevented the flow from completing. During cluster drain operations, this caused the drain to hang indefinitely waiting for flows to finish.

This change adds deferred cleanup to both processors' Run() methods, ensuring that ConsumerClosed() is called even when a panic occurs. This unblocks stuck producers and allows panics to be properly recovered without causing deadlocks.

A new test verifies the fix by injecting a panic via a testing knob and confirming that producer goroutines complete successfully.

Fixes: #160337

Release note (bug fix): Fixed a deadlock that could occur when a statistics creation task panicked.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 7dae1ae commit 1f1b397

File tree

4 files changed

+211
-18
lines changed

4 files changed

+211
-18
lines changed

pkg/sql/execinfra/server_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,10 @@ type TestingKnobs struct {
345345
// TableReaderStartScanCb, when non-nil, will be called whenever the
346346
// TableReader processor starts its scan.
347347
TableReaderStartScanCb func()
348+
349+
// SampleAggregatorTestingKnobRowHook, if non-nil, is called for each row
350+
// processed by the sample aggregator. Used for testing, e.g., to inject panics.
351+
SampleAggregatorTestingKnobRowHook func()
348352
}
349353

350354
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

pkg/sql/rowexec/sample_aggregator.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,21 @@ func (s *sampleAggregator) Run(ctx context.Context, output execinfra.RowReceiver
191191
ctx = s.StartInternal(ctx, sampleAggregatorProcName)
192192
s.input.Start(ctx)
193193

194-
earlyExit, err := s.mainLoop(ctx, output)
195-
if err != nil {
196-
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
197-
} else if !earlyExit {
198-
execinfra.SendTraceData(ctx, s.FlowCtx, output)
199-
s.input.ConsumerClosed()
200-
output.ProducerDone()
201-
}
202-
s.MoveToDraining(nil /* err */)
194+
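// Clean up in a defer so it also runs if mainLoop panics (issue #160337); err and earlyExit then still hold their zero values, so the !earlyExit branch runs and calls ConsumerClosed() and ProducerDone().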
195+
var earlyExit bool
196+
var err error
197+
defer func() {
198+
if err != nil {
199+
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
200+
} else if !earlyExit {
201+
execinfra.SendTraceData(ctx, s.FlowCtx, output)
202+
s.input.ConsumerClosed()
203+
output.ProducerDone()
204+
}
205+
s.MoveToDraining(nil /* err */)
206+
}()
207+
208+
earlyExit, err = s.mainLoop(ctx, output)
203209
}
204210

205211
// Close is part of the execinfra.Processor interface.
@@ -249,6 +255,7 @@ func (s *sampleAggregator) mainLoop(
249255
var da tree.DatumAlloc
250256
for {
251257
row, meta := s.input.Next()
258+
252259
if meta != nil {
253260
if meta.SamplerProgress != nil {
254261
rowsProcessed += meta.SamplerProgress.RowsProcessed
@@ -296,6 +303,8 @@ func (s *sampleAggregator) mainLoop(
296303
}
297304
if row == nil {
298305
break
306+
} else if s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook != nil {
307+
s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook()
299308
}
300309

301310
// There are four kinds of rows. They should be identified in this order:

pkg/sql/rowexec/sample_aggregator_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
gosql "database/sql"
1111
"fmt"
1212
"reflect"
13+
"sync"
1314
"testing"
15+
"time"
1416

1517
"github.com/cockroachdb/cockroach/pkg/base"
1618
"github.com/cockroachdb/cockroach/pkg/gossip"
@@ -21,15 +23,18 @@ import (
2123
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
2224
"github.com/cockroachdb/cockroach/pkg/sql/execversion"
2325
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
26+
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
2427
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
2528
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2629
"github.com/cockroachdb/cockroach/pkg/sql/stats"
2730
"github.com/cockroachdb/cockroach/pkg/sql/types"
2831
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
2932
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
33+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
3034
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3135
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3236
"github.com/cockroachdb/cockroach/pkg/util/log"
37+
"github.com/cockroachdb/cockroach/pkg/util/mon"
3338
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
3439
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3540
"github.com/cockroachdb/errors"
@@ -451,3 +456,172 @@ func TestSampleAggregator(t *testing.T) {
451456
})
452457
}
453458
}
459+
460+
// TestPanicDeadlock verifies that a panic inside the real sampleAggregator
461+
// processor does not leave the goroutine feeding its input RowChannel blocked.
462+
//
463+
// The original deadlock scenario (issue #160337):
464+
// 1. A processor panics during execution (without deferred cleanup)
465+
// 2. Producer goroutines are blocked sending to it via RowChannel
466+
// 3. The panic is recovered but the processor never calls ConsumerClosed()
467+
// 4. Producers remain stuck on channel sends indefinitely
468+
// 5. Wait() blocks forever waiting for producer goroutines
469+
// 6. Cleanup is never called, so UnregisterFlow never happens
470+
// 7. Drain waits forever for the flow to unregister
471+
//
472+
// The fix uses defer to ensure ConsumerClosed() is called even on panic,
473+
// which drains the channel and unblocks stuck producers. The test fails if the producer has not finished within 10 seconds of the processor exiting.
474+
func TestPanicDeadlock(t *testing.T) {
475+
defer leaktest.AfterTest(t)()
476+
skip.UnderStress(t, "test has a 10-second timeout to detect deadlock")
477+
478+
ctx, cancel := context.WithCancel(context.Background())
479+
defer cancel()
480+
481+
// Set up the minimal infrastructure for sampleAggregator: cluster settings, an eval context and a monitor.
482+
st := cluster.MakeTestingClusterSettings()
483+
evalCtx := eval.MakeTestingEvalContext(st)
484+
defer evalCtx.Stop(ctx)
485+
486+
monitor := mon.NewMonitor(mon.Options{
487+
Name: mon.MakeName("test"),
488+
Settings: st,
489+
})
490+
monitor.Start(ctx, nil, mon.NewStandaloneBudget(1<<30))
491+
defer monitor.Stop(ctx)
492+
493+
// Testing knob that injects the panic: rowsSeen is incremented before the check, so the hook panics on the very first row.
494+
var rowsSeen int
495+
flowCtx := &execinfra.FlowCtx{
496+
EvalCtx: &evalCtx,
497+
Mon: monitor,
498+
Cfg: &execinfra.ServerConfig{
499+
Settings: st,
500+
TestingKnobs: execinfra.TestingKnobs{
501+
SampleAggregatorTestingKnobRowHook: func() {
502+
rowsSeen++
503+
if rowsSeen >= 1 {
504+
panic("sampleAggregator test: injected panic")
505+
}
506+
},
507+
},
508+
},
509+
}
510+
511+
// SampleAggregator expects the sampler output format: the original columns plus 8 metadata columns.
512+
// The sampler adds: rank, sketch_idx, num_rows, num_nulls, size, sketch_data, inv_col_idx, inv_idx_key.
513+
samplerOutTypes := []*types.T{
514+
types.Int, // original column (the data being sampled)
515+
types.Int, // rank
516+
types.Int, // sketch index
517+
types.Int, // num rows
518+
types.Int, // num nulls
519+
types.Int, // size
520+
types.Bytes, // sketch data
521+
types.Int, // inverted column index
522+
types.Bytes, // inverted index key
523+
}
524+
525+
// Use an unbuffered channel (buffer size 0, one sender) to ensure blocking happens immediately.
526+
rowChan := &execinfra.RowChannel{}
527+
rowChan.InitWithBufSizeAndNumSenders(samplerOutTypes, 0 /* unbuffered */, 1 /* numSenders */)
528+
rowChan.Start(ctx)
529+
530+
// Create the real sampleAggregator, reading its input from rowChan.
531+
spec := &execinfrapb.SampleAggregatorSpec{
532+
SampleSize: 100,
533+
MinSampleSize: 10,
534+
Sketches: []execinfrapb.SketchSpec{
535+
{
536+
Columns: []uint32{0},
537+
GenerateHistogram: false,
538+
StatName: "test",
539+
},
540+
},
541+
}
542+
post := &execinfrapb.PostProcessSpec{}
543+
544+
proc, err := newSampleAggregator(ctx, flowCtx, 0 /* processorID */, spec, rowChan, post)
545+
if err != nil {
546+
t.Fatal(err)
547+
}
548+
549+
// Create the output channel (sampleAggregator outputs stats results); nothing in this test reads from it.
550+
outputChan := &execinfra.RowChannel{}
551+
outputChan.InitWithBufSizeAndNumSenders([]*types.T{types.Bytes}, 10, 1)
552+
outputChan.Start(ctx)
553+
554+
// Track the producer goroutine so the test can wait for it below.
555+
var producerWg sync.WaitGroup
556+
producerWg.Add(1)
557+
558+
// Start producer goroutine that sends rows to the processor
559+
go func() {
560+
defer producerWg.Done()
561+
defer rowChan.ProducerDone()
562+
563+
// Create a sampler-format row with all 9 columns
564+
row := rowenc.EncDatumRow{
565+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // original column
566+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // rank
567+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // sketch index
568+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // num rows
569+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // num nulls
570+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // size
571+
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // sketch data
572+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(-1)), // inverted column index (-1 = not used)
573+
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // inverted index key
574+
}
575+
576+
// Send multiple rows. The processor panics on the first one it reads (see the row hook above).
577+
// WITH FIX: The defer in sampleAggregator.Run() calls ConsumerClosed() which drains the channel
578+
// WITHOUT FIX: Producer would stay blocked forever
579+
for i := 0; i < 5; i++ {
580+
rowChan.Push(row, nil)
581+
}
582+
}()
583+
584+
// Run the processor in a separate goroutine (simulates flow.Run)
585+
processorDone := make(chan bool)
586+
var processorPanic interface{}
587+
588+
go func() {
589+
defer func() {
590+
// Simulates Wait() catching the panic
591+
processorPanic = recover()
592+
if processorPanic != nil {
593+
// Simulates ctxCancel() being called
594+
cancel()
595+
}
596+
close(processorDone)
597+
}()
598+
599+
proc.Run(ctx, outputChan)
600+
}()
601+
602+
// Wait for processor to panic and exit. NOTE(review): this receive has no timeout of its own; only the producer wait below is bounded.
603+
<-processorDone
604+
605+
if processorPanic == nil {
606+
t.Fatal("expected processor to panic, but it didn't")
607+
}
608+
609+
// Now try to wait for producer goroutine with a timeout
610+
// WITH THE FIX: This should complete quickly because the defer calls
611+
// ConsumerClosed(), which drains the channel and unblocks the producer
612+
producersDone := make(chan bool)
613+
go func() {
614+
producerWg.Wait()
615+
close(producersDone)
616+
}()
617+
618+
select {
619+
case <-producersDone:
620+
// SUCCESS: Producer finished (this is what we want with the fix)
621+
t.Log("Producer finished successfully after panic - fix is working")
622+
case <-time.After(10 * time.Second):
623+
// FAILURE: Producer is deadlocked (the fix is not working)
624+
t.Fatal("DEADLOCK: Producer goroutine is still blocked 10 seconds after panic. " +
625+
"The defer-based fix in sampleAggregator is not working correctly.")
626+
}
627+
}

pkg/sql/rowexec/sampler.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -213,15 +213,21 @@ func (s *samplerProcessor) Run(ctx context.Context, output execinfra.RowReceiver
213213
ctx = s.StartInternal(ctx, samplerProcName)
214214
s.input.Start(ctx)
215215

216-
earlyExit, err := s.mainLoop(ctx, output)
217-
if err != nil {
218-
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
219-
} else if !earlyExit {
220-
execinfra.SendTraceData(ctx, s.FlowCtx, output)
221-
s.input.ConsumerClosed()
222-
output.ProducerDone()
223-
}
224-
s.MoveToDraining(nil /* err */)
216+
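// Clean up in a defer so it also runs if mainLoop panics (issue #160337); err and earlyExit then still hold their zero values, so the !earlyExit branch runs and calls ConsumerClosed() and ProducerDone().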
217+
var earlyExit bool
218+
var err error
219+
defer func() {
220+
if err != nil {
221+
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
222+
} else if !earlyExit {
223+
execinfra.SendTraceData(ctx, s.FlowCtx, output)
224+
s.input.ConsumerClosed()
225+
output.ProducerDone()
226+
}
227+
s.MoveToDraining(nil /* err */)
228+
}()
229+
230+
earlyExit, err = s.mainLoop(ctx, output)
225231
}
226232

227233
func (s *samplerProcessor) mainLoop(

0 commit comments

Comments
 (0)