
Commit 6999fb8

Merge pull request #1236 from twmb/1195
kgo: unlinger partitions in ProduceSync to avoid linger delay
2 parents: dba723b + 764eb29

3 files changed: 132 additions & 11 deletions

pkg/kfake/behavior_test.go

Lines changed: 52 additions & 0 deletions
@@ -1785,4 +1785,56 @@ func TestTxnDescribeTransactions(t *testing.T) {
     }
 }
 
+// TestProduceSyncUnlinger verifies that ProduceSync does not wait for the full
+// linger duration before completing. With a 10s linger, ProduceSync should
+// still return quickly because it unlingers partitions after enqueuing records.
+func TestProduceSyncUnlinger(t *testing.T) {
+    t.Parallel()
+    topic := "produce-sync-unlinger"
+    c := newCluster(t, kfake.NumBrokers(1), kfake.SeedTopics(1, topic))
+
+    producer := newPlainClient(t, c,
+        kgo.DefaultProduceTopic(topic),
+        kgo.ProducerLinger(10*time.Second),
+    )
+
+    // Produce one record and flush to load topic metadata and
+    // establish connections. Without this, the first ProduceSync
+    // would buffer to the unknown-topic path and not benefit from
+    // the unlinger optimization.
+    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+    defer cancel()
+    producer.Produce(ctx, kgo.StringRecord("warmup"), nil)
+    if err := producer.Flush(ctx); err != nil {
+        t.Fatalf("warmup flush failed: %v", err)
+    }
+
+    start := time.Now()
+    results := producer.ProduceSync(ctx, kgo.StringRecord("v1"), kgo.StringRecord("v2"), kgo.StringRecord("v3"))
+    elapsed := time.Since(start)
+
+    if err := results.FirstErr(); err != nil {
+        t.Fatalf("ProduceSync failed: %v", err)
+    }
+    if len(results) != 3 {
+        t.Fatalf("expected 3 results, got %d", len(results))
+    }
+
+    // With the unlinger fix, ProduceSync should complete well within 5s
+    // despite the 10s linger. Without the fix, it would block for 10s.
+    if elapsed > 5*time.Second {
+        t.Fatalf("ProduceSync took %v, expected well under 5s with unlinger", elapsed)
+    }
+
+    // Verify all records are consumable (warmup + 3 = 4 records total).
+    consumer := newPlainClient(t, c,
+        kgo.ConsumeTopics(topic),
+        kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
+    )
+    records := consumeN(t, consumer, 4, 5*time.Second)
+    if len(records) != 4 {
+        t.Fatalf("expected 4 consumed records, got %d", len(records))
+    }
+}
+
 func stringp(s string) *string { return &s }
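Note: newCluster, newPlainClient, and consumeN above are helpers internal to this repo's test harness. A rough standalone equivalent of the setup, sketched against the public kfake and kgo packages (option signatures inferred from this diff, so treat it as an approximation rather than the test's actual code):

package main

import (
    "time"

    "github.com/twmb/franz-go/pkg/kfake"
    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    topic := "produce-sync-unlinger"

    // In-memory kfake cluster seeded with one single-partition topic,
    // mirroring newCluster(t, kfake.NumBrokers(1), kfake.SeedTopics(1, topic)).
    c, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(1, topic))
    if err != nil {
        panic(err)
    }
    defer c.Close()

    // A plain client pointed at the fake cluster with the same 10s linger,
    // mirroring newPlainClient in the test above.
    cl, err := kgo.NewClient(
        kgo.SeedBrokers(c.ListenAddrs()...),
        kgo.DefaultProduceTopic(topic),
        kgo.ProducerLinger(10*time.Second),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    // Produce and consume as in the test body above.
}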

pkg/kgo/producer.go

Lines changed: 74 additions & 4 deletions
@@ -311,8 +311,13 @@ func (rs ProduceResults) First() (*Record, error) {
 // ProduceSync is a synchronous produce. See the [Produce] documentation for an
 // in depth description of how producing works.
 //
-// This function simply produces all records in one range loop and waits for
-// them all to be produced before returning.
+// This function produces all records and waits for them all to be produced
+// before returning. If the client has a non-zero linger configured, after all
+// records are enqueued, this function stops lingering and triggers an immediate
+// drain on all partitions that records were produced to. This avoids
+// unnecessarily waiting for linger timers when the caller is synchronously
+// waiting for results. Partitions that are lingering due to concurrent
+// [Produce] calls are not affected.
 func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults {
     var (
         wg sync.WaitGroup
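For context on what the new doc comment promises, a minimal caller-side sketch (broker address and topic are placeholders, not part of this change): with a 10s linger configured, ProduceSync now returns once the produced partitions drain rather than after the linger expires, provided topic metadata is already loaded (see the warmup note in the test above).

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    cl, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"),  // placeholder broker
        kgo.DefaultProduceTopic("events"),  // placeholder topic
        kgo.ProducerLinger(10*time.Second), // previously delayed ProduceSync by up to 10s
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    ctx := context.Background()

    // With this commit, ProduceSync unlingers the partitions it produced to,
    // so this returns once the records are written rather than ~10s later.
    start := time.Now()
    if err := cl.ProduceSync(ctx, kgo.StringRecord("v1"), kgo.StringRecord("v2")).FirstErr(); err != nil {
        panic(err)
    }
    fmt.Println("produced in", time.Since(start))
}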
@@ -324,9 +329,76 @@ func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
     )
 
     wg.Add(len(rs))
+
+    // After each Produce call for a known topic, the record's Partition
+    // field is already set (see bufferRecord), allowing us to collect
+    // which recBufs to unlinger without a second pass over the records.
+    // We use a [16] base array to avoid heap allocation in the common
+    // case, and linear dedup since the number of unique partitions is
+    // typically small.
+    //
+    // We load partition data BEFORE calling Produce to avoid a data
+    // race on r.Partition. If partitions exist before Produce,
+    // partitionsForTopicProduce will also see them (partition counts
+    // are monotonically increasing) and will partition the record
+    // synchronously in bufferRecord, making r.Partition safe to read
+    // after Produce returns. If pd is nil, we never read r.Partition,
+    // avoiding a race with the metadata goroutine which partitions
+    // unknownTopics records asynchronously.
+    var (
+        buf      [16]*recBuf
+        unlinger = buf[:0]
+        topics   topicsPartitionsData
+
+        lastTopic string
+        lastPD    *topicPartitionsData
+    )
+    if cl.cfg.linger > 0 {
+        topics = cl.producer.topics.load()
+    }
+
     for _, r := range rs {
+        var pd *topicPartitionsData
+        if topics != nil {
+            if r.Topic == "" || cl.cfg.defaultProduceTopicAlways {
+                r.Topic = cl.cfg.defaultProduceTopic
+            }
+            if r.Topic == lastTopic {
+                pd = lastPD
+            } else if parts, ok := topics[r.Topic]; ok {
+                if v := parts.load(); len(v.partitions) > 0 {
+                    pd = v
+                }
+                lastTopic = r.Topic
+                lastPD = pd
+            }
+        }
+
         cl.Produce(ctx, r, promise)
+
+        if pd == nil {
+            continue
+        }
+        if int(r.Partition) >= len(pd.partitions) {
+            continue
+        }
+        rb := pd.partitions[r.Partition].records
+        var seen bool
+        for _, have := range unlinger {
+            if have == rb {
+                seen = true
+                break
+            }
+        }
+        if !seen {
+            unlinger = append(unlinger, rb)
+        }
     }
+
+    for _, rb := range unlinger {
+        rb.unlingerAndManuallyDrain()
+    }
+
     wg.Wait()
 
     return results
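The buf [16] plus linear-scan approach described in the comment above is a stack-friendly dedup idiom; a standalone sketch of the same idea with placeholder types (not franz-go internals):

package main

import "fmt"

// recBuf stands in for kgo's per-partition record buffer; only pointer
// identity matters for the dedup.
type recBuf struct{ name string }

func main() {
    a, b := &recBuf{"t-0"}, &recBuf{"t-1"}
    producedTo := []*recBuf{a, b, a, b, a}

    // A small fixed backing array keeps the common case free of a separate
    // slice allocation; append only reallocates past 16 distinct buffers.
    var buf [16]*recBuf
    unlinger := buf[:0]

    for _, rb := range producedTo {
        seen := false
        for _, have := range unlinger {
            if have == rb {
                seen = true
                break
            }
        }
        if !seen {
            unlinger = append(unlinger, rb)
        }
    }

    fmt.Println(len(unlinger)) // 2
}

A map would also work, but for a handful of pointers the linear scan avoids the map allocation and hashing entirely, which matches the reasoning in the diff's comment.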
@@ -594,7 +666,6 @@ type batchPromise struct {
     epoch     int16
     attrs     RecordAttrs
     beforeBuf bool
-    partition int32
     recs      []promisedRec
     err       error
 }
@@ -632,7 +703,6 @@ start:
         } else {
             pr.Offset = b.baseOffset + int64(i)
         }
-        pr.Partition = b.partition
         pr.ProducerID = b.pid
         pr.ProducerEpoch = b.epoch
         pr.Attrs = b.attrs

pkg/kgo/sink.go

Lines changed: 6 additions & 7 deletions
@@ -688,7 +688,7 @@ func (s *sink) handleReqRespNoack(b *bytes.Buffer, debug bool, req *produceRequest) {
             if debug {
                 fmt.Fprintf(b, "%d{0=>%d}, ", partition, len(batch.records))
             }
-            s.cl.finishBatch(batch.recBatch, req.producerID, req.producerEpoch, partition, 0, nil)
+            s.cl.finishBatch(batch.recBatch, req.producerID, req.producerEpoch, 0, nil)
         } else if debug {
             fmt.Fprintf(b, "%d{skipped}, ", partition)
         }
@@ -979,7 +979,7 @@ func (s *sink) handleReqRespBatch(
         )
         s.cl.failProducerID(producerID, producerEpoch, err)
 
-        s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err)
+        s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.BaseOffset, err)
         if debug {
             fmt.Fprintf(b, "fatal@%d,%d(%s)}, ", rp.BaseOffset, nrec, err)
         }
@@ -1043,7 +1043,7 @@ func (s *sink) handleReqRespBatch(
                 batch.owner.addedToTxn.Swap(true)
             }
         }
-        s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err)
+        s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.BaseOffset, err)
         didProduce = err == nil
         if debug {
             if err != nil {
@@ -1061,7 +1061,7 @@ func (s *sink) handleReqRespBatch(
 //
 // This is safe even if the owning recBuf migrated sinks, since we are
 // finishing based off the status of an inflight req from the original sink.
-func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch int16, partition int32, baseOffset int64, err error) {
+func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch int16, baseOffset int64, err error) {
     recBuf := batch.owner
 
     if err != nil {
@@ -1095,9 +1095,8 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
         // corresponding to our own RecordAttr's bit 8 being no
         // timestamp type. Thus, we can directly convert the batch
         // attrs to our own RecordAttrs.
-        attrs:     RecordAttrs{uint8(attrs)},
-        partition: partition,
-        recs:      records,
+        attrs: RecordAttrs{uint8(attrs)},
+        recs:  records,
     })
 }
 