fix: drain buffered ACKs when errorsCh closes before acksCh #258

infiniteregrets merged 1 commit into main
Conversation
readAcksLoop closes errorsCh before acksCh (LIFO defer order). The select in readAcks could non-deterministically pick the closed errorsCh and return, abandoning buffered ACKs in acksCh. This caused successful appends to remain in inflightQueue, time out, and get duplicated on retry. Drain acksCh when errorsCh closes to process all pending ACKs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Greptile Summary

This PR fixes a data-duplication bug in `readAcks`.

Key changes:
Confidence Score: 5/5

Safe to merge — the fix is minimal, logically sound, and does not introduce new error paths or races. The only findings are P2 style/improvement suggestions (defensive context check during drain, missing regression test). The core fix correctly exploits the invariant that `errorsCh` closes before `acksCh`. No files require special attention; the single changed file is a targeted, low-risk bug fix.

Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant RL as readAcksLoop (goroutine)
    participant ACK as acksCh (buf=20)
    participant ERR as errorsCh (buf=1)
    participant RA as readAcks (goroutine)
    Note over RL: Normal/EOF exit — loop ends
    RL->>ACK: (buffered ACKs already in channel)
    RL->>ERR: defer close(errorsCh) [LIFO #1]
    ERR-->>RA: select picks closed errorsCh
    alt BEFORE fix
        RA->>RA: return immediately
        Note over ACK: buffered ACKs abandoned → entries time out → duplicates
    else AFTER fix
        RA->>RA: for ack := range acksCh
        RA->>RA: handleAck(ack) × N
        RL->>ACK: defer close(acksCh) [LIFO #2]
        ACK-->>RA: channel closed — range exits
        RA->>RA: return cleanly
        Note over ACK: all ACKs processed → no duplicates
    end
```
Prompt To Fix All With AI

This is a comment left during a code review.
Path: s2/append_session.go
Line: 428-430
Comment:
**No `pumpCtx` cancellation check during drain**
The new drain loop — `for ack := range session.acksCh` — does not observe `r.pumpCtx.Done()`. This means that if the pump context is cancelled (e.g. `Close()` is called concurrently) while a handful of ACKs are still being processed, the goroutine will finish draining before exiting, rather than honouring the cancellation signal.
In practice this is harmless because:
1. `close(p.acksCh)` fires in the very next deferred statement after `close(p.errorsCh)` (both run on the same goroutine, sequentially), so the drain window is tiny.
2. The buffer cap is 20, so at most 20 `handleAck` calls can occur.
3. `handleAck` itself is safe to call after shutdown — it returns immediately when the inflight queue is empty.
If you'd like to be defensive anyway, you could break early on context cancellation:
```go
for ack := range session.acksCh {
	select {
	case <-r.pumpCtx.Done():
		return
	default:
	}
	r.handleAck(session, ack)
}
```
This is purely a hardening suggestion — the current code is functionally correct.
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: s2/append_session.go
Line: 424-431
Comment:
**No regression test for the buffered-ACK drain path**
The existing test suite in `append_session_retry_test.go` doesn't appear to include a test that verifies ACKs buffered in `acksCh` are not lost when `errorsCh` closes first. Without a test, it would be easy for a future refactor of the defer order in `readAcksLoop` to silently reintroduce this bug.
A minimal test could:
1. Pre-populate `acksCh` with 1–N `*AppendAck` values.
2. Close `errorsCh` (simulating LIFO defer firing first).
3. Then close `acksCh`.
4. Assert that all submitted entries are resolved with their ACKs (not timed out or retried).
This would give clear coverage of the exact race the PR description documents.
How can I resolve this? If you propose a fix, please make it concise.

Reviews (1): Last reviewed commit: "fix: drain buffered ACKs when errorsCh c..."
```go
for ack := range session.acksCh {
	r.handleAck(session, ack)
}
```
```go
case err, ok := <-session.errorsCh:
	if !ok {
		// errorsCh closes before acksCh (defer order), so
		// drain any buffered ACKs before returning.
		for ack := range session.acksCh {
			r.handleAck(session, ack)
		}
		return
```
closes #233
Summary
- `readAcksLoop` defers `close(errorsCh)` before `close(acksCh)` (LIFO), so `errorsCh` closes first.
- The `select` in `readAcks` could non-deterministically pick the closed `errorsCh` and return, abandoning buffered ACKs in `acksCh` (buffer size 20).
- Successful appends could remain in `inflightQueue`, which then time out and get retried on a new session, causing data duplication.
- Fix: drain `acksCh` when `errorsCh` closes to process all pending ACKs before returning.

Test plan
- `go build ./...` passes

🤖 Generated with Claude Code