name: "Sweeper: Libbeat Pipeline Shutdown and Queue Lifecycle"
on:
  schedule:
    - cron: "0 9 * * 3"
  workflow_dispatch:

permissions:
  actions: read
  contents: read
  issues: write
  pull-requests: read

jobs:
  run:
    uses: elastic/ai-github-actions/.github/workflows/gh-aw-code-quality-audit.lock.yml@v0
    with:
      title-prefix: "[libbeat-pipeline-lifecycle]"
      severity-threshold: "high"
      additional-instructions: |
        You are a **pipeline lifecycle sweeper** for libbeat. Your goal is to find every path
        where events can be silently dropped, goroutines can leak, or panics can be triggered
        during startup, shutdown, output reconnection, or backpressure — and write a failing
        test for each confirmed issue.

        ## The component

        The libbeat publisher pipeline connects inputs to outputs. It lives under
        `libbeat/publisher/pipeline/` (the pipeline orchestrator and consumer),
        `libbeat/publisher/queue/` (memqueue and diskqueue implementations), and
        `libbeat/outputs/` (elasticsearch, logstash, kafka, and others). The pipeline has a
        carefully ordered shutdown sequence that, if violated, produces panics (send on closed
        channel) or hangs (goroutines blocked waiting for signals that never come).

        ## The bug class

        Three categories of bugs recur here:

        **Shutdown ordering**: The pipeline shuts down in stages — queue, then consumer, then
        output workers. If any stage closes a channel or signals done before the upstream stage
        has finished sending to it, the result is either a "send on closed channel" panic or
        a goroutine that blocks forever. Look for channels that are closed by one goroutine
        while another goroutine may still be sending to them.

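        A minimal, self-contained sketch of the ordering rule (illustrative names, not
        libbeat's actual identifiers): every sender must have stopped before anyone closes
        the channel.

        ```go
        package main

        import (
        	"fmt"
        	"sync"
        )

        func main() {
        	events := make(chan int)
        	var senders sync.WaitGroup

        	// Upstream stage: several producers sending into the pipeline.
        	for i := 0; i < 3; i++ {
        		senders.Add(1)
        		go func(id int) {
        			defer senders.Done()
        			for j := 0; j < 5; j++ {
        				events <- id*10 + j
        			}
        		}(i)
        	}

        	// Downstream consumer.
        	done := make(chan struct{})
        	var count int
        	go func() {
        		for range events {
        			count++
        		}
        		close(done)
        	}()

        	// Correct shutdown order: wait for every sender first, THEN close.
        	// Closing before senders.Wait() risks a "send on closed channel" panic.
        	senders.Wait()
        	close(events)
        	<-done
        	fmt.Println("consumed", count, "events")
        }
        ```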
        **Signal broadcasting**: A send on a Go channel delivers its value to exactly ONE
        receiver. When multiple goroutines need to observe a shutdown signal, the correct
        pattern is `close(ch)` (with `sync.Once` to prevent double-close), not
        `ch <- struct{}{}`. Any channel used as a one-shot signal to multiple goroutines via
        a send is a latent bug: only one goroutine wakes up, and the others hang forever.

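        A minimal sketch of the broadcast pattern (illustrative identifiers, not libbeat's):

        ```go
        package main

        import (
        	"fmt"
        	"sync"
        )

        type shutdown struct {
        	once sync.Once
        	ch   chan struct{}
        }

        func newShutdown() *shutdown { return &shutdown{ch: make(chan struct{})} }

        // Signal is safe to call from multiple goroutines: sync.Once
        // prevents the double-close panic.
        func (s *shutdown) Signal() { s.once.Do(func() { close(s.ch) }) }

        func main() {
        	s := newShutdown()
        	var wg sync.WaitGroup
        	for i := 0; i < 4; i++ {
        		wg.Add(1)
        		go func() {
        			defer wg.Done()
        			<-s.ch // every waiter observes the closed channel
        		}()
        	}
        	s.Signal()
        	s.Signal() // idempotent; without sync.Once this would panic
        	wg.Wait()
        	fmt.Println("all 4 workers woke up")
        }
        ```

        A single `s.ch <- struct{}{}` here would wake only one of the four waiters.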
        **ACK callback blocking**: When events are ACKed, the queue calls user-provided callbacks
        synchronously. If a callback does slow work (filesystem I/O, network), it blocks the
        ACK loop, which blocks the queue's shutdown drain, which blocks the pipeline's
        `WaitClose()`. This manifests as a hang on graceful shutdown. Look for ACK callbacks
        that do more than increment a counter.

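        A common fix shape is to decouple slow callback work from the ACK loop; a sketch
        with illustrative names, not actual libbeat code:

        ```go
        package main

        import (
        	"fmt"
        	"time"
        )

        func main() {
        	acks := make(chan int, 16)

        	// Dedicated worker: slow callback work happens here, not in the ACK loop.
        	done := make(chan struct{})
        	go func() {
        		defer close(done)
        		for range acks {
        			time.Sleep(time.Millisecond) // stands in for slow I/O
        		}
        	}()

        	// The ACK loop only enqueues; it stays fast even when the worker lags.
        	start := time.Now()
        	for i := 0; i < 10; i++ {
        		acks <- i
        	}
        	fmt.Println("ack loop stayed fast:", time.Since(start) < 50*time.Millisecond)

        	close(acks)
        	<-done // shutdown still drains the pending work
        }
        ```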
        ## How to investigate

        Read the pipeline shutdown sequence end-to-end. Understand what each component closes,
        in what order, and what it is waiting for before considering itself done. Then look at
        each channel in the system and ask: who sends to this channel, who receives from it,
        and what happens when the pipeline is being torn down while a send or receive is in
        progress?

        Also read the queue implementations (memqueue and diskqueue) for:
        - Response channels returned to object pools — if a channel is pooled while another
          goroutine still holds a reference to it, the next user of the channel will receive
          a stale message
        - Error paths in the disk queue's write path — if a write fails midway, is the partial
          state cleaned up, or will the next startup encounter corrupt data?

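        The safe pooling discipline for response channels, sketched with hypothetical code
        (not diskqueue's): receive the pending reply before the channel goes back to the pool,
        so the next user can never observe a stale message.

        ```go
        package main

        import (
        	"fmt"
        	"sync"
        )

        func main() {
        	pool := sync.Pool{New: func() any { return make(chan string, 1) }}

        	request := func(reply string) string {
        		ch := pool.Get().(chan string)
        		go func() { ch <- reply }() // responder holds a reference to ch
        		resp := <-ch                // receive BEFORE pooling: ch is now empty
        		pool.Put(ch)                // safe: no pending send, nothing buffered
        		return resp
        	}

        	fmt.Println(request("first"))
        	fmt.Println(request("second")) // never observes a stale "first"
        }
        ```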
        For outputs, read the backoff client wrapper. If `Close()` is called while a `Publish`
        is sleeping in a backoff wait, does the sleep abort immediately (correct) or block
        until the full backoff duration elapses (incorrect, causing slow shutdown)?

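        The correct pattern looks roughly like this (illustrative, not the actual wrapper
        code): select on a done channel so the shutdown path aborts the sleep immediately.

        ```go
        package main

        import (
        	"fmt"
        	"time"
        )

        // waitBackoff returns false if done is closed before the backoff elapses.
        func waitBackoff(d time.Duration, done <-chan struct{}) bool {
        	timer := time.NewTimer(d)
        	defer timer.Stop()
        	select {
        	case <-timer.C:
        		return true
        	case <-done: // Close() path: abort the sleep immediately
        		return false
        	}
        }

        func main() {
        	done := make(chan struct{})
        	go func() {
        		time.Sleep(10 * time.Millisecond)
        		close(done) // simulate Close() arriving during a long backoff
        	}()

        	start := time.Now()
        	completed := waitBackoff(10*time.Second, done)
        	fmt.Println("completed:", completed, "returned early:", time.Since(start) < time.Second)
        }
        ```

        A bare `time.Sleep(d)` in the same position is the bug: `Close()` would block for
        the full backoff duration.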
        ## For each risk you confirm

        Write a Go test. Use goroutines, channels, and short timeouts to create the concurrent
        scenario. For shutdown hangs, the test should call `Close()` and assert that it returns
        within a short timeout. For panics, use `require.NotPanics`. Run with `-race` where
        applicable: `go test -race ./libbeat/publisher/...`

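        A sketch of the timeout-bounded test shape, using only the standard library
        (`assertClosesWithin` is a hypothetical helper name; real tests may use testify):

        ```go
        package main

        import (
        	"fmt"
        	"time"
        )

        // assertClosesWithin runs closeFn in a goroutine and reports an error if it
        // does not return within d, so a shutdown hang surfaces as a test failure
        // instead of hanging the whole suite.
        func assertClosesWithin(closeFn func(), d time.Duration) error {
        	done := make(chan struct{})
        	go func() {
        		closeFn()
        		close(done)
        	}()
        	select {
        	case <-done:
        		return nil
        	case <-time.After(d):
        		return fmt.Errorf("Close() did not return within %v", d)
        	}
        }

        func main() {
        	// A well-behaved Close returns promptly.
        	err := assertClosesWithin(func() {}, 100*time.Millisecond)
        	fmt.Println("fast close:", err)

        	// A hanging Close is reported rather than blocking forever.
        	err = assertClosesWithin(func() { select {} }, 50*time.Millisecond)
        	fmt.Println("hung close:", err != nil)
        }
        ```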
        ## The bar for filing

        Only report findings that a real deployment could hit. Shutdown races need to be
        triggerable under normal operating conditions (e.g. sending SIGTERM while the output
        is processing events), not only under artificially timed test scenarios that cannot
        occur in practice. For goroutine leaks and hangs, confirm that the problematic code
        path is reachable from the normal pipeline lifecycle — not just from unit test helpers
        that bypass the real startup sequence. If the finding requires a precondition that no
        production deployment would have, skip it.

        ## Output

        File a single issue containing:
        - Confirmed issues with test code or reproduction steps, the specific code path, and
          the fix direction
        - A note on which issues are only detectable under `-race` vs reproducible deterministically
        - Any components you audited and found clean
    secrets:
      COPILOT_GITHUB_TOKEN: ${{ secrets.COPILOT_GITHUB_TOKEN }}