Conversation
The ConsumerGroupDescribe handler already implements IncludeAuthorizedOperations via groupAuthorizedOps - the skipped_features entry was stale. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add APPEND and SUBTRACT operations for list-type configs (cleanup.policy, sasl.enabled.mechanisms, super.users). kadm exposes AppendConfig/SubtractConfig which use these ops. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Report transaction.version and group.version in both SupportedFeatures and FinalizedFeatures, gated on whether the relevant API keys are available via maxVersions config. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
kfake has one replica per partition so election always succeeds. Validates topic/partition existence and cluster ALTER ACL. Enables kadm.ElectLeaders() to be tested against kfake. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implement log compaction that deduplicates records by key. The last batch (active segment) is never compacted. Null-keyed records are dropped, superseded records are removed, and tombstones older than delete.retention.ms are cleaned up. Orphaned transaction control batches (where no data remains for the PID) are also removed. A background ticker driven by log.cleaner.backoff.ms runs compaction automatically. The public Compact() method allows manual triggering. Fetch gaps from compacted offsets are handled by skipping to the next available batch in searchOffset. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Support instance IDs (KIP-345) for both classic and KIP-848 consumer groups. Classic groups accept instanceID in Join/Sync/Heartbeat/Leave/ OffsetCommit, with replaceStaticMember handling rejoin without rebalance when protocol is unchanged. 848 groups support epoch -2 (static leave), fence by instanceID on join, and preserve static mapping across session timeouts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add range/static, cooperative-sticky/static, and sticky/848/static entries to the GroupETL and TxnEtl test matrices. Each goroutine gets a unique instanceID and sends a raw LeaveGroup with instanceID on cleanup. Gate concurrent test cases to 4 via a semaphore. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add applyRetention to trim expired or oversized batches per partition. The background cleaner ticker now also fires for topics with retention configs, not just compaction. Add ApplyRetention() for manual triggering. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reuse the 4-byte read buffer on brokerCxn instead of allocating per read. Reuse the linger timer via Reset and store the method value closure once to eliminate 2 allocs per linger cycle. Skip header slice allocation when records have no headers. Replace deprecated reflect.SliceHeader with unsafe.Slice in StringRecord/KeyStringRecord. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- KIP-814: static member leader rejoin skips rebalance, sets SkipAssignment - group.max.size: reject new members with GROUP_MAX_SIZE_REACHED - CreateTopics: accept ReplicaAssignment with Kafka-compatible validation - transactional.id.expiration.ms: expire idle transactional IDs via pids.byTxid - OffsetFetch: validate MemberID/MemberEpoch for 848 groups (v9+) - TxnOffsetCommit: validate instanceID against group staticMembers - 848 range assignor: sort static members (by instanceID) before dynamic - group.min/max.session.timeout.ms: expose as broker configs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add kafka_tests for static membership, group max size, and replica assignment. Remove Test848ThreeConsumersFairDistribution and Test848AllMembersLeave (covered by kafka_tests). Reduce sleeps in static membership and txn tests. Fix TestRetentionTime flake under -race by increasing retention.ms from 1 to 100. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- helpers_test.go: randsha() dedup check used == instead of <=, allowing duplicates when 3+ calls happen at the same clock tick - data.go: topic UUID generation used sha256(time.Now()), replaced with crypto/rand-based randUUID() - txns.go: pids.create() looked up by FNV-64 hash of txnID which could collide between different txnIDs; now looks up byTxid map first and falls back to randomID() on hash collision. Also simplified findTxnID() to use byTxid map directly. - groups.go: handleDeleteGroups called delete(gs.gs) inside a waitControl callback (manage goroutine). If the callback called quitOnce(), waitControl could return early, and the delete would race with gs.gs iteration in Cluster.run() (notifyTopicChange). Move the delete to Cluster.run() after waitControl returns. - txns.go: doTxnOffsetCommit ignored waitControl return value, allowing a data race on errCode if the group dies mid-callback. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add test coverage for three previously untested handlers: - TestElectLeaders: verifies preferred leader election for specific partitions and all-partitions mode - TestIncrementalAlterConfigAppendSubtract: verifies APPEND and SUBTRACT operations on list-type broker configs - TestApiVersionsSupportedFeatures: verifies that the SupportedFeatures field is populated in ApiVersions responses Also add AuthorizedOperations assertion to Test848DescribeGroup to verify KIP-430 compliance. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The previous round-robin assignUniform redistributed all partitions on every rebalance. With KIP-848 cooperative rebalancing, each moved partition requires a revoke-confirm cycle, so unnecessary movement causes cascading delays as members wait for revocations to complete. Rewrite assignUniform as a sticky algorithm: 1. Retain each member's existing target partitions that are still valid (member is subscribed, partition exists, within quota) 2. Reclaim excess partitions from over-quota members 3. Distribute free partitions to under-quota members, preferring members that previously owned them This matches the behavior of Kafka's server-side uniform assignor and eliminates partition thrashing during incremental membership changes. Add TestAssignUniformStickyOnLeave and TestAssignUniformStickyRapidJoinsThenLeave to verify that existing assignments are preserved across member joins and leaves. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
With KIP-890 (EndTxn v5+), the producer epoch is bumped on every EndTxn. A race exists between producerID() and createReq() in the sink's produce path: 1. Sink A calls producerID(), gets epoch N 2. Meanwhile, the last produce response completes on another sink, bufferedRecords hits 0, Flush returns 3. EndTxn bumps the epoch to N+1 4. User produces new records for the next transaction 5. Sink A resumes, calls createReq(epoch=N), picks up the new records stamped with the stale epoch The broker rejects with INVALID_PRODUCER_EPOCH, permanently failing the records. Fix by checking the producer epoch after createReq. If it changed, undo the drain (resetBatchDrainIdx + decInflight) and retry. Use a raw atomic load to avoid side effects from producerID(). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The OffsetFetch response has both a group-level ErrorCode and per-partition error codes. fetchOffsets only checked per-partition errors, silently ignoring the group-level error. For KIP-848 consumer groups, the server validates MemberEpoch on OffsetFetch requests. When the epoch changes between the heartbeat that assigned partitions and the subsequent OffsetFetch (due to a concurrent rebalance), the server returns STALE_MEMBER_EPOCH at the group level with no topics. The client saw an empty response and assigned no partitions, leaving those partitions permanently stuck. Fix by checking resp.ErrorCode after the request succeeds. On STALE_MEMBER_EPOCH, force a heartbeat to update the epoch and retry, capping at 5 attempts. All other group-level errors are returned as fatal. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the init() that globally lowered the 848 heartbeat interval to 100ms in test binaries. This affected all kfake clusters, including the standalone server when run via go test, and made it impossible to test with the real 5s default. Instead, set group.consumer.heartbeat.interval.ms=100 explicitly in the newCluster test helpers. Also remove the redundant -c flag from run_tests.sh since the standalone server should use the real default. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ponse condition Advance the 848 member epoch when the member's own revocations are done (UNRELEASED_PARTITIONS), rather than waiting for full convergence. This matches real Kafka behavior and avoids serializing all revocation round-trips through 5s heartbeat intervals. The response-sending condition is simplified to match Kafka's hasAssignedPartitionsChanged: full request OR reconciled assignment changed. The old m.memberEpoch < g.generation catch-all is removed. Fix subscription comparison to avoid spurious generation bumps: sort SubscribedTopicNames before comparing (order from map iteration is random), and compare SubscribedTopicRegex by source string rather than unconditionally recomputing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sort SubscribedTopicNames and SubscribedTopicRegex patterns before sending in heartbeat requests. Go map iteration is unordered, so each heartbeat could send topics in a different order. If the server compares subscriptions with order sensitivity (the KIP does not require any particular order), this triggers unnecessary generation bumps. Track unresolved topic IDs from heartbeat assignment responses. When the server assigns a topic ID the client hasn't seen in metadata yet (e.g. newly created topic matching a regex), store it and include it in subsequent heartbeat Topics so the server sees acknowledgment. Resolve on subsequent heartbeats after metadata refreshes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Gate reconciledAssignment behind a revocation state check: when a member has pending revocations and the client still reports owning them (or sent a keepalive), skip recomputation entirely and return the member unchanged. This matches Kafka's CurrentAssignmentBuilder where UNREVOKED_PARTITIONS state freezes the member until the client confirms all revocations. Previously, reconciledAssignment ran unconditionally every heartbeat. When the target changed during in-flight revocations (e.g. range balancer reshuffling), the intersection in step 1 would shrink further, cascading into sent=0 with unrecoverable pending. Also removes the bandaid freeze that was inside reconciledAssignment (returning lastReconciledSent unchanged when pending > 0) - the heartbeat-level gate replaces it. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When the server sends an explicit empty assignment (resp.Assignment non-nil with zero topics, meaning "revoke everything"), handleResp returned nil (no change detected) because len(newAssigned) == 0. The client never revoked, kept sending keepalives, and the server's pending revocations were never cleared - deadlocking reconciliation. Now the early return only fires for keepalive responses (no assignment at all) or when all assigned topics are still unresolved (waiting for metadata). An explicit empty assignment falls through to the comparison against the client's current owned set, correctly detecting the change. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When an OffsetCommit and heartbeat are pipelined on the same broker connection, the server may process the heartbeat first (bumping the member epoch) before seeing the OffsetCommit (which still carries the old epoch). The old retry code called g.g848.mkreq() from the commit goroutine to force a heartbeat, which caused a data race on unresolvedAssigned. The new code simply reloads the latest epoch from memberGen (atomic) - by the time the client sees the STALE error, the pipelined heartbeat response has already stored the updated epoch. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
PREVIOUSLY: When the 848 heartbeat closure detects an assignment change, it returns RebalanceInProgress but the heartbeat loop continues sending heartbeats while waiting for s.revoke to complete. If the timer fires before the revoke finishes, a second heartbeat goes out. The server may have already revoked the just-assigned partition (e.g. a member left and the range assignor shifted), but the second heartbeat's handleResp compares against stale nowAssigned (not yet persisted) and misses the revocation. The client then enters the next iteration believing it owns the revoked partition, permanently blocking the group from converging. NOW: Fix this by introducing errReassigned848: when the 848 closure detects a change, it returns this new internal error instead of RebalanceInProgress. The heartbeat loop recognizes it, suppresses further heartbeat calls (while still waiting for s.revoke), and converts it to RebalanceInProgress for the existing revoke/return logic. EXTRA: Also removes redundant balancer subtests (reduces test suite from 20 things doing the same-ish thing to 12), fixes leaveGroupStatic for 848 groups, and fixes the TestIssue865 flush condition.
… static membership Per-partition assignment epochs (KIP-1251): replace the single assignmentEpoch field with per-partition tracking. Retained partitions keep their existing epoch across reconciliation; newly assigned partitions get the member's current epoch. OffsetCommit validates the request epoch per-partition, rejecting STALE_MEMBER_EPOCH only for partitions assigned at a newer epoch. Static membership (epoch -2): rework consumerStaticLeave to keep the member in consumerMembers at epoch -2 with re-stamped partition epochs, matching Kafka's consumerGroupStaticMemberGroupLeave. This preserves partition reservations so other members cannot claim them during the static member's absence. On rejoin, the new member inherits lastReconciledSent, targetAssignment, and partAssignmentEpochs from the departed member before fencing it. Also adds consumerRejoin for active non-static members rejoining with epoch 0 (updates subscription in place), fixes heartbeat/join/sync dispatch to retry on dead group goroutines, tightens OffsetFetch validation for admin vs member fetches, and restructures the reconciliation to separate assignment sending from state updates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When a client disconnects, read() closes the connection and returns, but write() blocks forever on respCh / c.die with no way to detect the disconnect. Over time this leaks write goroutines (observed 505 leaked goroutines via pprof). Add a done channel to clientConn that read() closes on exit. Select on done in write() and all respCh send sites (Cluster.run and group.reply) so they unblock when the reader exits. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add group name to "storing member and epoch" / "storing epoch" debug logs so they can be correlated across concurrent groups. Remove the verbose unresolved topic ID INFO log - the metadata trigger reason already explains why the update was requested. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Logging: - Remove high-volume debug logs: handleJoin/Sync/Leave, rebalance, completeRebalance, completeLeaderSync, OffsetFetch/OffsetCommit per-partition, waitControl slow timer, fenceConsumer, computeTarget, client disconnect - Keep structured logs: consumerJoin/StaticLeave (INF), consumerHeartbeatLate (WRN), RECONCILE SEND/SUMMARY/REVOKE-FIRST (DBG), pendingRevoke cleared (DBG), OffsetCommit STALE (WRN) - Add EndTxn slow (WRN), txn long-running (WRN), txn timeout abort (WRN) to txns.go Standalone server: - Add -pprof flag (default :6060) for heap/goroutine profiling Test runner (run_tests.sh): - Leave server running on ctrl+c (use --clean to kill) - Add --pprof flag passthrough - Increase race timeout to 300s - Pass heartbeat interval 1s via -c flag - Show per-run timing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Since CI runs against 1.1 and prior
One more pass! LLM analyzed differences between kfake and kafka, and then we take those differences and repeatedly smash kfake with a hammer until the behavior is identical. Next pass is going to be simplifications, if possible. It's necessary to match behavior exactly because subtle changes here have caused bug after bug after bug to be papered over - moving kfake to be more like kafka actually raised bugs in kgo itself.
One test hung unexplainably.
Extract repeated patterns into helpers (logName, evictConsumerMember, updateMemberSubscriptions, countPartitionEpochs), reuse existing helpers (copyAssignment, topicsToAssignment, removeMember), replace custom code with stdlib (maps.EqualFunc, maps.Copy, slices.Collect, slices.Contains), and remove dead code. Net reduction of ~106 lines with no behavior change. Fix Test848CooperativeRevocationDuringConsumption to use a per-poll timeout so one slow client can't block polling the others for the entire deadline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This adds support to kfake for a few more missing features - the biggest being static group membership, but then some other "minor" ones: log compaction (new
Compact()API), retention (retention.ms,retention.bytes, andApplyRetention()), ElectLeaders (change leaders for partitions), create topics ReplicaAssignment, group max size, append/subtract for incremental configs, some bug fixes and improvements.Through these additions and with a lot of looping kgo tests against kfake, we actually found multiple kgo bugs - not really relevant to release NOW but are important for the next release.
The first, a TOCTOU race that is hard to encounter : requires KIP-890 and an extremely fast request/response from the broker. See the commit.
The others, a whole lot of bugs in the 848 implementation See the commits with a "kgo:" prefix to understand them - they're very interesting IMO.
There are a few kgo perf improvements in here, tiny ones but eh why not - helps.
This also now runs - in addition to the existing kgo tests - the kgo tests with static membership.
More kfake tests added (generated).