Commit bb3b3fb

Merge pull request #1270 from twmb/some-4.2-and-bugs
Some bugs from spinloops while working on 4.2
2 parents 0873149 + e688274 commit bb3b3fb

24 files changed: +1073 -184 lines

DESIGN.md

Lines changed: 0 additions & 2 deletions

@@ -1,5 +1,3 @@
-claude --resume 8707654f-2524-4c51-8f7e-8d161eee8dd0
-
 # franz-go Internals
 
 This document is your guide to working on the `kgo` package. It covers how the

docs/transactions.md

Lines changed: 28 additions & 21 deletions

@@ -140,31 +140,38 @@ Within Kafka when a transaction ends, Kafka propagates a commit marker to all
 partitions that were a part of the transaction. If a rebalance finishes and the
 new consumer fetches offsets _before_ the commit marker is propagated, then the
 new consumer will fetch the previously committed offsets, not the newly
-committed offsets. There is nothing a client can do to reliably prevent this
-scenario. Here, franz-go takes a heuristic approach: the assumption is that
-inter-broker communication is always inevitably faster than broker `<=>` client
-communication. On successful commit, if the client is not speaking to a 2.5+
-cluster (KIP-447 cluster) _or_ the client does not have
-`RequireStableFetchOffsets` enabled, then the client will sleep 200ms before
-releasing the lock that allows a rebalance to continue. The assumption is that
-200ms is enough time for Kafka to propagate transactional markers: the
-propagation should finish before a client is able to do the following: re-join,
-have a new leader assign partitions, sync the assignment, and issue the offset
-fetch request. In effect, the 200ms here is an attempt to provide KIP-447
-semantics (waiting for stable fetch offsets) in place it matters most even
-though the cluster does not support the wait officially. Internally, the sleep
-is concurrent and only blocks a rebalance from beginning, it does not block
-you from starting a new transaction (but, it does prevent you from _ending_
-a new transaction).
+committed offsets. This is especially dangerous with KIP-848, where heartbeat-
+driven reassignment can move partitions in under 200ms -- far faster than the
+classic JoinGroup/SyncGroup round-trips that previously gave markers time to
+materialize.
+
+KIP-447 (Kafka 2.5+) solves this with `RequireStable`: the broker blocks an
+OffsetFetch response until all pending transactional offset commits are
+resolved. franz-go now always sets `RequireStable=true` on group consumer
+OffsetFetch requests, matching the Java client's behavior. Any broker that
+supports KIP-848 necessarily supports KIP-447, so the fast reassignment path
+is always protected.
+
+For older brokers that do not support KIP-447, franz-go takes a heuristic
+approach: the assumption is that inter-broker communication is always
+inevitably faster than broker `<=>` client communication. On successful commit,
+the client sleeps 500ms before releasing the lock that allows a rebalance to
+continue. The assumption is that 500ms is enough time for Kafka to propagate
+transactional markers: the propagation should finish before a client is able to
+do the following: re-join, have a new leader assign partitions, sync the
+assignment, and issue the offset fetch request. Internally, the sleep is
+concurrent and only blocks a rebalance from beginning, it does not block you
+from starting a new transaction (but, it does prevent you from _ending_ a new
+transaction).
 
 One last flaw of the above approach is that a lot of it is dependent on timing.
 If the servers you are running on do not have reliable clocks and may be very
 out of sync, then the timing aspects above may not work. However, it is likely
 your cluster will have other issues if some broker clocks are very off. It is
 recommended to have alerts on ntp clock drift.
 
-Thus, although we do support 2.5+ behavior, the client itself works around
-duplicates in a pre-2.5 world with a lot of edge case handling. It is
-_strongly_ recommended to use a 2.5+ cluster and to always enable
-`RequireStableFetchOffsets`. The option itself has more documentation on
-what other settings may need to be tweaked.
+Thus, the client works around duplicates in a pre-2.5 world with a lot of edge
+case handling, but it is _strongly_ recommended to use a 2.5+ cluster where
+`RequireStable` provides a proper server-side guarantee. The
+`RequireStableFetchOffsets` option is deprecated and is now a no-op; stable
+fetch offsets are always enabled.

generate/definitions/enums

Lines changed: 3 additions & 1 deletion

