Skip to content

Commit d43698f

Browse files
Merge 1f1b397 into blathers/backport-release-25.4.3-rc-160421
2 parents e28343e + 1f1b397 commit d43698f

File tree

4 files changed

+211
-18
lines changed

4 files changed

+211
-18
lines changed

pkg/sql/execinfra/server_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,10 @@ type TestingKnobs struct {
345345
// TableReaderStartScanCb, when non-nil, will be called whenever the
346346
// TableReader processor starts its scan.
347347
TableReaderStartScanCb func()
348+
349+
// SampleAggregatorTestingKnobRowHook, if non-nil, is called for each row
350+
// processed by the sample aggregator. Used for testing, e.g., to inject panics.
351+
SampleAggregatorTestingKnobRowHook func()
348352
}
349353

350354
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

pkg/sql/rowexec/sample_aggregator.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,21 @@ func (s *sampleAggregator) Run(ctx context.Context, output execinfra.RowReceiver
191191
ctx = s.StartInternal(ctx, sampleAggregatorProcName)
192192
s.input.Start(ctx)
193193

194-
earlyExit, err := s.mainLoop(ctx, output)
195-
if err != nil {
196-
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
197-
} else if !earlyExit {
198-
execinfra.SendTraceData(ctx, s.FlowCtx, output)
199-
s.input.ConsumerClosed()
200-
output.ProducerDone()
201-
}
202-
s.MoveToDraining(nil /* err */)
194+
// Use defer to ensure cleanup happens even on panic (fix for issue #160337).
195+
var earlyExit bool
196+
var err error
197+
defer func() {
198+
if err != nil {
199+
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
200+
} else if !earlyExit {
201+
execinfra.SendTraceData(ctx, s.FlowCtx, output)
202+
s.input.ConsumerClosed()
203+
output.ProducerDone()
204+
}
205+
s.MoveToDraining(nil /* err */)
206+
}()
207+
208+
earlyExit, err = s.mainLoop(ctx, output)
203209
}
204210

