Some bugs from spinloops while working on 4.2 (#1270)
Merged
trimLeft() removed batches from the front of pd.batches without adjusting pd.maxTimestampBatchIdx. After trimming, the stale index could point past the end of the slice, causing an out-of-bounds panic in pushBatch on the next produce after a DeleteRecords or retention trim. Fix by decrementing the index for each removed batch, and rebuilding it from the remaining batches if the max-timestamp batch itself was trimmed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
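The index-adjustment logic can be sketched in plain Go. The `batch` type, `maxTs` field, and `trimLeft` signature below are all illustrative stand-ins, not the library's actual producer types:

```go
package main

import "fmt"

// batch is a stand-in for the real producer batch type; maxTs is the
// batch's maximum record timestamp. All names here are hypothetical.
type batch struct{ maxTs int64 }

// trimLeft removes n batches from the front and returns the adjusted
// max-timestamp index: decrement it for each removed batch, and rebuild
// it from the survivors if the tracked batch itself was trimmed.
func trimLeft(batches []batch, maxIdx, n int) ([]batch, int) {
	batches = batches[n:]
	maxIdx -= n
	if maxIdx < 0 { // the max-timestamp batch was trimmed; rescan
		maxIdx = 0
		for i := range batches {
			if batches[i].maxTs > batches[maxIdx].maxTs {
				maxIdx = i
			}
		}
	}
	return batches, maxIdx
}

func main() {
	bs := []batch{{5}, {9}, {3}, {7}}
	// The max-timestamp batch is index 1 (ts=9). Trimming 2 removes it,
	// so the index is rebuilt over {3}, {7}, landing on index 1 (ts=7).
	bs, idx := trimLeft(bs, 1, 2)
	fmt.Println(len(bs), idx) // 2 1
}
```

Without the `maxIdx < 0` rescan, the stale index would point past the end of the shortened slice, which is exactly the out-of-bounds panic described above.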
Replace hardcoded absolute paths with paths relative to the script's directory so the script works across machines. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Create listOrEpochMetaCh before triggering the metadata update, not after. With the old ordering, the metadata loop could complete the update and call doOnMetadataUpdate before the channel existed, causing the signal to be lost and listOrEpoch to wait indefinitely (until the 5-minute metadata ticker). The race is rare but reproducible under heavy load. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
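The ordering requirement can be sketched in plain Go. The `notifier` type below is a hypothetical stand-in for the metadata loop's doOnMetadataUpdate hook, which drops the signal if no channel is registered at completion time:

```go
package main

import (
	"fmt"
	"sync"
)

// notifier models the completion hook: it signals whatever channel is
// registered when the update finishes, dropping the signal otherwise.
// All names here are illustrative, not the library's code.
type notifier struct {
	mu sync.Mutex
	ch chan struct{}
}

func (n *notifier) register() chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.ch = make(chan struct{}, 1)
	return n.ch
}

// complete delivers the update signal, or drops it if nothing is registered.
func (n *notifier) complete() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch != nil {
		select {
		case n.ch <- struct{}{}:
		default:
		}
	}
}

func main() {
	var n notifier
	// Correct order: register the channel first, then trigger the update.
	ch := n.register()
	n.complete() // the metadata update finishes
	<-ch         // signal received; with the old order this could hang
	fmt.Println("signal received")
}
```

With the old ordering (trigger first, register second), `complete` can run while `ch` is still nil and the waiter blocks until some later update.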
Transactional offset commits have a two-phase visibility gap: after EndTransaction returns success, the broker asynchronously writes commit markers to __consumer_offsets, and only then are the new offsets visible to OffsetFetch. With the classic group protocol, JoinGroup/SyncGroup round-trips add seconds of delay between a member leaving and the next member fetching offsets, giving the markers time to materialize.

KIP-848 assigns partitions directly via heartbeat responses. A member's leave and the next assignment can happen within ~180ms -- far too fast for marker-writing to complete under load. A test showed member B's OffsetFetch returning offset 4254 (from member A's 1st txn) instead of 6967 (from A's 2nd txn) because the 2nd txn's markers hadn't been written yet, even though EndTransaction succeeded 420ms earlier. This caused member B to re-consume already-processed records.

RequireStable (KIP-447, Kafka 2.5+) is the correct fix: it causes OffsetFetch to block until pending transactional offsets are resolved. Java hardcodes requireStable=true for all consumer-group offset fetching; we follow suit. The RequireStableFetchOffsets option is deprecated as a no-op. Any broker that supports KIP-848 necessarily supports KIP-447, so old brokers that lack RequireStable will never encounter the fast reassignment behavior that triggers this bug. The fallback sleep for brokers too old for KIP-447 is bumped from 200ms to 500ms for extra margin.

For kadm, a new RequireStable context function lets callers opt in to the same behavior on FetchOffsets, FetchOffsetsForTopics, FetchManyOffsets, and FetchOffsetsByID.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Reduce the STALE_MEMBER_EPOCH retry sleep from 1s to 500ms. The sleep exists because goroutine scheduling may let us process the commit response before the heartbeat response that carries the updated epoch, even though they arrive in order on the same connection. 500ms is plenty of time for the heartbeat loop goroutine to run. Also rewrite the comment to explain the KIP-1251 limitation: if the heartbeat loop has stopped (e.g. during leave), memberGen is not updated and we break without retrying. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix a deadlock between etlSem (capacity-4 channel) and t.Parallel(). Parallel subtests cannot start until the parent returns, but the parent blocks on etlSem which only drains inside the subtests. Replace t.Parallel() with explicit goroutines and a WaitGroup so the semaphore send happens before t.Run and the drain happens in the subtest body. Also add an errSkipChecks848 sentinel: when a KIP-848 group's OnPartitionsRevoked commit fails with STALE_MEMBER_EPOCH (a known pre-KIP-1251 limitation where the heartbeat loop has stopped and the epoch cannot be refreshed), the test skips final validation instead of hard-failing. This avoids flaky failures on Kafka <4.3 while still validating the majority of runs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
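The fixed shape can be sketched with a plain channel semaphore and WaitGroup. The `run` helper below is hypothetical, stdlib only; the key point is that the semaphore send happens before each worker starts (like before t.Run) and the drain happens inside the worker body (like in the subtest):

```go
package main

import (
	"fmt"
	"sync"
)

// run models the fixed test shape: acquire before spawning, drain in
// the worker. The parent never blocks on a semaphore that only drains
// in code it is itself preventing from running.
func run(workers, capacity int) bool {
	sem := make(chan struct{}, capacity)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		sem <- struct{}{} // acquire before spawning, like before t.Run
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // drain in the body, like the subtest
			// ... test work ...
		}()
	}
	wg.Wait()
	return len(sem) == 0
}

func main() {
	fmt.Println("all workers finished:", run(8, 4))
}
```

The original deadlock arises because `t.Parallel()` subtests only run after the parent function returns, so a parent blocked on `sem <-` waits forever for a drain that can never happen.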
Update TxnMetadataValue: add flexible v1+ (KIP-915), make Topics nullable, add tagged fields PreviousProducerID/NextProducerID/ClientTransactionVersion (KIP-890) and NextProducerEpoch (KIP-939). Update LeaderChangeMessageVoter: add VoterDirectoryID (KIP-853). Fix LeaderChangeMessage comment (type 2, not type 3). Fix ControlRecordKeyType base type from int8 to int16 to match Kafka's wire format. This is a breaking protocol change, but int8 was a bug: Kafka has always encoded the control record key type field as int16. Add TestValidateMiscDSLAgainstKafkaJSON to validate misc DSL types (OffsetCommit, GroupMetadata, TxnMetadata, DefaultPrincipalData, EndTxnMarker, LeaderChangeMessage) against Kafka JSON schemas. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Canceling a prior in-flight OffsetCommit kills the TCP connection (context cancelation sets read/write deadlines to now, triggering cxn.die()). The subsequent commit then uses a new connection, and the broker can process two requests on different connections out of order. If the prior commit carried a lower offset (e.g. autocommit HEAD vs. sync commit DIRTY), the broker could process it after the newer commit, rewinding the committed offset and causing duplicate consumption on the next rebalance. Instead of canceling, wait for the prior commit to complete naturally. This keeps the connection alive and ensures our request is queued behind the prior on the same connection, preserving ordering. The commitCancel field on groupConsumer is removed entirely since it is no longer read. Applies the same fix to both commit() and commitTxn(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Coordinator errors (NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE, COORDINATOR_LOAD_IN_PROGRESS) and broker-level errors (connection closed, EOF) from the initial ConsumerGroupHeartbeat were falling through to manageFailWait, which calls onLost and injects a fake fetch error visible to the user. These errors are transient and should be retried silently with backoff. ConsumerGroupHeartbeat cannot rely on the client's default retry logic, so we handle retries explicitly: delete the stale coordinator so the next attempt discovers the new one, and retry with exponential backoff. Also handle coordinator errors during the heartbeat loop the same way as retriable broker errors -- delete stale coordinator and continue heartbeating. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add MetadataMinAge(100ms) to TestConsumeRegex and TestPauseIssue489 so that triggerUpdateMetadataNow is not throttled by the default 5s minimum
- Increase TestConsumeRegex wait from 5s to 15s to tolerate slow topic creation
- Add 30s timeout and error check to TestIssue523's first PollFetches to prevent indefinite hangs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a KIP-848 heartbeat receives UNKNOWN_MEMBER_ID, the code reset the member ID but let the error propagate to manageFailWait, which injected a fake ErrGroupSession into the user's poll loop. This is wrong -- UNKNOWN_MEMBER_ID is a transient condition (e.g. coordinator restart, member session timeout) and the client should silently rejoin. The fix adds `continue outer` to the UNKNOWN_MEMBER_ID case, matching how FencedMemberEpoch is already handled: clear nowAssigned and loop back to initialJoin. This is consistent with the Java client, which treats UNKNOWN_MEMBER_ID and FENCED_MEMBER_EPOCH identically (both call transitionToFenced, reset epoch to 0, skip backoff, and do not surface errors to the application). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Retryable broker errors (connection closed, EOF) and coordinator errors (NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE, etc.) during KIP-848 heartbeating were propagating out of the heartbeat loop back to the manage848 switch, which cleared the error and re-entered setupAssignedAndHeartbeat with the full heartbeat interval (e.g. 5s). This caused a stale-connection cycle: the interval outlived short-lived connections, so each new attempt found a dead pooled connection and failed immediately.

The classic protocol avoids this because its heartbeat is wrapped in the client's retryable request mechanism, which transparently retries with backoff. KIP-848 heartbeats bypass that wrapper and manage retries themselves.

Move the retryable broker error and coordinator error handling from the manage848 switch into the heartbeat loop itself. On transient errors, reset the heartbeat timer with exponential backoff (matching the retryable wrapper: 250ms base, doubling, 5s cap) and continue in-place. This avoids tearing down and rebuilding the full session (diffAssigned, prerevoke, assign, fetchOffsets) on every connection blip, and ensures the retry fires quickly enough to use a fresh connection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cap the KIP-848 in-place heartbeat retry counter at cfg.retries and let the error propagate to manage848 when the cap is hit, restarting the session cleanly. Previously the counter grew without bound but was never compared to a limit, so errors accumulated but never triggered a session restart. The counter is already reset on each successful heartbeat so intermittent failures do not compound.

When the heartbeat retry cap is hit and the error propagates to the manage848 switch, treat retryable broker errors and stale coordinator errors as transient: clear the error and restart the session rather than falling through to manageFailWait (which surfaces a fatal fake-fetch error to the user).

Also treat UNRELEASED_INSTANCE_ID as retryable in the initialJoin error path. After FencedMemberEpoch, we immediately call initialJoin which sends epoch 0 to rejoin. If the server hasn't finished releasing the static instance from the old epoch, it returns UNRELEASED_INSTANCE_ID. We retry with backoff to give the server time to process the release.
Previously, listOrEpoch returned early when the session context was cancelled, which could allow sub-goroutines (listOffsetsForBrokerLoad / loadEpochsForBrokerLoad) to still be running after the worker count drops to zero. stopSession would then consider the session fully stopped and purgeTopics could modify tps while a sub-goroutine still references it. Fix this by always draining all results before returning. Context cancellation propagates to each sub-goroutine's broker.waitResp, so they finish quickly.

This drain also exposed a latent bug in listOffsetsForBrokerLoad: when two ListOffsets requests are issued (for exact offset / afterMilli / relative offset validation), if req1 succeeds but req2 fails, the error path passed req1's nil error to errToLoaded. This created loadedOffset entries with both a nil cursor and nil error. handleListOrEpochResults then matched the nil-error success case and called setOffset on the nil cursor. Previously this was masked because context cancellation (the most likely cause of req2 failing while req1 succeeds) triggered the early return. Now that we drain, we must pass the actual error.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a heartbeat response containing a partition assignment is lost (e.g. connection dies mid-read), the server internally marks the assignment as delivered, but the client never receives it. Two changes fix this:

1. Always send a full heartbeat (Topics=[]) rather than a keepalive (Topics=nil) when the client has no assigned partitions. This ensures the server sees the client's actual empty state and re-delivers any pending assignment. Previously, topicsMatch was true for empty slices, causing unassigned clients to send keepalive indefinitely.

2. Reset the last-sent heartbeat state (lastTopics, lastSubscribedTopics) on any heartbeat failure -- network error or error code in response. This forces the next heartbeat to be a full request, matching the Java client's sentFields.reset() behavior. This covers the case where a client already has partitions and loses a response with an updated assignment.

The error handling is restructured so network errors and server error codes follow the same reset-and-return path rather than two separate branches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Topic names are randomly generated, so TopicAlreadyExists can only mean our prior CreateTopics request succeeded but the response was lost (e.g. due to a connection blip). The old code retried in a sleep loop, which was unnecessary and could fail on the next attempt too; now TopicAlreadyExists is treated as success. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
KIP-951 (leader discovery optimizations) added a fast-path where Produce and Fetch responses include broker metadata and a CurrentLeader field when returning NOT_LEADER_FOR_PARTITION. The client uses this to move partitions to the new leader without waiting for a full metadata refresh.

The ensureBrokers function, called as part of this fast-path, was passing the KIP-951 response's broker list to updateBrokers. updateBrokers is designed for the full metadata response: it does a merge-sort against the existing broker list and removes any broker not present in the input. The KIP-951 response only includes brokers relevant to the move (often just one), so updateBrokers would destroy every other broker object in the cluster. These destroyed brokers are recreated on the next metadata refresh as new objects with new TCP connections. This created a cycle: every metadata refresh recreated the brokers, then the next KIP-951 move response destroyed them again.

The effect was that each produce request to these brokers used a different broker object and thus a different TCP connection. Kafka's protocol guarantees FIFO request processing per connection, and the client relies on this for idempotent produce ordering. With multiple connections to the same physical broker, the broker's request handler thread pool can process requests concurrently, and per-partition lock acquisition order becomes non-deterministic. A later request's batch can acquire a partition lock before an earlier request's batch, causing the broker to see a sequence number gap: OUT_OF_ORDER_SEQUENCE_NUMBER.

The OOOSN then triggers an epoch bump (KIP-360), which resets the producer identity. Records already committed under the old epoch are re-produced under the new epoch, bypassing Kafka's deduplication and creating duplicate records.
There was a secondary race that allowed this to manifest even though stopForever should have killed the connections: stopForever acquires reapMu and calls cxnProduce.die(), but if the broker object was just created, cxnProduce is nil (loadConnection is still in the TCP handshake). die() on nil is a no-op. loadConnection then finishes, stores the connection in cxnProduce, and the request completes on a connection that should never have existed. loadConnection never checked b.dead, so it happily created connections on dead broker objects.

Fix ensureBrokers to add/replace individual brokers without removing unrelated ones: if the NodeID exists and the address matches, keep it; if the address changed, stop the old and create a new one; if the NodeID is new, add it.

Fix loadConnection to check b.dead after acquiring reapMu, before storing the new connection. If the broker was killed while we were connecting, close the connection and return errChosenBrokerDead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
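A sketch of the add/replace-without-removal idea, using a hypothetical `broker` struct and map rather than the client's real bookkeeping:

```go
package main

import "fmt"

// broker is a stand-in for the client's broker object; the fields and
// map-based bookkeeping here are hypothetical.
type broker struct {
	id   int32
	addr string
	dead bool
}

// ensureBroker adds or replaces a single broker without touching the
// rest of the map, per the fix described above: keep it if the address
// matches, replace it (stopping the old one) if the address changed,
// add it if the NodeID is new. A sketch, not the library's code.
func ensureBroker(brokers map[int32]*broker, id int32, addr string) {
	if old, ok := brokers[id]; ok {
		if old.addr == addr {
			return // same broker: keep the object and its connections
		}
		old.dead = true // stop the old broker at the stale address
	}
	brokers[id] = &broker{id: id, addr: addr}
}

func main() {
	brokers := map[int32]*broker{
		1: {1, "a:9092", false},
		2: {2, "b:9092", false},
	}
	// A KIP-951 response naming only brokers 1 and 3 must not remove
	// broker 2 -- that removal was the bug.
	ensureBroker(brokers, 1, "a:9092")
	ensureBroker(brokers, 3, "c:9092")
	fmt.Println(len(brokers)) // 3
}
```

Contrast with the buggy path: feeding this partial list through full-metadata merge semantics would have deleted broker 2 and forced a new TCP connection on the next refresh.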
Log the key state transitions in idempotent/transactional produce sequence management. These are the events that matter when diagnosing duplicate records or sequence errors, and were previously silent:

- Producer ID failure (INFO): the moment the producer identity is poisoned by a produce error, triggering epoch reinitialization
- Idempotent epoch bump (INFO): non-transactional KIP-360 local epoch recovery after a sequence error
- Sequence rewind (DEBUG): partition sequences rewound to resend pending batches after a connection death or retryable error
- Sequence reset (DEBUG): partition sequences reset to 0 when a new producer epoch starts
- Retryable produce error (DEBUG): fills the gap where retryable broker errors were silently retried with no log at any level
- OOOSN and DuplicateSequenceNumber (existing INFO logs): add sequence, first_sequence, num_records, and base_offset fields

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add two test utilities for debugging produce-related issues: ringLogger wraps a Logger, capturing all entries (including DEBUG) in a circular buffer while only forwarding INFO+ in real time. Call flush() to dump the full backlog when a test detects a problem, giving complete DEBUG context without the noise on every run. Supports automatic flushing on pattern matches via shouldFlush (currently empty; add needle strings as needed for future investigations). chaosDialer wraps connections with a random 500ms-1500ms lifetime, forcing connection deaths that stress reconnection, produce retries, and sequence number handling. Enabled via KGO_TEST_CHAOS=1 on the TestGroupETL producer and ETL consumers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
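The circular-buffer core of the ringLogger idea can be sketched as follows. This is a minimal stand-in operating on strings; the real utility wraps the kgo Logger interface, filters by level, and forwards INFO+ in real time:

```go
package main

import "fmt"

// ringLog keeps the last len(buf) entries in a circular buffer; flush
// returns them in insertion order. A hypothetical sketch of the
// ring-buffer mechanism only.
type ringLog struct {
	buf     []string
	next    int
	wrapped bool
}

func newRingLog(n int) *ringLog { return &ringLog{buf: make([]string, n)} }

func (r *ringLog) add(s string) {
	r.buf[r.next] = s
	r.next++
	if r.next == len(r.buf) {
		r.next = 0
		r.wrapped = true
	}
}

// flush returns the buffered entries oldest-first: the tail of the
// buffer (older, already overwritten past) followed by the head.
func (r *ringLog) flush() []string {
	if !r.wrapped {
		return append([]string(nil), r.buf[:r.next]...)
	}
	out := append([]string(nil), r.buf[r.next:]...)
	return append(out, r.buf[:r.next]...)
}

func main() {
	r := newRingLog(3)
	for _, s := range []string{"1", "2", "3", "4", "5"} {
		r.add(s)
	}
	fmt.Println(r.flush()) // [3 4 5]
}
```

The payoff in a test is the same as described above: cheap DEBUG capture on every run, with the full backlog dumped only when a problem is detected.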
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Spin loop testing raised a bunch of small, very, very longstanding bugs within the library, with the most confusing one being 0f4df5f. I think I've encountered this bug maybe ... 3-4 times in the last three years, in all spin loop testing ever?
The bugfixes aren't important to get out immediately - they're hard to encounter and nobody has raised an issue in years, but they are important to note.
In this iteration of spinloop testing, I added a chaos dialer that kills connections every 0.5-1.5s for the entire test run. So, lots of chaos.