File tree Expand file tree Collapse file tree 4 files changed +17
-8
lines changed
Expand file tree Collapse file tree 4 files changed +17
-8
lines changed Original file line number Diff line number Diff line change @@ -276,7 +276,7 @@ func (gs *groups) newGroup(name string) *group {
276276 pending : make (map [string ]* groupMember ),
277277 staticMembers : make (map [string ]string ),
278278 protocols : make (map [string ]int ),
279- reqCh : make (chan * clientReq ),
279+ reqCh : make (chan * clientReq , 16 ),
280280 controlCh : make (chan func (), 1 ), // buffer 1: holds a pending notifyTopicChange
281281 quitCh : make (chan struct {}),
282282 }
Original file line number Diff line number Diff line change 7272
7373## 68_consumer_group_heartbeat (KIP-848)
7474- SubscribedTopicRegex: RE2 syntax, not Java RE2J
75- - Member epoch advancement only when fully converged: kfake's conservative
76- approach is safe and simpler. Advancing in UNRELEASED_PARTITIONS state
77- only affects convergence speed, not correctness.
78- - No ASSIGNING/UNRELEASED_PARTITIONS states: kfake uses empty/reconciling/stable.
79- Missing intermediate states don't affect kgo client behavior.
80- - Metadata hash / metadata expiration: deliberate divergence; kfake recomputes
81- assignments on every relevant heartbeat rather than caching a metadata hash
75+ - Metadata hash: kfake uses notifyTopicChange() from the cluster run loop
76+ instead of hashing metadata. Functionally equivalent for in-process use.
77+ - Group downgrade (consumer -> classic): not supported. Once a group
78+ upgrades to "consumer" type, it stays that way.
8279
8380## General
8481- Tiered storage: Not implemented (KIP-405, KIP-1005)
Original file line number Diff line number Diff line change @@ -860,6 +860,9 @@ func TestGroupSimple(t *testing.T) {
860860 } {
861861 t .Run (tc .name , func (t * testing.T ) {
862862 t .Parallel ()
863+ if tc .enable848 && ! allow848 {
864+ t .Skip ("broker does not support KIP-848 (requires ConsumerGroupHeartbeat, key 68)" )
865+ }
863866
864867 t1 , cleanup := tmpTopicPartitions (t , 1 )
865868 defer cleanup ()
Original file line number Diff line number Diff line change 4444 // Static membership (KIP-345) requires JoinGroup v5+.
4545 allowStaticMembership = false
4646
47+ // KIP-848 requires ConsumerGroupHeartbeat (key 68).
48+ allow848 = false
49+
4750 // KGO_TEST_TLS: DSL syntax is ({ca|cert|key}:path),{1,3}
4851 testCert * tls.Config
4952
@@ -211,6 +214,9 @@ func init() {
211214 if v , ok := versions .LookupMaxKeyVersion (11 ); ok && v >= 5 { // 11 = JoinGroup
212215 allowStaticMembership = true
213216 }
217+ if _ , ok := versions .LookupMaxKeyVersion (68 ); ok { // 68 = ConsumerGroupHeartbeat
218+ allow848 = true
219+ }
214220 break
215221 }
216222 if time .Now ().After (deadline ) {
@@ -564,6 +570,9 @@ func testChainETL(
564570 if instanceID != "" && ! allowStaticMembership {
565571 t .Skip ("broker does not support static membership (requires JoinGroup v5+, KIP-345)" )
566572 }
573+ if enable848 && ! allow848 {
574+ t .Skip ("broker does not support KIP-848 (requires ConsumerGroupHeartbeat, key 68)" )
575+ }
567576 errs := make (chan error )
568577 var (
569578 /////////////
You can’t perform that action at this time.
0 commit comments