Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ type TestingKnobs struct {
// TableReaderStartScanCb, when non-nil, will be called whenever the
// TableReader processor starts its scan.
TableReaderStartScanCb func()

// SampleAggregatorTestingKnobRowHook, if non-nil, is called for each row
// processed by the sample aggregator. Used for testing, e.g., to inject panics.
SampleAggregatorTestingKnobRowHook func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
27 changes: 18 additions & 9 deletions pkg/sql/rowexec/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,21 @@ func (s *sampleAggregator) Run(ctx context.Context, output execinfra.RowReceiver
ctx = s.StartInternal(ctx, sampleAggregatorProcName)
s.input.Start(ctx)

earlyExit, err := s.mainLoop(ctx, output)
if err != nil {
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
} else if !earlyExit {
execinfra.SendTraceData(ctx, s.FlowCtx, output)
s.input.ConsumerClosed()
output.ProducerDone()
}
s.MoveToDraining(nil /* err */)
// Use defer to ensure cleanup happens even on panic (fix for issue #160337).
var earlyExit bool
var err error
defer func() {
if err != nil {
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
} else if !earlyExit {
execinfra.SendTraceData(ctx, s.FlowCtx, output)
s.input.ConsumerClosed()
output.ProducerDone()
}
s.MoveToDraining(nil /* err */)
}()

earlyExit, err = s.mainLoop(ctx, output)
}

// Close is part of the execinfra.Processor interface.
Expand Down Expand Up @@ -249,6 +255,7 @@ func (s *sampleAggregator) mainLoop(
var da tree.DatumAlloc
for {
row, meta := s.input.Next()

if meta != nil {
if meta.SamplerProgress != nil {
rowsProcessed += meta.SamplerProgress.RowsProcessed
Expand Down Expand Up @@ -296,6 +303,8 @@ func (s *sampleAggregator) mainLoop(
}
if row == nil {
break
} else if s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook != nil {
s.FlowCtx.Cfg.TestingKnobs.SampleAggregatorTestingKnobRowHook()
}

// There are four kinds of rows. They should be identified in this order:
Expand Down
174 changes: 174 additions & 0 deletions pkg/sql/rowexec/sample_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
gosql "database/sql"
"fmt"
"reflect"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
Expand All @@ -21,15 +23,18 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execversion"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -451,3 +456,172 @@ func TestSampleAggregator(t *testing.T) {
})
}
}

// TestPanicDeadlock verifies that the defer-based cleanup fix prevents the deadlock
// using the real sampleAggregator processor.
//
// The original deadlock scenario (issue #160337):
// 1. A processor panics during execution (without deferred cleanup)
// 2. Producer goroutines are blocked sending to it via RowChannel
// 3. The panic is recovered but the processor never calls ConsumerClosed()
// 4. Producers remain stuck on channel sends indefinitely
// 5. Wait() blocks forever waiting for producer goroutines
// 6. Cleanup is never called, so UnregisterFlow never happens
// 7. Drain waits forever for the flow to unregister
//
// The fix uses defer to ensure ConsumerClosed() is called even on panic,
// which drains the channel and unblocks stuck producers.
func TestPanicDeadlock(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderStress(t, "test has a 10-second timeout to detect deadlock")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set up minimal infrastructure for sampleAggregator
st := cluster.MakeTestingClusterSettings()
evalCtx := eval.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)

monitor := mon.NewMonitor(mon.Options{
Name: mon.MakeName("test"),
Settings: st,
})
monitor.Start(ctx, nil, mon.NewStandaloneBudget(1<<30))
defer monitor.Stop(ctx)

// Set up testing knob to inject panic after first row
var rowsSeen int
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Mon: monitor,
Cfg: &execinfra.ServerConfig{
Settings: st,
TestingKnobs: execinfra.TestingKnobs{
SampleAggregatorTestingKnobRowHook: func() {
rowsSeen++
if rowsSeen >= 1 {
panic("sampleAggregator test: injected panic")
}
},
},
},
}

// SampleAggregator expects sampler output format: original columns + 8 metadata columns
// Sampler adds: rank, sketch_idx, num_rows, num_nulls, size, sketch_data, inv_col_idx, inv_idx_key
samplerOutTypes := []*types.T{
types.Int, // original column (the data being sampled)
types.Int, // rank
types.Int, // sketch index
types.Int, // num rows
types.Int, // num nulls
types.Int, // size
types.Bytes, // sketch data
types.Int, // inverted column index
types.Bytes, // inverted index key
}

