Skip to content

Commit 5d76081

Browse files
committed
perf: batch-drain buffered events in memqueue runLoop
The memqueue runLoop processes one event per select loop iteration, paying full goroutine scheduling overhead for each event. When multiple producers are sending concurrently, the pushChan buffer fills up but is still drained one-at-a-time. After handling the first event from the select, perform a non-blocking drain of up to 64 additional already-buffered events before returning to the main select. This amortizes scheduling overhead across batches while the cap prevents starvation of Get, Ack, and Close operations. Benchmark results (Apple M4, null output pipeline, batch_size=2048): - BenchmarkFullPipeline/batch_2048: +19-24% throughput (p=0.010) - BenchmarkProducerThroughput (10 producers): neutral (p=0.442) - ES e2e (real Elasticsearch output): neutral, no regression (all p>0.1) Includes 7 behavioral equivalence tests that pass on both the old and new code paths, covering backpressure, shutdown, multi-producer delivery, ack correctness, and rapid close scenarios.
1 parent c47d134 commit 5d76081

File tree

2 files changed

+327
-0
lines changed

2 files changed

+327
-0
lines changed

libbeat/publisher/queue/memqueue/queue_test.go

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,3 +442,309 @@ func BenchmarkProducerThroughput(b *testing.B) {
442442
cancel()
443443
testQueue.Close(true)
444444
}
445+
446+
// ---------------------------------------------------------------------------
447+
// Behavioral equivalence tests: these verify the guarantees that callers
448+
// depend on. Every test here must pass with and without the batched drain
449+
// optimization in handleInsert.
450+
// ---------------------------------------------------------------------------
451+
452+
// TestPublishReturnsOnlyAfterInsert verifies that when Publish returns true,
453+
// the event is already in the ring buffer (not just buffered in a channel).
454+
// We confirm this by immediately calling Get after Publish and verifying
455+
// the event is available.
456+
func TestPublishReturnsOnlyAfterInsert(t *testing.T) {
457+
q := NewQueue(logp.NewNopLogger(), nil,
458+
Settings{
459+
Events: 100,
460+
MaxGetRequest: 100,
461+
FlushTimeout: 0, // no flush delay
462+
}, 0, nil)
463+
defer q.Close(true)
464+
465+
p := q.Producer(queue.ProducerConfig{})
466+
467+
// Publish one event
468+
_, ok := p.Publish("event-1")
469+
require.True(t, ok)
470+
471+
// The event must be immediately available for consumption
472+
batch, err := q.Get(1)
473+
require.NoError(t, err)
474+
require.Equal(t, 1, batch.Count())
475+
batch.Done()
476+
}
477+
478+
// TestBackpressureBlocksAtQueueCapacity verifies that Publish blocks when
479+
// the ring buffer is full (backpressure) and unblocks when space is freed.
480+
func TestBackpressureBlocksAtQueueCapacity(t *testing.T) {
481+
const queueSize = 5
482+
q := NewQueue(logp.NewNopLogger(), nil,
483+
Settings{
484+
Events: queueSize,
485+
MaxGetRequest: queueSize,
486+
FlushTimeout: 0,
487+
}, 0, nil)
488+
defer q.Close(true)
489+
490+
p := q.Producer(queue.ProducerConfig{})
491+
492+
// Fill the queue to capacity
493+
for i := 0; i < queueSize; i++ {
494+
_, ok := p.Publish(i)
495+
require.True(t, ok, "Publish %d should succeed", i)
496+
}
497+
498+
// Next publish must block (queue is full).
499+
// Use a goroutine + timer to detect blocking.
500+
published := make(chan bool, 1)
501+
go func() {
502+
_, ok := p.Publish("overflow")
503+
published <- ok
504+
}()
505+
506+
select {
507+
case <-published:
508+
t.Fatal("Publish should block when queue is full")
509+
case <-time.After(50 * time.Millisecond):
510+
// Expected: blocked
511+
}
512+
513+
// Free one slot by consuming and acknowledging
514+
batch, err := q.Get(1)
515+
require.NoError(t, err)
516+
batch.Done()
517+
518+
// The blocked publish should now succeed
519+
select {
520+
case ok := <-published:
521+
require.True(t, ok, "Publish should succeed after space freed")
522+
case <-time.After(time.Second):
523+
t.Fatal("Publish did not unblock after freeing space")
524+
}
525+
}
526+
527+
// TestShutdownUnblocksProducer verifies that a producer blocked on a full
528+
// queue is unblocked by queue.Close and Publish returns false.
529+
func TestShutdownUnblocksProducer(t *testing.T) {
530+
const queueSize = 3
531+
q := NewQueue(logp.NewNopLogger(), nil,
532+
Settings{
533+
Events: queueSize,
534+
MaxGetRequest: queueSize,
535+
FlushTimeout: 0,
536+
}, 0, nil)
537+
538+
p := q.Producer(queue.ProducerConfig{})
539+
540+
// Fill the queue
541+
for i := 0; i < queueSize; i++ {
542+
_, ok := p.Publish(i)
543+
require.True(t, ok)
544+
}
545+
546+
// Blocked publish in goroutine
547+
result := make(chan bool, 1)
548+
go func() {
549+
_, ok := p.Publish("blocked")
550+
result <- ok
551+
}()
552+
553+
// Give the goroutine time to block
554+
time.Sleep(20 * time.Millisecond)
555+
556+
// Close the queue — should unblock the producer with ok=false
557+
q.Close(false)
558+
559+
select {
560+
case ok := <-result:
561+
require.False(t, ok, "Publish should return false when queue is closing")
562+
case <-time.After(time.Second):
563+
t.Fatal("Blocked producer was not unblocked by queue.Close")
564+
}
565+
}
566+
567+
// TestMultiProducerAllEventsDelivered verifies no events are lost when
568+
// multiple producers publish concurrently and all events are consumed.
569+
func TestMultiProducerAllEventsDelivered(t *testing.T) {
570+
const (
571+
queueSize = 200
572+
numProducers = 10
573+
eventsPerProducer = 500
574+
totalEvents = numProducers * eventsPerProducer
575+
)
576+
577+
testQueue := NewQueue(logp.NewNopLogger(), nil,
578+
Settings{
579+
Events: queueSize,
580+
MaxGetRequest: 100,
581+
FlushTimeout: time.Millisecond,
582+
}, 0, nil)
583+
584+
var consumedCount atomic.Int64
585+
586+
// Consumer: read and acknowledge everything
587+
var wg sync.WaitGroup
588+
wg.Add(1)
589+
go func() {
590+
defer wg.Done()
591+
for {
592+
batch, err := testQueue.Get(100)
593+
if err != nil {
594+
return
595+
}
596+
consumedCount.Add(int64(batch.Count()))
597+
batch.Done()
598+
}
599+
}()
600+
601+
// Producers: each publishes exactly eventsPerProducer events
602+
var publishedCount atomic.Int64
603+
for i := 0; i < numProducers; i++ {
604+
wg.Add(1)
605+
go func(producerID int) {
606+
defer wg.Done()
607+
p := testQueue.Producer(queue.ProducerConfig{})
608+
for j := 0; j < eventsPerProducer; j++ {
609+
_, ok := p.Publish(fmt.Sprintf("p%d-e%d", producerID, j))
610+
if ok {
611+
publishedCount.Add(1)
612+
}
613+
}
614+
}(i)
615+
}
616+
617+
// Wait for all events to be consumed
618+
require.Eventually(
619+
t,
620+
func() bool { return consumedCount.Load() == totalEvents },
621+
10*time.Second,
622+
time.Millisecond,
623+
"expected %d consumed events, got %d", totalEvents, consumedCount.Load())
624+
625+
testQueue.Close(false)
626+
<-testQueue.Done()
627+
wg.Wait()
628+
629+
require.Equal(t, int64(totalEvents), publishedCount.Load(),
630+
"all Publish calls should succeed")
631+
require.Equal(t, int64(totalEvents), consumedCount.Load(),
632+
"all published events should be consumed")
633+
}
634+
635+
// TestTryPublishDropsWhenFull verifies that TryPublish does not succeed
636+
// when the queue's ring buffer is at capacity.
637+
func TestTryPublishDropsWhenFull(t *testing.T) {
638+
const queueSize = 3
639+
q := NewQueue(logp.NewNopLogger(), nil,
640+
Settings{
641+
Events: queueSize,
642+
MaxGetRequest: queueSize,
643+
FlushTimeout: 0,
644+
}, 0, nil)
645+
defer q.Close(true)
646+
647+
p := q.Producer(queue.ProducerConfig{})
648+
649+
// Fill the queue
650+
for i := 0; i < queueSize; i++ {
651+
_, ok := p.Publish(i)
652+
require.True(t, ok)
653+
}
654+
655+
// TryPublish on a full queue should not succeed. With a buffered
656+
// pushChan the non-blocking send may land in the channel buffer, but
657+
// the runLoop won't drain it because the ring buffer is full, so
658+
// TryPublish blocks on the response channel. Either way, we verify
659+
// it does not return true within a short window.
660+
result := make(chan bool, 1)
661+
go func() {
662+
_, ok := p.TryPublish("overflow")
663+
result <- ok
664+
}()
665+
666+
select {
667+
case ok := <-result:
668+
require.False(t, ok, "TryPublish must not return true when queue is full")
669+
case <-time.After(100 * time.Millisecond):
670+
// Acceptable: TryPublish is blocked, which means it did not succeed
671+
}
672+
}
673+
674+
// TestProducerCloseDoesNotBlockOnFullQueue verifies that closing a producer
675+
// does not deadlock even when the queue is full.
676+
func TestProducerCloseDoesNotBlockOnFullQueue(t *testing.T) {
677+
const queueSize = 2
678+
q := NewQueue(logp.NewNopLogger(), nil,
679+
Settings{
680+
Events: queueSize,
681+
MaxGetRequest: queueSize,
682+
FlushTimeout: 0,
683+
}, 0, nil)
684+
defer q.Close(true)
685+
686+
p := q.Producer(queue.ProducerConfig{})
687+
688+
// Fill the queue
689+
for i := 0; i < queueSize; i++ {
690+
_, ok := p.Publish(i)
691+
require.True(t, ok)
692+
}
693+
694+
// Close the producer — should not deadlock
695+
done := make(chan struct{})
696+
go func() {
697+
p.Close()
698+
close(done)
699+
}()
700+
701+
select {
702+
case <-done:
703+
// Success
704+
case <-time.After(time.Second):
705+
t.Fatal("Producer.Close() deadlocked on full queue")
706+
}
707+
}
708+
709+
// TestRapidCloseAfterPublish verifies that closing the queue immediately
710+
// after a successful Publish does not lose the event — it should still
711+
// be counted either as published (and acked) or the Publish returns false.
712+
func TestRapidCloseAfterPublish(t *testing.T) {
713+
for iter := 0; iter < 50; iter++ {
714+
var acked atomic.Int64
715+
q := NewQueue(logp.NewNopLogger(), nil,
716+
Settings{
717+
Events: 10,
718+
MaxGetRequest: 10,
719+
FlushTimeout: 0,
720+
}, 0, nil)
721+
722+
p := q.Producer(queue.ProducerConfig{
723+
ACK: func(count int) { acked.Add(int64(count)) },
724+
})
725+
726+
// Consumer
727+
go func() {
728+
for {
729+
batch, err := q.Get(10)
730+
if err != nil {
731+
return
732+
}
733+
batch.Done()
734+
}
735+
}()
736+
737+
var published int64
738+
_, ok := p.Publish("event-1")
739+
if ok {
740+
published++
741+
}
742+
743+
q.Close(false)
744+
<-q.Done()
745+
746+
// Ensure we never ack more than we published
747+
require.GreaterOrEqual(t, published, acked.Load(),
748+
"iter %d: acked (%d) > published (%d)", iter, acked.Load(), published)
749+
}
750+
}

libbeat/publisher/queue/memqueue/runloop.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,27 @@ func (l *runLoop) handleInsert(req *pushRequest) {
235235
l.nextEntryID++
236236
l.eventCount++
237237

238+
// Drain additional buffered events without returning to the select loop.
239+
// This amortizes the cost of goroutine scheduling when multiple producers
240+
// are sending concurrently: instead of one select wakeup per event, we
241+
// handle up to maxDrainPerWakeup already-buffered events in a tight loop.
242+
// The cap prevents starvation of Get, Ack, and Close operations.
243+
const maxDrainPerWakeup = 64
244+
drained := 0
245+
drain:
246+
for l.eventCount < len(l.broker.buf) && drained < maxDrainPerWakeup {
247+
select {
248+
case req := <-l.broker.pushChan:
249+
l.insert(&req, l.nextEntryID)
250+
req.resp <- l.nextEntryID
251+
l.nextEntryID++
252+
l.eventCount++
253+
drained++
254+
default:
255+
break drain
256+
}
257+
}
258+
238259
// See if this gave us enough for a new batch
239260
l.maybeUnblockGetRequest()
240261
}

0 commit comments

Comments
 (0)