Describe the bug
The PostgresWriter uses an unbuffered channel (lastError) to communicate errors from the background poll() goroutine to callers of Write(). When a database error occurs, flush() blocks on the send to lastError. Because Write() only reads from this channel non-blockingly, the background goroutine hangs indefinitely. Eventually the input buffer fills up, so all subsequent Write() calls block and the entire application hangs.
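For illustration only (this is not the pgwatch source; the writer type and reportError name are made up), a minimal sketch of the non-blocking hand-off that avoids this class of deadlock: the sender uses a buffered channel plus a select with a default branch, so an error can be reported even when no receiver is waiting.

    package main

    import (
    	"errors"
    	"fmt"
    )

    type writer struct {
    	lastError chan error
    }

    // reportError never blocks: if the channel already holds an unread error,
    // the new one is dropped instead of stalling the sending goroutine.
    func (w *writer) reportError(err error) {
    	select {
    	case w.lastError <- err:
    	default:
    	}
    }

    func main() {
    	w := &writer{lastError: make(chan error, 1)} // buffer of 1 retains the oldest unread error
    	w.reportError(errors.New("simulated partition error"))
    	w.reportError(errors.New("dropped: buffer already full"))
    	fmt.Println(<-w.lastError) // prints "simulated partition error"
    }

The trade-off in this sketch is that later errors are dropped while one is still unread; whether to drop the newest or overwrite with the latest is a design choice, but either way the sending goroutine keeps running.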
To Reproduce
Run the following test case:
func TestLastErrorChannelDeadlock(t *testing.T) {
	conn, err := pgxmock.NewPool()
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create a PostgresWriter with a very short batching delay
	// so flush() gets called quickly
	pgw := &PostgresWriter{
		ctx:                      ctx,
		sinkDb:                   conn,
		opts:                     &CmdOpts{BatchingDelay: 10 * time.Millisecond},
		input:                    make(chan metrics.MeasurementEnvelope, cacheLimit),
		lastError:                make(chan error), // UNBUFFERED - this is the bug
		metricSchema:             DbStorageSchemaTimescale,
		partitionMapMetric:       make(map[string]ExistingPartitionInfo),
		partitionMapMetricDbname: make(map[string]map[string]ExistingPartitionInfo),
	}

	// Set up mock to fail on ensure partition (first error path in flush)
	conn.ExpectExec("select \\* from admin.ensure_partition_timescale").
		WithArgs("test_metric").
		WillReturnError(errors.New("simulated partition error"))

	// Start the poll goroutine
	go pgw.poll()

	// Write a message - this will be queued and processed by poll()
	msg := metrics.MeasurementEnvelope{
		MetricName: "test_metric",
		DBName:     "test_db",
		Data: metrics.Measurements{
			{metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(42)},
		},
	}

	// Write succeeds (just queues the message)
	err = pgw.Write(msg)
	assert.NoError(t, err)

	// Now we need to detect if poll() is stuck.
	// We'll try to write another message after some time.
	// If poll() is deadlocked, the input channel will eventually fill up
	// and Write() will time out.

	// Give poll() time to process the first message and hit the error
	time.Sleep(100 * time.Millisecond)

	// Track whether we can still write
	var writeSucceeded atomic.Bool

	// Try to fill the input channel - if poll() is stuck, this will block
	go func() {
		for i := 0; i < cacheLimit+10; i++ {
			select {
			case pgw.input <- msg:
				// Message sent
			case <-time.After(500 * time.Millisecond):
				// Timeout - channel is full, poll() is likely stuck
				return
			}
		}
		writeSucceeded.Store(true)
	}()

	// Wait to see if we can overflow the channel
	time.Sleep(700 * time.Millisecond)

	// If poll() is working correctly, it would drain the channel.
	// But if it's stuck on lastError <- err, the channel fills up.
	//
	// NOTE: This test demonstrates the bug. When the bug is fixed,
	// writeSucceeded should be true because poll() keeps draining.
	// With the bug present, writeSucceeded will be false because
	// poll() is blocked trying to send to lastError.
	if !writeSucceeded.Load() {
		t.Log("BUG CONFIRMED: poll() goroutine is deadlocked!")
		t.Log("The flush() function is blocked on `pgw.lastError <- err` because")
		t.Log("lastError is an unbuffered channel and no receiver is waiting.")
		t.Fail()
	}
}
Expected behavior
Concurrent calls to Write() and the background flush() should not block the ingestion pipeline on error reporting.
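For context, a hedged sketch of the consuming side as well (illustrative only; the writer type and field names are made up and this is not the actual Write() implementation): Write() can drain lastError without blocking and return the most recent background error to the caller, which only keeps the pipeline moving if the producing side never blocks either.

    package main

    import (
    	"errors"
    	"fmt"
    )

    type writer struct {
    	input     chan string
    	lastError chan error
    }

    // Write surfaces the most recent background error, if any, without blocking,
    // then queues new work for the background goroutine.
    func (w *writer) Write(msg string) error {
    	select {
    	case err := <-w.lastError:
    		return err
    	default:
    	}
    	w.input <- msg
    	return nil
    }

    func main() {
    	w := &writer{
    		input:     make(chan string, 4),
    		lastError: make(chan error, 1),
    	}
    	fmt.Println(w.Write("m1")) // <nil>: no background error yet
    	w.lastError <- errors.New("simulated flush failure")
    	fmt.Println(w.Write("m2")) // the queued error is returned to the caller
    }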
Actual behavior
BUG CONFIRMED: poll() goroutine is deadlocked!
The flush() function is blocked on `pgw.lastError <- err` because
lastError is an unbuffered channel and no receiver is waiting.