
Commit ffcae12

Merge pull request #1260 from twmb/kfake

kfake: further improvements

2 parents: ae75cac + 9a95b70

8 files changed: +1279 −164 lines

pkg/kfake/00_produce.go (13 additions, 5 deletions)

@@ -156,6 +156,10 @@ func (c *Cluster) handleProduce(creq *clientReq) (kmsg.Response, error) {
 			continue
 		}
 		attrs := uint16(b.Attributes)
+		if attrs&0x0020 != 0 {
+			donep(rt, rp, kerr.InvalidRecord.Code, "Client cannot send control batches.")
+			continue
+		}
 		if attrs&0x0007 > 4 {
 			donep(rt, rp, kerr.CorruptMessage.Code, "Invalid compression type.")
 			continue
@@ -192,6 +196,12 @@ func (c *Cluster) handleProduce(creq *clientReq) (kmsg.Response, error) {
 		}
 		pidinf, window := c.pids.get(b.ProducerID, rt.Topic, rp.Partition, implicit)

+		// Reject non-transactional produce during an
+		// active transaction.
+		if pidinf != nil && pidinf.inTx && !txnal {
+			errCode = kerr.InvalidTxnState.Code
+		}
+
 		if txnal && window == nil {
 			errCode = kerr.InvalidTxnState.Code
 		}
@@ -217,20 +227,18 @@ func (c *Cluster) handleProduce(creq *clientReq) (kmsg.Response, error) {
 			baseOffset = pd.highWatermark
 			lso = pd.logStartOffset

-			// For transactional batches, determine txnFirstOffset
-			var txnFirstOffset int64
 			if txnal {
+				// Track per-partition first offset for AbortedTransactions index.
 				ptr := pidinf.txPartFirstOffsets.mkp(rt.Topic, rp.Partition, func() *int64 {
 					v := int64(-1)
 					return &v
 				})
 				if *ptr == -1 {
 					*ptr = baseOffset
 				}
-				txnFirstOffset = *ptr
 			}

-			batchptr := pd.pushBatch(len(rp.Records), b, txnal, txnFirstOffset)
+			batchptr := pd.pushBatch(len(rp.Records), b, txnal)
 			if txnal {
 				pidinf.txBatches = append(pidinf.txBatches, batchptr)
 				// Track bytes for readCommitted watcher accounting at commit time
@@ -242,7 +250,7 @@ func (c *Cluster) handleProduce(creq *clientReq) (kmsg.Response, error) {
 			// Non-idempotent produce, no pids validation needed
 			baseOffset = pd.highWatermark
 			lso = pd.logStartOffset
-			pd.pushBatch(len(rp.Records), b, txnal, 0)
+			pd.pushBatch(len(rp.Records), b, txnal)
 		}

 		if errCode != 0 {
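For context on the two bitmask checks above: in the Kafka RecordBatch format, the attributes field packs the compression codec into bits 0-2, a transactional flag into bit 4, and a control-batch flag into bit 5; control batches may only be written by brokers, which is why the fake rejects them from clients. A minimal standalone sketch of that layout (constant names here are illustrative, not from this repo):

package main

import "fmt"

// Kafka RecordBatch attributes bit layout (per the record batch spec):
//   bits 0-2: compression codec (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd)
//   bit 3:    timestamp type (0 CreateTime, 1 LogAppendTime)
//   bit 4:    batch is transactional
//   bit 5:    batch is a control batch (broker-written markers only)
const (
	compressionMask  = 0x0007 // attrs&0x0007 > 4 rejects unknown codecs
	transactionalBit = 0x0010
	controlBit       = 0x0020 // attrs&0x0020 != 0 rejects client control batches
)

func main() {
	attrs := uint16(0x0030) // transactional control batch, no compression
	fmt.Println("codec:", attrs&compressionMask)               // 0
	fmt.Println("transactional:", attrs&transactionalBit != 0) // true
	fmt.Println("control:", attrs&controlBit != 0)             // true
}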

pkg/kfake/01_fetch.go (114 additions, 36 deletions)

@@ -30,7 +30,7 @@ import (
 // Fetch sessions (KIP-227):
 // * Sessions allow incremental fetches where clients only send changed partitions
 // * We track session state per broker and merge with request to get full partition list
-// * We always return full results (not incremental diffs) which is compliant behavior
+// * Incremental responses omit unchanged partitions (no records, same HWM/logStartOffset, no error)
 //
 // Version notes:
 // * v4: RecordBatch format, IsolationLevel, LastStableOffset, AbortedTransactions
@@ -60,7 +60,7 @@ func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, er
 	var newSession bool
 	if req.Version >= 7 {
 		var errCode int16
-		session, newSession, errCode = c.fetchSessions.getOrCreate(creq.cc.b.node, req.SessionID, req.SessionEpoch)
+		session, newSession, errCode = c.fetchSessions.getOrCreate(creq.cc.b.node, req.SessionID, req.SessionEpoch, int(c.fetchSessionCacheSlots()))
 		if errCode != 0 {
 			resp.ErrorCode = errCode
 			return resp, nil
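As background for getOrCreate's contract: KIP-227 negotiates sessions through (SessionID, SessionEpoch) pairs. Epoch -1 means a sessionless full fetch, epoch 0 asks the broker to create a new session, and each incremental fetch must present the broker's current epoch. A rough sketch of that dispatch with made-up names (the real logic, including unknown-session handling, lives in fetchSessions.getOrCreate):

package main

import "fmt"

// Hypothetical illustration of KIP-227 (SessionID, SessionEpoch) handling.
func classifyFetch(sessionID, sessionEpoch, serverEpoch int32) string {
	switch {
	case sessionEpoch == -1:
		return "sessionless full fetch; no session state kept"
	case sessionEpoch == 0:
		return "create a new session and return a fresh SessionID"
	case sessionEpoch != serverEpoch:
		return "reject: INVALID_FETCH_SESSION_EPOCH"
	default:
		return fmt.Sprintf("incremental fetch against session %d", sessionID)
	}
}

func main() {
	fmt.Println(classifyFetch(0, -1, 0)) // legacy client
	fmt.Println(classifyFetch(0, 0, 0))  // new session
	fmt.Println(classifyFetch(7, 3, 3))  // valid incremental fetch
	fmt.Println(classifyFetch(7, 2, 3))  // stale epoch, rejected
}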
@@ -303,11 +303,7 @@ full:
 			continue
 		}

-		// Track aborted transactions per producer. The kgo client expects
-		// AbortedTransactions to be sorted by FirstOffset per producer.
-		var abortedByProducer map[int64][]int64
-
-		var pbytes int
+		var pbytes, pBatchesAdded int
 		for _, b := range pd.batches[i:] {
 			if readCommitted && b.inTx {
 				break
@@ -319,42 +315,37 @@ full:
 				break
 			}
 			batchesAdded++
-
-			// Track aborted transactions in returned batches
-			if readCommitted && req.Version >= 4 && b.aborted {
-				if abortedByProducer == nil {
-					abortedByProducer = make(map[int64][]int64)
-				}
-				// Only add if we haven't seen this txnFirstOffset for this producer
-				offsets := abortedByProducer[b.ProducerID]
-				found := false
-				for _, o := range offsets {
-					if o == b.txnFirstOffset {
-						found = true
-						break
-					}
-				}
-				if !found {
-					abortedByProducer[b.ProducerID] = append(offsets, b.txnFirstOffset)
-				}
-			}
-
+			pBatchesAdded++
 			sp.RecordBatches = b.AppendTo(sp.RecordBatches)
 		}

-		// Add aborted transactions for read_committed consumers
-		// Sorted by FirstOffset per producer (as expected by kgo client)
-		for producerID, offsets := range abortedByProducer {
-			sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] })
-			for _, firstOffset := range offsets {
+		// For read_committed, look up aborted transactions that overlap
+		// the returned data range using the partition's aborted txn index.
+		if readCommitted && req.Version >= 4 && pBatchesAdded > 0 {
+			lastBatch := pd.batches[i+pBatchesAdded-1]
+			upperBound := lastBatch.FirstOffset + int64(lastBatch.LastOffsetDelta) + 1
+			// Binary search for the first entry whose abort marker
+			// is at or after the fetch offset.
+			j := sort.Search(len(pd.abortedTxns), func(k int) bool {
+				return pd.abortedTxns[k].lastOffset >= fp.fetchOffset
+			})
+			for ; j < len(pd.abortedTxns); j++ {
+				e := pd.abortedTxns[j]
+				if e.firstOffset >= upperBound {
+					continue
+				}
 				at := kmsg.NewFetchResponseTopicPartitionAbortedTransaction()
-				at.ProducerID = producerID
-				at.FirstOffset = firstOffset
+				at.ProducerID = e.producerID
+				at.FirstOffset = e.firstOffset
 				sp.AbortedTransactions = append(sp.AbortedTransactions, at)
 			}
 		}
 	}

+	// Record partition state in the session. For incremental fetches
+	// (not new), also filter out unchanged partitions.
+	session.updateAndFilterResponse(resp, !newSession)
+
 	// Bump session epoch after successful fetch, but not for newly created
 	// sessions. The client will send epoch=1 for its first incremental fetch
 	// after receiving the new session ID.
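The replacement above relies on a per-partition abortedTxns index that, judging by the search predicate, is kept sorted by each transaction's abort-marker offset (lastOffset). sort.Search finds the first entry whose marker lands at or after the fetch offset, and entries that begin past the returned data range are skipped. A standalone sketch of the same query, with illustrative types standing in for kfake's internals:

package main

import (
	"fmt"
	"sort"
)

// Illustrative stand-in for a per-partition aborted txn index,
// assumed sorted ascending by lastOffset (the abort marker's offset).
type abortedTxn struct {
	producerID  int64
	firstOffset int64 // first offset the transaction produced
	lastOffset  int64 // offset of its abort marker
}

func overlapping(index []abortedTxn, fetchOffset, upperBound int64) []abortedTxn {
	// First txn whose abort marker is at or after the fetch offset;
	// earlier txns ended entirely before the returned range.
	j := sort.Search(len(index), func(k int) bool {
		return index[k].lastOffset >= fetchOffset
	})
	var out []abortedTxn
	for ; j < len(index); j++ {
		if index[j].firstOffset >= upperBound {
			continue // starts past the data we returned
		}
		out = append(out, index[j])
	}
	return out
}

func main() {
	index := []abortedTxn{{1, 0, 9}, {2, 5, 20}, {1, 30, 41}}
	fmt.Println(overlapping(index, 10, 35)) // [{2 5 20} {1 30 41}]
}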
@@ -435,15 +426,24 @@ type fetchSession struct {
 	id         int32
 	epoch      int32
 	partitions map[tp]fetchSessionPartition
+	lastUsed   time.Time
 }

 // fetchSessionPartition tracks per-partition state within a session.
 type fetchSessionPartition struct {
 	fetchOffset  int64
 	maxBytes     int32
 	currentEpoch int32
+
+	// Cached from last response sent. Used to detect changes for
+	// incremental filtering. Initialized to -1 to force inclusion
+	// in the first response after the partition is added.
+	lastHighWatermark  int64
+	lastLogStartOffset int64
 }

+const defMaxFetchSessionCacheSlots = 1000
+
 func (fs *fetchSessions) init(brokerNode int32) {
 	if fs.sessions == nil {
 		fs.sessions = make(map[int32]map[int32]*fetchSession)
@@ -457,7 +457,7 @@ func (fs *fetchSessions) init(brokerNode int32) {
 // getOrCreate returns an existing session or creates a new one based on the
 // request's SessionID and SessionEpoch. Returns (nil, false, 0) for legacy
 // sessionless fetches.
-func (fs *fetchSessions) getOrCreate(brokerNode, sessionID, sessionEpoch int32) (*fetchSession, bool, int16) {
+func (fs *fetchSessions) getOrCreate(brokerNode, sessionID, sessionEpoch int32, maxSlots int) (*fetchSession, bool, int16) {
 	fs.init(brokerNode)

 	// SessionEpoch=-1: Full fetch, no session (legacy mode).
@@ -477,11 +477,26 @@ func (fs *fetchSessions) getOrCreate(brokerNode, sessionID, sessionEpoch int32)
 	if sessionID > 0 {
 		delete(fs.sessions[brokerNode], sessionID)
 	}
+	// Evict the oldest session if we're at the limit.
+	brokerSessions := fs.sessions[brokerNode]
+	if len(brokerSessions) >= maxSlots {
+		var oldestID int32
+		var oldestTime time.Time
+		for id, s := range brokerSessions {
+			if oldestTime.IsZero() || s.lastUsed.Before(oldestTime) {
+				oldestID = id
+				oldestTime = s.lastUsed
+			}
+		}
+		delete(brokerSessions, oldestID)
+	}
+	now := time.Now()
 	id := fs.nextID.Add(1) - 1
 	session := &fetchSession{
 		id:         id,
 		epoch:      1,
 		partitions: make(map[tp]fetchSessionPartition),
+		lastUsed:   now,
 	}
 	fs.sessions[brokerNode][id] = session
 	return session, true, 0
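The eviction here is a linear scan for the least-recently-used session rather than a heap or ordered list; with the default of 1000 slots per broker, that O(n) pass per eviction is a reasonable trade-off for a test fake. A minimal standalone version of the same pattern:

package main

import (
	"fmt"
	"time"
)

// evictOldest removes the entry with the earliest lastUsed time.
// Mirrors the linear scan above; fine for small, bounded maps.
func evictOldest(sessions map[int32]time.Time) {
	var oldestID int32
	var oldestTime time.Time
	for id, used := range sessions {
		if oldestTime.IsZero() || used.Before(oldestTime) {
			oldestID, oldestTime = id, used
		}
	}
	delete(sessions, oldestID)
}

func main() {
	now := time.Now()
	sessions := map[int32]time.Time{
		1: now.Add(-2 * time.Minute),
		2: now.Add(-5 * time.Minute), // oldest, gets evicted
		3: now,
	}
	evictOldest(sessions)
	fmt.Println(len(sessions)) // 2
}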
@@ -495,14 +510,29 @@ func (fs *fetchSessions) getOrCreate(brokerNode, sessionID, sessionEpoch int32)
 	if sessionEpoch != session.epoch {
 		return nil, false, kerr.InvalidFetchSessionEpoch.Code
 	}
+	session.lastUsed = time.Now()
 	return session, false, 0
 }

 func (s *fetchSession) updatePartition(topic string, partition int32, fetchOffset int64, maxBytes int32, currentEpoch int32) {
 	if s == nil {
 		return
 	}
-	s.partitions[tp{topic, partition}] = fetchSessionPartition{fetchOffset, maxBytes, currentEpoch}
+	key := tp{topic, partition}
+	if existing, ok := s.partitions[key]; ok {
+		existing.fetchOffset = fetchOffset
+		existing.maxBytes = maxBytes
+		existing.currentEpoch = currentEpoch
+		s.partitions[key] = existing
+	} else {
+		s.partitions[key] = fetchSessionPartition{
+			fetchOffset:        fetchOffset,
+			maxBytes:           maxBytes,
+			currentEpoch:       currentEpoch,
+			lastHighWatermark:  -1,
+			lastLogStartOffset: -1,
+		}
+	}
 }

 func (s *fetchSession) forgetPartition(topic string, partition int32) {
@@ -521,3 +551,51 @@ func (s *fetchSession) bumpEpoch() {
 		s.epoch = 1
 	}
 }
+
+// updateAndFilterResponse records the HWM and logStartOffset from each
+// partition as baseline state. When filter is true, unchanged partitions
+// (no records, same HWM/logStartOffset, no error) are removed from the
+// response for incremental fetch.
+func (s *fetchSession) updateAndFilterResponse(resp *kmsg.FetchResponse, filter bool) {
+	if s == nil {
+		return
+	}
+	n := 0
+	for i := range resp.Topics {
+		rt := &resp.Topics[i]
+		np := 0
+		for j := range rt.Partitions {
+			rp := &rt.Partitions[j]
+			key := tp{rt.Topic, rp.Partition}
+			sp, ok := s.partitions[key]
+
+			include := !filter || !ok ||
+				len(rp.RecordBatches) > 0 ||
+				rp.ErrorCode != 0 ||
+				rp.HighWatermark != sp.lastHighWatermark ||
+				rp.LogStartOffset != sp.lastLogStartOffset
+
+			if ok {
+				if rp.ErrorCode != 0 {
+					sp.lastHighWatermark = -1
+					sp.lastLogStartOffset = -1
+				} else {
+					sp.lastHighWatermark = rp.HighWatermark
+					sp.lastLogStartOffset = rp.LogStartOffset
+				}
+				s.partitions[key] = sp
+			}
+
+			if include {
+				rt.Partitions[np] = *rp
+				np++
+			}
+		}
+		rt.Partitions = rt.Partitions[:np]
+		if len(rt.Partitions) > 0 {
+			resp.Topics[n] = *rt
+			n++
+		}
+	}
+	resp.Topics = resp.Topics[:n]
+}
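To make the filtering rule concrete: a partition is omitted from an incremental response only when it carries no records, no error, and the same high watermark and log start offset the session last reported; an error resets the cached baseline to -1 so the partition is re-sent next time. A small standalone predicate expressing the same condition (field names are illustrative, not kmsg types):

package main

import "fmt"

// Illustrative mirror of the "unchanged partition" test in
// updateAndFilterResponse above.
type partResp struct {
	numRecordBytes int
	errorCode      int16
	highWatermark  int64
	logStartOffset int64
}

func unchanged(rp partResp, lastHWM, lastLSO int64) bool {
	return rp.numRecordBytes == 0 &&
		rp.errorCode == 0 &&
		rp.highWatermark == lastHWM &&
		rp.logStartOffset == lastLSO
}

func main() {
	rp := partResp{0, 0, 100, 0}
	fmt.Println(unchanged(rp, 100, 0)) // true: omitted from the response
	fmt.Println(unchanged(rp, 90, 0))  // false: HWM advanced, included
}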
