[Draft] perf: batch-drain buffered events in memqueue runLoop#49939

Open
strawgate wants to merge 1 commit into elastic:main from strawgate:perf-memqueue-batched-drain
@strawgate
Contributor

The memqueue runLoop processes one event per select-loop iteration, paying full goroutine scheduling overhead for each event. When multiple producers send concurrently, the pushChan buffer fills up but is still drained one event at a time. When the pipeline is CPU-constrained, this has a significant impact on end-to-end (e2e) performance; when it is I/O-constrained (network to ES or from the data source), it has no real impact.

After handling the first event from the select, perform a non-blocking drain of up to 64 additional already-buffered events before returning to the main select. This amortizes scheduling overhead across batches while the cap prevents starvation of Get, Ack, and Close operations.
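
The bounded drain described above can be sketched in isolation as follows. This is a minimal illustration, not the code in runloop.go: `drainBounded`, `maxDrain`, and the `int` event type are hypothetical stand-ins, and the real change operates on the queue's push requests.

```go
package main

import "fmt"

// maxDrain mirrors the cap of 64 from the description; the name is hypothetical.
const maxDrain = 64

// drainBounded handles the event that woke the loop, then performs a
// non-blocking drain of up to maxDrain additional already-buffered events.
// It returns the total number of events handled in this wakeup.
func drainBounded(first int, pushChan <-chan int, handle func(int)) int {
	handle(first)
	handled := 1
	for i := 0; i < maxDrain; i++ {
		select {
		case v := <-pushChan:
			handle(v)
			handled++
		default:
			// Nothing else is buffered: return to the main select
			// instead of blocking here.
			return handled
		}
	}
	// Cap reached: return so other operations get a turn.
	return handled
}

func main() {
	pushChan := make(chan int, 100)
	for i := 0; i < 100; i++ {
		pushChan <- i
	}
	first := <-pushChan // stands in for the receive in the main select
	n := drainBounded(first, pushChan, func(int) {})
	fmt.Println(n, len(pushChan)) // prints "65 35"
}
```

With 100 buffered events, one wakeup handles 1 + 64 events and leaves 35 for later iterations, which is what bounds how long other cases of the select can be delayed.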

Benchmark results (Apple M4, null output pipeline, batch_size=2048):

  • BenchmarkFullPipeline/batch_2048: +19-24% throughput (p=0.010)
  • BenchmarkProducerThroughput (10 producers): neutral (p=0.442)
  • ES e2e (real Elasticsearch output): neutral, no regression (all p>0.1)

Includes 8 behavioral equivalence tests that pass on both the old and new code paths, covering backpressure, shutdown, multi-producer delivery, ack correctness, and rapid close scenarios.
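
As a rough illustration of what "behavioral equivalence" means here, the sketch below checks one property (multi-producer delivery, each event seen exactly once) against both a one-at-a-time consumer and a batched one. It does not use the memqueue API or the tests in queue_test.go; `consumer`, `oneAtATime`, `batched`, and `deliversExactlyOnce` are hypothetical names over a plain channel.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a hypothetical stand-in for a run-loop strategy: it reads
// from ch until the channel is closed and calls handle for every event.
type consumer func(ch <-chan int, handle func(int))

// oneAtATime models the old path: one event per loop iteration.
func oneAtATime(ch <-chan int, handle func(int)) {
	for v := range ch {
		handle(v)
	}
}

// batched models the new path: after the first event, drain up to 64
// more already-buffered events without blocking.
func batched(ch <-chan int, handle func(int)) {
	for v := range ch {
		handle(v)
	drain:
		for i := 0; i < 64; i++ {
			select {
			case w, ok := <-ch:
				if !ok {
					return // channel closed and fully drained
				}
				handle(w)
			default:
				break drain
			}
		}
	}
}

// deliversExactlyOnce runs several concurrent producers against c and
// reports whether every published event was handled exactly once.
func deliversExactlyOnce(c consumer, producers, perProducer int) bool {
	ch := make(chan int, 32)
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				ch <- p*perProducer + i // unique ID per event
			}
		}(p)
	}
	go func() { wg.Wait(); close(ch) }()

	seen := make(map[int]int)
	c(ch, func(v int) { seen[v]++ }) // runs in this goroutine only
	if len(seen) != producers*perProducer {
		return false
	}
	for _, n := range seen {
		if n != 1 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(deliversExactlyOnce(oneAtATime, 4, 250),
		deliversExactlyOnce(batched, 4, 250)) // prints "true true"
}
```

Running the same assertion against both strategies is the point: the check encodes observable behavior only, so it should not care how many events are handled per wakeup.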

@strawgate strawgate requested a review from a team as a code owner April 5, 2026 01:18
@botelastic botelastic bot added the needs_team label (Indicates that the issue/PR needs a Team:* label) Apr 5, 2026
@strawgate strawgate requested review from faec and mauri870 April 5, 2026 01:18
@botelastic

botelastic bot commented Apr 5, 2026

This pull request doesn't have a Team:<team> label.

@github-actions
Contributor

github-actions bot commented Apr 5, 2026

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@mergify
Contributor

mergify bot commented Apr 5, 2026

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @strawgate? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@strawgate strawgate force-pushed the perf-memqueue-batched-drain branch from 842a5c7 to 5d76081 Compare April 5, 2026 01:23
@coderabbitai

coderabbitai bot commented Apr 5, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 36e15457-85ef-4ff5-a2a9-c9ac708151f0

📥 Commits

Reviewing files that changed from the base of the PR and between 842a5c7 and 58e534e.

📒 Files selected for processing (2)
  • libbeat/publisher/queue/memqueue/queue_test.go
  • libbeat/publisher/queue/memqueue/runloop.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • libbeat/publisher/queue/memqueue/runloop.go
  • libbeat/publisher/queue/memqueue/queue_test.go

📝 Walkthrough

Adds eight behavioral tests to libbeat/publisher/queue/memqueue/queue_test.go covering publish atomicity, backpressure blocking, shutdown unblocking, multi-producer delivery, TryPublish behavior when full, producer close semantics, and rapid-close ACK accounting. Modifies runloop.go so handleInsert uses a new insertEvent helper and performs a bounded drain of up to 64 additional buffered push requests per wakeup before calling maybeUnblockGetRequest.


The memqueue runLoop processes one event per select loop iteration, paying
full goroutine scheduling overhead for each event. When multiple producers
are sending concurrently, the pushChan buffer fills up but is still drained
one-at-a-time.

After handling the first event from the select, perform a non-blocking drain
of up to 64 additional already-buffered events before returning to the main
select. This amortizes scheduling overhead across batches while the cap
prevents starvation of Get, Ack, and Close operations.

Benchmark results (Apple M4, null output pipeline, batch_size=2048):
  - BenchmarkFullPipeline/batch_2048: +19-24% throughput (p=0.010)
  - BenchmarkProducerThroughput (10 producers): neutral (p=0.442)
  - ES e2e (real Elasticsearch output): neutral, no regression (all p>0.1)

Includes 7 behavioral equivalence tests that pass on both the old and new
code paths, covering backpressure, shutdown, multi-producer delivery, ack
correctness, and rapid close scenarios.
@strawgate strawgate force-pushed the perf-memqueue-batched-drain branch from 5d76081 to 58e534e Compare April 5, 2026 01:24