fix: advance read session state for ignored command records #259
When IgnoreCommandRecords is set, the loop skipped handleRecord entirely for command records, failing to advance nextSeq and counters. On retry, the session would re-request from the stale position, re-processing the same command records.
Extract state advancement into advanceState and call it for ignored command records before continuing.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
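Greptile Summary
This PR fixes a real bug in the streaming read loop: when `IgnoreCommandRecords` is set, command records were skipped without advancing the session state. The fix extracts the state-advancement block from `handleRecord` into `advanceState` and calls it for ignored command records.
Key changes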
Observations
Confidence Score: 5/5
Safe to merge: the core fix is correct and all remaining findings are P2 style/documentation suggestions. The fix correctly addresses a real infinite-reconnect bug by ensuring the session position advances past ignored command records. No files require special attention; all concerns are in `s2/read.go`.
Important Files Changed
Sequence Diagram
sequenceDiagram
participant R as streamReader.run
participant RO as runOnce
participant S as Server
Note over R,S: IgnoreCommandRecords=true
R->>S: GET /records?seq=0&count=10
S-->>RO: batch [cmd@0, cmd@1, data@2, ...]
loop for each record
alt IsCommandRecord (old)
Note over RO: continue (state NOT advanced)<br/>nextSeq stays at 0!
else IsCommandRecord (new)
RO->>RO: advanceState(record)<br/>nextSeq = seqNum+1
Note over RO: continue
else data record
RO->>RO: handleRecord → advanceState(record)
end
end
RO-->>R: disconnect error
alt Old (buggy)
R->>S: GET /records?seq=0&count=10 ♻️ infinite loop
else New (fixed)
R->>R: limitsReached() check
R->>S: GET /records?seq=nextSeq&count=remaining ✅
end
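The difference between the old and new branches of the diagram can be reproduced with a small simulation. This is a minimal sketch, not the SDK's code: `record`, `session`, `consume`, and `resumeAfterCommandBatch` are hypothetical names, and the batch is assumed to contain only command records, which is the case where the stale position is most visible.

```go
package main

// record is a hypothetical stand-in for the SDK's SequencedRecord.
type record struct {
	SeqNum    uint64
	IsCommand bool
}

// session holds only the position state relevant to the bug.
type session struct {
	nextSeq   uint64
	delivered []uint64
}

// consume processes one batch with command records ignored. When
// advanceIgnored is false it mimics the old loop, which skipped command
// records without touching nextSeq.
func (s *session) consume(batch []record, advanceIgnored bool) {
	for _, rec := range batch {
		if rec.IsCommand {
			if advanceIgnored {
				s.nextSeq = rec.SeqNum + 1
			}
			continue
		}
		s.delivered = append(s.delivered, rec.SeqNum)
		s.nextSeq = rec.SeqNum + 1
	}
}

// resumeAfterCommandBatch returns the sequence number a retry would request
// after a command-only batch followed by a disconnect.
func resumeAfterCommandBatch(advanceIgnored bool) uint64 {
	s := &session{}
	batch := []record{
		{SeqNum: 0, IsCommand: true},
		{SeqNum: 1, IsCommand: true},
	}
	s.consume(batch, advanceIgnored)
	return s.nextSeq
}
```

With `advanceIgnored` set to false the retry position stays at the start of the batch, so every reconnect asks for the same records again; with it set to true the retry resumes just past the last command record.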
Prompt To Fix All With AI
This is a comment left during a code review.
Path: s2/read.go
Line: 555-564
Comment:
**`recordsRead`/`bytesRead` count toward `Count`/`Bytes` limits for ignored records**
`advanceState` increments `recordsRead` and `bytesRead` for command records that are never delivered to the caller's `Records()` channel. This has two downstream effects worth verifying are intentional:
1. **`limitsReached()` can fire before the caller receives `Count` data records.** If the user creates a session with `Count=10, IgnoreCommandRecords=true` and the stream has 10 command records followed by 10 data records, `limitsReached()` returns `true` after the first 10 and the session stops — delivering 0 records to the consumer.
2. **`buildAttemptOptions` computes `remaining = baseCount - recordsRead`.** Because `recordsRead` now includes command records, the reconnect request's `Count` parameter is reduced by the number of ignored command records. Since the server counts all records (including command records) toward its own `Count` limit, this is the correct server-side arithmetic — but it means a `Count=10` session may ultimately surface fewer than 10 data records to the application when command records are present.
This is likely the intended trade-off (correct server-side budget tracking beats the previous infinite-loop), but it changes observable behavior for anyone relying on `Count` meaning "N data records". A comment explaining the semantics (or a note in the `ReadOptions.Count` doc) would help future readers.
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: s2/read.go
Line: 542-547
Comment:
**No unit test for the reconnect-with-command-records regression**
The PR description lists three test-plan items but none are checked, and the existing test file (`read_session_retry_test.go`) does not include a case that exercises the specific scenario being fixed: a stream reader with `IgnoreCommandRecords=true` that receives command-only (or command-leading) batches and then reconnects.
Without this test the regression could silently return. Consider adding a test along the lines of:
- Simulate a batch of command records followed by a disconnect.
- Assert `NextReadPosition()` advances past all command records.
- Assert a second run (or the retry path) does not re-request from the stale position.
How can I resolve this? If you propose a fix, please make it concise.
Reviews (1): Last reviewed commit: "fix: advance read session state for igno..."
// advanceState records that one record was consumed: it bumps the read
// counters and moves the resume position just past the record.
func (r *streamReader) advanceState(record SequencedRecord) {
	r.stateMu.Lock()
	r.recordsRead++
	r.bytesRead += MeteredSequencedRecordBytes(record)
	r.lastRecordTime = time.Now()
	r.nextSeq = record.SeqNum + 1
	r.hasNextSeq = true
	r.nextTS = record.Timestamp
	r.stateMu.Unlock()
}
**`recordsRead`/`bytesRead` count toward `Count`/`Bytes` limits for ignored records**
`advanceState` increments `recordsRead` and `bytesRead` for command records that are never delivered to the caller's `Records()` channel. This has two downstream effects worth verifying are intentional:
1. **`limitsReached()` can fire before the caller receives `Count` data records.** If the user creates a session with `Count=10, IgnoreCommandRecords=true` and the stream has 10 command records followed by 10 data records, `limitsReached()` returns `true` after the first 10 and the session stops, delivering 0 records to the consumer.
2. **`buildAttemptOptions` computes `remaining = baseCount - recordsRead`.** Because `recordsRead` now includes command records, the reconnect request's `Count` parameter is reduced by the number of ignored command records. Since the server counts all records (including command records) toward its own `Count` limit, this is the correct server-side arithmetic, but it means a `Count=10` session may ultimately surface fewer than 10 data records to the application when command records are present.
This is likely the intended trade-off (correct server-side budget tracking beats the previous infinite loop), but it changes observable behavior for anyone relying on `Count` meaning "N data records". A comment explaining the semantics (or a note in the `ReadOptions.Count` doc) would help future readers.
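The two effects described in this comment can be checked with a few lines of arithmetic. The sketch below is illustrative only: `rec`, `readWithCount`, `remaining`, and `exampleStream` are hypothetical helpers that model the described behavior, assuming ignored command records count toward the limit exactly as the comment says; they are not the SDK's `limitsReached` or `buildAttemptOptions`.

```go
package main

// rec is a hypothetical record that only knows whether it is a command record.
type rec struct{ IsCommand bool }

// readWithCount models a session that stops once recordsRead reaches count,
// where ignored command records still count toward the limit.
func readWithCount(stream []rec, count int) (delivered, recordsRead int) {
	for _, r := range stream {
		if recordsRead >= count {
			break // the modeled limit check fires here
		}
		recordsRead++
		if r.IsCommand {
			continue // ignored: counted but never delivered
		}
		delivered++
	}
	return delivered, recordsRead
}

// remaining models the reconnect budget: baseCount minus records already
// read, clamped at zero.
func remaining(baseCount, recordsRead int) int {
	if recordsRead >= baseCount {
		return 0
	}
	return baseCount - recordsRead
}

// exampleStream builds a stream of `commands` command records followed by
// `data` data records.
func exampleStream(commands, data int) []rec {
	s := make([]rec, 0, commands+data)
	for i := 0; i < commands; i++ {
		s = append(s, rec{IsCommand: true})
	}
	for i := 0; i < data; i++ {
		s = append(s, rec{IsCommand: false})
	}
	return s
}
```

Under this model, `readWithCount(exampleStream(10, 10), 10)` delivers no data records, while a stream with 3 leading command records delivers 7, which is the "fewer than `Count` data records" case the comment asks to have documented.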
ignoreCommandRecords := r.baseOpts != nil && r.baseOpts.IgnoreCommandRecords
for _, record := range batch.Records {
	if ignoreCommandRecords && record.IsCommandRecord() {
		// Advance the position and counters even though the record is not delivered.
		r.advanceState(record)
		continue
	}
	// ... (the rest of the loop body is outside this review excerpt)
}
**No unit test for the reconnect-with-command-records regression**
The PR description lists three test-plan items but none are checked, and the existing test file (`read_session_retry_test.go`) does not include a case that exercises the specific scenario being fixed: a stream reader with `IgnoreCommandRecords=true` that receives command-only (or command-leading) batches and then reconnects.
Without this test the regression could silently return. Consider adding a test along the lines of:
- Simulate a batch of command records followed by a disconnect.
- Assert `NextReadPosition()` advances past all command records.
- Assert a second run (or the retry path) does not re-request from the stale position.
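One possible shape for such a regression check is sketched below. It does not use the SDK's real session types or the helpers in `read_session_retry_test.go`; `rec`, `fakeServer`, `fetch`, and `readAll` are hypothetical stand-ins, and the fake server is assumed to drop the connection after every non-empty batch. A real test would wrap the same assertions in a `testing.T` function against the actual `streamReader`.

```go
package main

import "errors"

// rec is a hypothetical stand-in for a sequenced record.
type rec struct {
	SeqNum    uint64
	IsCommand bool
}

var errDisconnect = errors.New("simulated disconnect")

// fakeServer serves records from an in-memory log (SeqNum equals the index)
// and remembers every start position it was asked for.
type fakeServer struct {
	log       []rec
	requested []uint64
	batchSize int
}

// fetch returns up to batchSize records starting at seq and then simulates a
// disconnect. At the tail of the log it returns no records and no error.
func (f *fakeServer) fetch(seq uint64) ([]rec, error) {
	f.requested = append(f.requested, seq)
	total := uint64(len(f.log))
	if seq >= total {
		return nil, nil
	}
	end := seq + uint64(f.batchSize)
	if end > total {
		end = total
	}
	return f.log[seq:end], errDisconnect
}

// readAll retries from nextSeq after each disconnect. Command records are not
// delivered, but the position still advances past them.
func readAll(f *fakeServer, maxAttempts int) (delivered []uint64, nextSeq uint64) {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		batch, err := f.fetch(nextSeq)
		for _, r := range batch {
			nextSeq = r.SeqNum + 1
			if !r.IsCommand {
				delivered = append(delivered, r.SeqNum)
			}
		}
		if err == nil {
			break
		}
	}
	return delivered, nextSeq
}
```

The useful assertion is on `fakeServer.requested`: after a command-only first batch, the second request should start past the command records rather than repeating the first start position.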
closes #235
Summary
- When `IgnoreCommandRecords` is set, the streaming read loop skipped `handleRecord` entirely for command records, failing to advance `nextSeq`, `recordsRead`, and `bytesRead`.
- Extract state advancement into `advanceState()` and call it for ignored command records before `continue`, so the session position always advances.
Test plan
- [ ] `go build ./...` passes
- [ ] `IgnoreCommandRecords: true` correctly advances `NextReadPosition()` past command records
🤖 Generated with Claude Code