// Use unbuffered channel to ensure blocking happens immediately
rowChan := &execinfra.RowChannel{}
rowChan.InitWithBufSizeAndNumSenders(samplerOutTypes, 0 /* unbuffered */, 1 /* numSenders */)
rowChan.Start(ctx)

// Create the real sampleAggregator
spec := &execinfrapb.SampleAggregatorSpec{
SampleSize: 100,
MinSampleSize: 10,
Sketches: []execinfrapb.SketchSpec{
{
Columns: []uint32{0},
GenerateHistogram: false,
StatName: "test",
},
},
}
post := &execinfrapb.PostProcessSpec{}

proc, err := newSampleAggregator(ctx, flowCtx, 0 /* processorID */, spec, rowChan, post)
if err != nil {
t.Fatal(err)
}

// Create output channel (sampleAggregator outputs stats results)
outputChan := &execinfra.RowChannel{}
outputChan.InitWithBufSizeAndNumSenders([]*types.T{types.Bytes}, 10, 1)
outputChan.Start(ctx)

// Track producer goroutine
var producerWg sync.WaitGroup
producerWg.Add(1)

// Start producer goroutine that sends rows to the processor
go func() {
defer producerWg.Done()
defer rowChan.ProducerDone()

// Create a sampler-format row with all 9 columns
row := rowenc.EncDatumRow{
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // original column
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // rank
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // sketch index
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(1)), // num rows
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // num nulls
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(0)), // size
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // sketch data
rowenc.DatumToEncDatum(types.Int, tree.NewDInt(-1)), // inverted column index (-1 = not used)
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes("")), // inverted index key
}

// Send multiple rows. The processor will panic after reading the first one.
// WITH FIX: The defer in sampleAggregator.Run() calls ConsumerClosed() which drains the channel
// WITHOUT FIX: Producer would stay blocked forever
for i := 0; i < 5; i++ {
rowChan.Push(row, nil)
}
}()

// Run the processor in a separate goroutine (simulates flow.Run)
processorDone := make(chan bool)
var processorPanic interface{}

go func() {
defer func() {
// Simulates Wait() catching the panic
processorPanic = recover()
if processorPanic != nil {
// Simulates ctxCancel() being called
cancel()
}
close(processorDone)
}()

proc.Run(ctx, outputChan)
}()

// Wait for processor to panic and exit
<-processorDone

if processorPanic == nil {
t.Fatal("expected processor to panic, but it didn't")
}

// Now try to wait for producer goroutine with a timeout
// WITH THE FIX: This should complete quickly because the defer calls
// ConsumerClosed(), which drains the channel and unblocks the producer
producersDone := make(chan bool)
go func() {
producerWg.Wait()
close(producersDone)
}()

select {
case <-producersDone:
// SUCCESS: Producer finished (this is what we want with the fix)
t.Log("Producer finished successfully after panic - fix is working")
case <-time.After(10 * time.Second):
// FAILURE: Producer is deadlocked (the fix is not working)
t.Fatal("DEADLOCK: Producer goroutine is still blocked 10 seconds after panic. " +
"The defer-based fix in sampleAggregator is not working correctly.")
}
}
24 changes: 15 additions & 9 deletions pkg/sql/rowexec/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,21 @@ func (s *samplerProcessor) Run(ctx context.Context, output execinfra.RowReceiver
ctx = s.StartInternal(ctx, samplerProcName)
s.input.Start(ctx)

earlyExit, err := s.mainLoop(ctx, output)
if err != nil {
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
} else if !earlyExit {
execinfra.SendTraceData(ctx, s.FlowCtx, output)
s.input.ConsumerClosed()
output.ProducerDone()
}
s.MoveToDraining(nil /* err */)
// Use defer to ensure cleanup happens even on panic (fix for issue #160337).
var earlyExit bool
var err error
defer func() {
if err != nil {
execinfra.DrainAndClose(ctx, s.FlowCtx, s.input, output, err)
} else if !earlyExit {
execinfra.SendTraceData(ctx, s.FlowCtx, output)
s.input.ConsumerClosed()
output.ProducerDone()
}
s.MoveToDraining(nil /* err */)
}()

earlyExit, err = s.mainLoop(ctx, output)
}

func (s *samplerProcessor) mainLoop(
Expand Down