@@ -146,9 +146,11 @@ QuotasMatchType int8 (
 // MISC //
 //////////
 
-ControlRecordKeyType int8 (
+ControlRecordKeyType int16 (
   0: ABORT
   1: COMMIT
+  // QUORUM_REASSIGNMENT was renamed to LEADER_CHANGE in Kafka 3.0
+  // (KAFKA-12952 commit d3ec9f940c), but the meaning is the same.
   2: QUORUM_REASSIGNMENT
   3: SNAPSHOT_HEADER
   4: SNAPSHOT_FOOTER

generate/definitions/misc

Lines changed: 25 additions & 3 deletions

@@ -321,7 +321,12 @@ TxnMetadataKey => not top level, with version field
 
 // TxnMetadataValue is the value for the Kafka internal __transaction_state
 // topic if the key is of TxnMetadataKey type.
-TxnMetadataValue => not top level, with version field
+//
+// Version 0 was introduced in 0.11.0.
+//
+// KAFKA-14869 commit 4b6dcf19dc, proposed in KIP-915 and included in 3.5,
+// released version 1.
+TxnMetadataValue => not top level, with version field, flexible v1+
   // Version is the version of this value.
   Version: int16
   // ProducerID is the ID in use by the transactional ID.

@@ -335,7 +340,7 @@ TxnMetadataValue => not top level, with version field
   // CompleteCommit, 5 is CompleteAbort, 6 is Dead, and 7 is PrepareEpochFence.
   State: enum-TransactionState
   // Topics are topics that are involved in this transaction.
-  Topics: [=>]
+  Topics: nullable[=>]
     // Topic is a topic involved in this transaction.
     Topic: string
     // Partitions are partitions in this topic involved in the transaction.

@@ -345,6 +350,20 @@ TxnMetadataValue => not top level, with version field
   LastUpdateTimestamp: int64
   // StartTimestamp is the timestamp in millis of when this transaction started.
   StartTimestamp: int64
+  // PreviousProducerID is the producer ID used by the last committed
+  // transaction. KAFKA-14562 commit ede0c94aaa, proposed in KIP-890 and
+  // included in 4.0.
+  PreviousProducerID: int64(-1) // tag 0
+  // NextProducerID is the latest producer ID sent to the producer for the
+  // given transactional ID. KAFKA-14562 commit ede0c94aaa, proposed in
+  // KIP-890 and included in 4.0.
+  NextProducerID: int64(-1) // tag 1
+  // ClientTransactionVersion is the transaction version used by the client.
+  // KAFKA-14562 commit ede0c94aaa, proposed in KIP-890 and included in 4.0.
+  ClientTransactionVersion: int16 // tag 2
+  // NextProducerEpoch is the producer epoch associated with NextProducerID.
+  // KAFKA-15370 commit 247c0f0ba5, proposed in KIP-939 and included in 4.1.0.
+  NextProducerEpoch: int16(-1) // tag 3
 
 // StickyMemberMetadata is is what is encoded in UserData for
 // ConsumerMemberMetadata in group join requests with the sticky partitioning

@@ -461,8 +480,11 @@ EndTxnMarker => not top level, with version field
 
 LeaderChangeMessageVoter => not top level, no encoding, flexible v0+
   VoterID: int32
+  // VoterDirectoryID is the directory ID of the voter.
+  // KAFKA-16915 commit da8fe6355b, proposed in KIP-853 and included in 3.9.
+  VoterDirectoryID: uuid // v1+
 
-// LeaderChangeMessage is the value for a control record when the key is type 3.
+// LeaderChangeMessage is the value for a control record when the key is type 2.
 LeaderChangeMessage => not top level, with version field, flexible v0+
   Version: int16
   // The ID of the newly elected leader.

generate/validate_test.go

Lines changed: 157 additions & 0 deletions

@@ -611,6 +611,163 @@ func innerStructFields(typ Type) []StructField {
 	return nil
 }
 
+// miscEffectiveMaxVersion computes the effective max version for a non-top-level
+// DSL struct, since these don't have MaxVersion set. The effective max is the
+// highest of the flexible version and all field MinVersion values, recursing
+// into sub-structs.
+func miscEffectiveMaxVersion(s *Struct) int {
+	max := 0
+	if s.FromFlexible && s.FlexibleAt > max {
+		max = s.FlexibleAt
+	}
+	miscEffectiveMaxVersionFields(s.Fields, &max)
+	return max
+}
+
+func miscEffectiveMaxVersionFields(fields []StructField, max *int) {
+	for _, f := range fields {
+		if f.Tag >= 0 && f.MinVersion == -1 {
+			continue // tag-only fields don't define a version
+		}
+		if f.MinVersion > *max {
+			*max = f.MinVersion
+		}
+		if inner := innerStructFields(f.Type); inner != nil {
+			miscEffectiveMaxVersionFields(inner, max)
+		}
+	}
+}
+
+func TestValidateMiscDSLAgainstKafkaJSON(t *testing.T) {
+	kafkaDir := os.Getenv("KAFKA_DIR")
+	if kafkaDir == "" {
+		t.Skip("KAFKA_DIR not set; skipping Kafka JSON validation")
+	}
+
+	initDSL(t)
+
+	// Build a map from name → DSL struct for non-top-level types.
+	dslByName := make(map[string]*Struct)
+	for i := range newStructs {
+		s := &newStructs[i]
+		if !s.TopLevel {
+			dslByName[s.Name] = s
+		}
+	}
+
+	// Misc type mappings: Kafka JSON path (relative to KAFKA_DIR) → DSL name.
+	type miscMapping struct {
+		jsonPath string
+		dslName  string
+	}
+	mappings := []miscMapping{
+		{"group-coordinator/src/main/resources/common/message/OffsetCommitKey.json", "OffsetCommitKey"},
+		{"group-coordinator/src/main/resources/common/message/OffsetCommitValue.json", "OffsetCommitValue"},
+		{"group-coordinator/src/main/resources/common/message/GroupMetadataKey.json", "GroupMetadataKey"},
+		{"group-coordinator/src/main/resources/common/message/GroupMetadataValue.json", "GroupMetadataValue"},
+		{"transaction-coordinator/src/main/resources/common/message/TransactionLogKey.json", "TxnMetadataKey"},
+		{"transaction-coordinator/src/main/resources/common/message/TransactionLogValue.json", "TxnMetadataValue"},
+		{"clients/src/main/resources/common/message/DefaultPrincipalData.json", "DefaultPrincipalData"},
+		{"clients/src/main/resources/common/message/EndTxnMarker.json", "EndTxnMarker"},
+		{"clients/src/main/resources/common/message/LeaderChangeMessage.json", "LeaderChangeMessage"},
+	}
+
+	for _, m := range mappings {
+		jsonPath := filepath.Join(kafkaDir, m.jsonPath)
+		data, err := os.ReadFile(jsonPath) //nolint:gosec // path is constructed from test constant + env var, not user input
+		if err != nil {
+			t.Errorf("reading %s: %v", m.jsonPath, err)
+			continue
+		}
+		cleaned := stripJSONComments(data)
+		var msg kafkaMessage
+		if err := json.Unmarshal(cleaned, &msg); err != nil {
+			t.Errorf("parsing %s: %v", m.jsonPath, err)
+			continue
+		}
+
+		dsl, ok := dslByName[m.dslName]
+		if !ok {
+			t.Errorf("%s: DSL struct %q not found", m.jsonPath, m.dslName)
+			continue
+		}
+
+		t.Run(m.dslName, func(t *testing.T) {
+			validateMiscMessage(t, msg, dsl)
+		})
+	}
+}
+
+func validateMiscMessage(t *testing.T, msg kafkaMessage, dsl *Struct) {
+	t.Helper()
+
+	validVR := parseVersionRange(msg.ValidVersions)
+	flexVR := parseVersionRange(msg.FlexibleVersions)
+
+	jsonMax := validVR.maxVer()
+	dslMax := miscEffectiveMaxVersion(dsl)
+
+	if jsonMax >= 0 && dslMax > jsonMax {
+		t.Errorf("max version: DSL %d > JSON %d", dslMax, jsonMax)
+	} else if jsonMax >= 0 && dslMax < jsonMax {
+		fields := collectMissingFields(msg.Name, dslMax+1, jsonMax, msg.Fields)
+		detail := fmt.Sprintf("DSL v%d < JSON v%d", dslMax, jsonMax)
+		if len(fields) > 0 {
+			detail += ", new fields: " + strings.Join(fields, ", ")
+		}
+		t.Errorf("max version: %s", detail)
+	}
+
+	// Validate flexible version.
+	if flexVR.none {
+		if dsl.FromFlexible {
+			t.Errorf("flexible version: DSL has flexible at %d but JSON has none", dsl.FlexibleAt)
+		}
+	} else {
+		if !dsl.FromFlexible {
+			t.Errorf("flexible version: JSON has flexible at %d but DSL has none", flexVR.min)
+		} else if dsl.FlexibleAt != flexVR.min {
+			t.Errorf("flexible version: DSL %d != JSON %d", dsl.FlexibleAt, flexVR.min)
+		}
+	}
+
+	// Build commonStructs lookup.
+	commons := make(map[string]kafkaStruct)
+	for _, cs := range msg.CommonStructs {
+		commons[cs.Name] = cs
+	}
+
+	// The DSL's "with version field" adds Version as the first field,
+	// but coordinator JSON schemas don't include it. Skip it when the
+	// JSON has no corresponding Version field.
+	dslFields := dsl.Fields
+	if dsl.WithVersionField && len(dslFields) > 0 {
+		jsonHasVersion := false
+		for _, jf := range msg.Fields {
+			if strings.EqualFold(jf.Name, "Version") {
+				jsonHasVersion = true
+				break
+			}
+		}
+		if !jsonHasVersion && strings.EqualFold(dslFields[0].FieldName, "Version") {
+			dslFields = dslFields[1:]
+		}
+	}
+
+	flexibleAt := -1
+	if dsl.FromFlexible {
+		flexibleAt = dsl.FlexibleAt
+	}
+
+	maxV := dslMax
+	if jsonMax >= 0 && jsonMax < maxV {
+		maxV = jsonMax
+	}
+	for v := validVR.min; v <= maxV; v++ {
+		compareFieldsAtVersion(t, msg.Name, v, flexibleAt, msg.Fields, dslFields, commons)
+	}
+}
+
 // collectMissingFields returns a summary of JSON fields new in versions fromV..toV.
 func collectMissingFields(path string, fromV, toV int, jsonFields []kafkaField) []string {
 	var out []string
var out []string

pkg/kadm/groups.go

Lines changed: 27 additions & 0 deletions

@@ -11,6 +11,18 @@ import (
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
+var requireStable = func() *string { s := "require_stable"; return &s }()
+
+// RequireStable returns a context that causes [FetchOffsets],
+// [FetchOffsetsForTopics], and [FetchManyOffsets] to
+// set RequireStable on the underlying OffsetFetch request. When enabled,
+// the broker blocks until any pending transactional offset commits are
+// resolved (KIP-447, Kafka 2.5+). On older brokers, this field is
+// silently ignored.
+func RequireStable(ctx context.Context) context.Context {
+	return context.WithValue(ctx, requireStable, requireStable)
+}
+
 // GroupMemberMetadata is the metadata that a client sent in a JoinGroup request.
 // This can have one of three types:
 //

@@ -885,10 +897,16 @@ func (cl *Client) CommitAllOffsets(ctx context.Context, group string, os Offsets
 // fetch, this only returns an auth error if you are not authorized to describe
 // the group at all.
 //
+// Use [RequireStable] to block until pending transactional offset commits are
+// resolved.
+//
 // This method requires talking to Kafka v0.11+.
 func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetResponses, error) {
 	req := kmsg.NewPtrOffsetFetchRequest()
 	req.Group = group
+	if ctx.Value(requireStable) != nil {
+		req.RequireStable = true
+	}
 	resp, err := req.RequestWith(ctx, cl.cl)
 	if err != nil {
 		return nil, err

@@ -946,6 +964,9 @@ const FetchAllGroupTopics = "|fetch-all-group-topics|"
 // By default, this function returns offsets for only the requested topics. You
 // can use the special "topic" [FetchAllGroupTopics] to return all committed-to
 // topics in addition to all requested topics.
+//
+// Use [RequireStable] to block until pending transactional offset commits are
+// resolved.
 func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error) {
 	os := make(Offsets)
 

@@ -1097,13 +1118,19 @@ func (rs FetchOffsetsResponses) Error() error {
 // CommitOffsets are important to provide as simple APIs for users that manage
 // group offsets outside of a consumer group. Each individual group may have an
 // auth error.
+//
+// Use [RequireStable] to block until pending transactional offset commits are
+// resolved.
 func (cl *Client) FetchManyOffsets(ctx context.Context, groups ...string) FetchOffsetsResponses {
 	fetched := make(FetchOffsetsResponses)
 	if len(groups) == 0 {
 		return fetched
 	}
 
 	req := kmsg.NewPtrOffsetFetchRequest()
+	if ctx.Value(requireStable) != nil {
+		req.RequireStable = true
+	}
 	for _, group := range groups {
 		rg := kmsg.NewOffsetFetchRequestGroup()
 		rg.Group = group

pkg/kfake/data.go

Lines changed: 11 additions & 0 deletions

@@ -256,6 +256,17 @@ func (pd *partData) trimLeft() {
 		}
 		pd.batches = pd.batches[1:]
 		pd.nbytes -= int64(b0.nbytes)
+		pd.maxTimestampBatchIdx--
+	}
+	// If the max-timestamp batch was trimmed (index went negative) or
+	// all batches were removed, rebuild the index from remaining batches.
+	if pd.maxTimestampBatchIdx < 0 {
+		pd.maxTimestampBatchIdx = -1
+		for i, b := range pd.batches {
+			if pd.maxTimestampBatchIdx < 0 || b.MaxTimestamp >= pd.batches[pd.maxTimestampBatchIdx].MaxTimestamp {
+				pd.maxTimestampBatchIdx = i
+			}
+		}
 	}
 	// Prune aborted txn entries fully before logStartOffset.
 	// Entries are sorted by lastOffset, so binary search for the
