diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 17f7fc34c004..03d2f68051aa 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -345,6 +345,10 @@ type TestingKnobs struct { // TableReaderStartScanCb, when non-nil, will be called whenever the // TableReader processor starts its scan. TableReaderStartScanCb func() + + // SampleAggregatorTestingKnobRowHook, if non-nil, is called for each row + // processed by the sample aggregator. Used for testing, e.g., to inject panics. + SampleAggregatorTestingKnobRowHook func() } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index e45c5a3f17a9..893b880c5f86 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -191,15 +191,21 @@ func (s *sampleAggregator) Run(ctx context.Context, output execinfra.RowReceiver ctx = s.StartInternal(ctx, sampleAggregatorProcName) s.input.Start(ctx) - earlyExit, err := s.mainLoop(ctx, output) - if err != nil { - execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err) - } else if !earlyExit { - execinfra.SendTraceData(ctx, s.FlowCtx, output) - s.input.ConsumerClosed() - output.ProducerDone() - } - s.MoveToDraining(nil /* err */) + // Use defer to ensure cleanup happens even on panic (fix for issue #160337). + var earlyExit bool + var err error + defer func() { + if err != nil { + execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err) + } else if !earlyExit { + execinfra.SendTraceData(ctx, s.FlowCtx, output) + s.input.ConsumerClosed() + output.ProducerDone() + } + s.MoveToDraining(nil /* err */) + }() + + earlyExit, err = s.mainLoop(ctx, output) } // Close is part of the execinfra.Processor interface. @@ -249,6 +255,7 @@ func (s *sampleAggregator) mainLoop( var da tree.DatumAlloc for { row, meta := s.input.Next() + if meta != nil { if meta.SamplerProgress != nil { rowsProcessed += meta.SamplerProgress.RowsProcessed @@ -296,6 +303,8 @@ func (s *sampleAggregator) mainLoop( } if row == nil { break + } else if s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook != nil { + s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook() } // There are four kinds of rows. They should be identified in this order: diff --git a/pkg/sql/rowexec/sample_aggregator_test.go b/pkg/sql/rowexec/sample_aggregator_test.go index 3a5e1f9b4262..0f5acf618e81 100644 --- a/pkg/sql/rowexec/sample_aggregator_test.go +++ b/pkg/sql/rowexec/sample_aggregator_test.go @@ -10,7 +10,9 @@ import ( gosql "database/sql" "fmt" "reflect" + "sync" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" @@ -21,15 +23,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execversion" "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" @@ -451,3 +456,172 @@ func TestSampleAggregator(t *testing.T) { }) } } + +// TestPanicDeadlock verifies that the defer-based cleanup fix prevents the deadlock +// using the real sampleAggregator processor. +// +// The original deadlock scenario (issue #160337): +// 1. A processor panics during execution (without deferred cleanup) +// 2. Producer goroutines are blocked sending to it via RowChannel +// 3. The panic is recovered but the processor never calls ConsumerClosed() +// 4. Producers remain stuck on channel sends indefinitely +// 5. Wait() blocks forever waiting for producer goroutines +// 6. Cleanup is never called, so UnregisterFlow never happens +// 7. Drain waits forever for the flow to unregister +// +// The fix uses defer to ensure ConsumerClosed() is called even on panic, +// which drains the channel and unblocks stuck producers. +func TestPanicDeadlock(t *testing.T) { + defer leaktest.AfterTest(t)() + skip.UnderStress(t, "test has a 10-second timeout to detect deadlock") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up minimal infrastructure for sampleAggregator + st := cluster.MakeTestingClusterSettings() + evalCtx := eval.MakeTestingEvalContext(st) + defer evalCtx.Stop(ctx) + + monitor := mon.NewMonitor(mon.Options{ + Name: mon.MakeName("test"), + Settings: st, + }) + monitor.Start(ctx, nil, mon.NewStandaloneBudget(1<<30)) + defer monitor.Stop(ctx) + + // Set up testing knob to inject panic after first row + var rowsSeen int + flowCtx := &execinfra.FlowCtx{ + EvalCtx: &evalCtx, + Mon: monitor, + Cfg: &execinfra.ServerConfig{ + Settings: st, + TestingKnobs: execinfra.TestingKnobs{ + SampleAggregatorTestingKnobRowHook: func() { + rowsSeen++ + if rowsSeen >= 1 { + panic("sampleAggregator test: injected panic") + } + }, + }, + }, + } + + // SampleAggregator expects sampler output format: original columns + 8 metadata columns + // Sampler adds: rank, sketch_idx, num_rows, num_nulls, size, sketch_data, inv_col_idx, inv_idx_key + samplerOutTypes := []*types.T{ + types.Int, // original column (the data being sampled) + types.Int, // rank + types.Int, // sketch index + types.Int, // num rows + types.Int, // num nulls + types.Int, // size + types.Bytes, // sketch data + types.Int, // inverted column index + types.Bytes, // inverted index key + } + + // Use unbuffered channel to ensure blocking happens immediately + rowChan := &execinfra.RowChannel{} + rowChan.InitWithBufSizeAndNumSenders(samplerOutTypes, 0 /* unbuffered */, 1 /* numSenders */) + rowChan.Start(ctx) + + // Create the real sampleAggregator + spec := &execinfrapb.SampleAggregatorSpec{ + SampleSize: 100, + MinSampleSize: 10, + Sketches: []execinfrapb.SketchSpec{ + { + Columns: []uint32{0}, + GenerateHistogram: false, + StatName: "test", + }, + }, + } + post := &execinfrapb.PostProcessSpec{} + + proc, err := newSampleAggregator(ctx, flowCtx, 0 /* processorID */, spec, rowChan, post) + if err != nil { + t.Fatal(err) + } + + // Create output channel (sampleAggregator outputs stats results) + outputChan := &execinfra.RowChannel{} + outputChan.InitWithBufSizeAndNumSenders([]*types.T{types.Bytes}, 10, 1) + outputChan.Start(ctx) + + // Track producer goroutine + var producerWg sync.WaitGroup + producerWg.Add(1) + + // Start producer goroutine that sends rows to the processor + go func() { + defer producerWg.Done() + defer rowChan.ProducerDone() + + // Create a sampler-format row with all 9 columns + row := rowenc.EncDatumRow{ + rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // original column + rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // rank + rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // sketch index + rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // num rows + rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // num nulls + rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // size + rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // sketch data + rowenc.DatumToEncDatum(types.Int, tree.NewDInt(-1)), // inverted column index (-1 = not used) + rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // inverted index key + } + + // Send multiple rows. The processor will panic after reading the first one. + // WITH FIX: The defer in sampleAggregator.Run() calls ConsumerClosed() which drains the channel + // WITHOUT FIX: Producer would stay blocked forever + for i := 0; i < 5; i++ { + rowChan.Push(row, nil) + } + }() + + // Run the processor in a separate goroutine (simulates flow.Run) + processorDone := make(chan bool) + var processorPanic interface{} + + go func() { + defer func() { + // Simulates Wait() catching the panic + processorPanic = recover() + if processorPanic != nil { + // Simulates ctxCancel() being called + cancel() + } + close(processorDone) + }() + + proc.Run(ctx, outputChan) + }() + + // Wait for processor to panic and exit + <-processorDone + + if processorPanic == nil { + t.Fatal("expected processor to panic, but it didn't") + } + + // Now try to wait for producer goroutine with a timeout + // WITH THE FIX: This should complete quickly because the defer calls + // ConsumerClosed(), which drains the channel and unblocks the producer + producersDone := make(chan bool) + go func() { + producerWg.Wait() + close(producersDone) + }() + + select { + case <-producersDone: + // SUCCESS: Producer finished (this is what we want with the fix) + t.Log("Producer finished successfully after panic - fix is working") + case <-time.After(10 * time.Second): + // FAILURE: Producer is deadlocked (the fix is not working) + t.Fatal("DEADLOCK: Producer goroutine is still blocked 10 seconds after panic. " + + "The defer-based fix in sampleAggregator is not working correctly.") + } +} diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 4a0aa9d573c5..81b0cc98dc12 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -213,15 +213,21 @@ func (s *samplerProcessor) Run(ctx context.Context, output execinfra.RowReceiver ctx = s.StartInternal(ctx, samplerProcName) s.input.Start(ctx) - earlyExit, err := s.mainLoop(ctx, output) - if err != nil { - execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err) - } else if !earlyExit { - execinfra.SendTraceData(ctx, s.FlowCtx, output) - s.input.ConsumerClosed() - output.ProducerDone() - } - s.MoveToDraining(nil /* err */) + // Use defer to ensure cleanup happens even on panic (fix for issue #160337). + var earlyExit bool + var err error + defer func() { + if err != nil { + execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err) + } else if !earlyExit { + execinfra.SendTraceData(ctx, s.FlowCtx, output) + s.input.ConsumerClosed() + output.ProducerDone() + } + s.MoveToDraining(nil /* err */) + }() + + earlyExit, err = s.mainLoop(ctx, output) } func (s *samplerProcessor) mainLoop(