205211
// Close is part of the execinfra.Processor interface.
@@ -249,6 +255,7 @@ func (s *sampleAggregator) mainLoop(
249255
var da tree.DatumAlloc
250256
for {
251257
row, meta := s.input.Next()
258+
252259
if meta != nil {
253260
if meta.SamplerProgress != nil {
254261
rowsProcessed += meta.SamplerProgress.RowsProcessed
@@ -296,6 +303,8 @@ func (s *sampleAggregator) mainLoop(
296303
}
297304
if row == nil {
298305
break
306+
} else if s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook != nil {
307+
s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook()
299308
}
300309

301310
// There are four kinds of rows. They should be identified in this order:

pkg/sql/rowexec/sample_aggregator_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
gosql "database/sql"
1111
"fmt"
1212
"reflect"
13+
"sync"
1314
"testing"
15+
"time"
1416

1517
"github.com/cockroachdb/cockroach/pkg/base"
1618
"github.com/cockroachdb/cockroach/pkg/gossip"
@@ -21,15 +23,18 @@ import (
2123
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
2224
"github.com/cockroachdb/cockroach/pkg/sql/execversion"
2325
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
26+
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
2427
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
2528
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2629
"github.com/cockroachdb/cockroach/pkg/sql/stats"
2730
"github.com/cockroachdb/cockroach/pkg/sql/types"
2831
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
2932
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
33+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
3034
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3135
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3236
"github.com/cockroachdb/cockroach/pkg/util/log"
37+
"github.com/cockroachdb/cockroach/pkg/util/mon"
3338
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
3439
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3540
"github.com/cockroachdb/errors"
@@ -451,3 +456,172 @@ func TestSampleAggregator(t *testing.T) {
451456
})
452457
}
453458
}
459+
460+
// TestPanicDeadlock verifies that the defer-based cleanup fix prevents the deadlock
461+
// using the real sampleAggregator processor.
462+
//
463+
// The original deadlock scenario (issue #160337):
464+
// 1. A processor panics during execution (without deferred cleanup)
465+
// 2. Producer goroutines are blocked sending to it via RowChannel
466+
// 3. The panic is recovered but the processor never calls ConsumerClosed()
467+
// 4. Producers remain stuck on channel sends indefinitely
468+
// 5. Wait() blocks forever waiting for producer goroutines
469+
// 6. Cleanup is never called, so UnregisterFlow never happens
470+
// 7. Drain waits forever for the flow to unregister
471+
//
472+
// The fix uses defer to ensure ConsumerClosed() is called even on panic,
473+
// which drains the channel and unblocks stuck producers.
474+
func TestPanicDeadlock(t *testing.T) {
475+
defer leaktest.AfterTest(t)()
476+
skip.UnderStress(t, "test has a 10-second timeout to detect deadlock")
477+
478+
ctx, cancel := context.WithCancel(context.Background())
479+
defer cancel()
480+
481+
// Set up minimal infrastructure for sampleAggregator
482+
st := cluster.MakeTestingClusterSettings()
483+
evalCtx := eval.MakeTestingEvalContext(st)
484+
defer evalCtx.Stop(ctx)
485+
486+
monitor := mon.NewMonitor(mon.Options{
487+
Name: mon.MakeName("test"),
488+
Settings: st,
489+
})
490+
monitor.Start(ctx, nil, mon.NewStandaloneBudget(1<<30))
491+
defer monitor.Stop(ctx)
492+
493+
// Set up testing knob to inject panic after first row
494+
var rowsSeen int
495+
flowCtx := &execinfra.FlowCtx{
496+
EvalCtx: &evalCtx,
497+
Mon: monitor,
498+
Cfg: &execinfra.ServerConfig{
499+
Settings: st,
500+
TestingKnobs: execinfra.TestingKnobs{
501+
SampleAggregatorTestingKnobRowHook: func() {
502+
rowsSeen++
503+
if rowsSeen >= 1 {
504+
panic("sampleAggregator test: injected panic")
505+
}
506+
},
507+
},
508+
},
509+
}
510+
511+
// SampleAggregator expects sampler output format: original columns + 8 metadata columns
512+
// Sampler adds: rank, sketch_idx, num_rows, num_nulls, size, sketch_data, inv_col_idx, inv_idx_key
513+
samplerOutTypes := []*types.T{
514+
types.Int, // original column (the data being sampled)
515+
types.Int, // rank
516+
types.Int, // sketch index
517+
types.Int, // num rows
518+
types.Int, // num nulls
519+
types.Int, // size
520+
types.Bytes, // sketch data
521+
types.Int, // inverted column index
522+
types.Bytes, // inverted index key
523+
}
524+
525+
// Use unbuffered channel to ensure blocking happens immediately
526+
rowChan := &execinfra.RowChannel{}
527+
rowChan.InitWithBufSizeAndNumSenders(samplerOutTypes, 0 /* unbuffered */, 1 /* numSenders */)
528+
rowChan.Start(ctx)
529+
530+
// Create the real sampleAggregator
531+
spec := &execinfrapb.SampleAggregatorSpec{
532+
SampleSize: 100,
533+
MinSampleSize: 10,
534+
Sketches: []execinfrapb.SketchSpec{
535+
{
536+
Columns: []uint32{0},
537+
GenerateHistogram: false,
538+
StatName: "test",
539+
},
540+
},
541+
}
542+
post := &execinfrapb.PostProcessSpec{}
543+
544+
proc, err := newSampleAggregator(ctx, flowCtx, 0 /* processorID */, spec, rowChan, post)
545+
if err != nil {
546+
t.Fatal(err)
547+
}
548+
549+
// Create output channel (sampleAggregator outputs stats results)
550+
outputChan := &execinfra.RowChannel{}
551+
outputChan.InitWithBufSizeAndNumSenders([]*types.T{types.Bytes}, 10, 1)
552+
outputChan.Start(ctx)
553+
554+
// Track producer goroutine
555+
var producerWg sync.WaitGroup
556+
producerWg.Add(1)
557+
558+
// Start producer goroutine that sends rows to the processor
559+
go func() {
560+
defer producerWg.Done()
561+
defer rowChan.ProducerDone()
562+
563+
// Create a sampler-format row with all 9 columns
564+
row := rowenc.EncDatumRow{
565+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // original column
566+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // rank
567+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // sketch index
568+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // num rows
569+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // num nulls
570+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // size
571+
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // sketch data
572+
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(-1)), // inverted column index (-1 = not used)
573+
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // inverted index key
574+
}
575+
576+
// Send multiple rows. The processor will panic after reading the first one.
577+
// WITH FIX: The defer in sampleAggregator.Run() calls ConsumerClosed() which drains the channel
578+
// WITHOUT FIX: Producer would stay blocked forever
579+
for i := 0; i < 5; i++ {
580+
rowChan.Push(row, nil)
581+
}
582+
}()
583+
584+
// Run the processor in a separate goroutine (simulates flow.Run)
585+
processorDone := make(chan bool)
586+
var processorPanic interface{}
587+
588+
go func() {
589+
defer func() {
590+
// Simulates Wait() catching the panic
591+
processorPanic = recover()
592+
if processorPanic != nil {
593+
// Simulates ctxCancel() being called
594+
cancel()
595+
}
596+
close(processorDone)
597+
}()
598+
599+
proc.Run(ctx, outputChan)
600+
}()
601+
602+
// Wait for processor to panic and exit
603+
<-processorDone
604+
605+
if processorPanic == nil {
606+
t.Fatal("expected processor to panic, but it didn't")
607+
}
608+
609+
// Now try to wait for producer goroutine with a timeout
610+
// WITH THE FIX: This should complete quickly because the defer calls
611+
// ConsumerClosed(), which drains the channel and unblocks the producer
612+
producersDone := make(chan bool)
613+
go func() {
614+
producerWg.Wait()
615+
close(producersDone)
616+
}()
617+
618+
select {
619+
case <-producersDone:
620+
// SUCCESS: Producer finished (this is what we want with the fix)
621+
t.Log("Producer finished successfully after panic - fix is working")
622+
case <-time.After(10 * time.Second):
623+
// FAILURE: Producer is deadlocked (the fix is not working)
624+
t.Fatal("DEADLOCK: Producer goroutine is still blocked 10 seconds after panic. " +
625+
"The defer-based fix in sampleAggregator is not working correctly.")
626+
}
627+
}

pkg/sql/rowexec/sampler.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -213,15 +213,21 @@ func (s *samplerProcessor) Run(ctx context.Context, output execinfra.RowReceiver
213213
ctx = s.StartInternal(ctx, samplerProcName)
214214
s.input.Start(ctx)
215215

216-
earlyExit, err := s.mainLoop(ctx, output)
217-
if err != nil {
218-
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
219-
} else if !earlyExit {
220-
execinfra.SendTraceData(ctx, s.FlowCtx, output)
221-
s.input.ConsumerClosed()
222-
output.ProducerDone()
223-
}
224-
s.MoveToDraining(nil /* err */)
216+
// Use defer to ensure cleanup happens even on panic (fix for issue #160337).
217+
var earlyExit bool
218+
var err error
219+
defer func() {
220+
if err != nil {
221+
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
222+
} else if !earlyExit {
223+
execinfra.SendTraceData(ctx, s.FlowCtx, output)
224+
s.input.ConsumerClosed()
225+
output.ProducerDone()
226+
}
227+
s.MoveToDraining(nil /* err */)
228+
}()
229+
230+
earlyExit, err = s.mainLoop(ctx, output)
225231
}
226232

227233
func (s *samplerProcessor) mainLoop(

0 commit comments

Comments
 (0)