Skip to content

Commit d96d7b0

Browse files
Await acknowledgements in async benchmarks (#1545)
By waiting for the acknowledgements before closing the async benchmark, we expect these benchmarks to become more stable. Fixes #1542.
1 parent 01192c8 commit d96d7b0

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaProducerBenchmark.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer
3636
def produceSingleRecordSeqAsync(): Any = runZIO {
3737
// Produce 100 records
3838
for {
39-
producer <- ZIO.service[Producer]
40-
_ <- producer.produceAsync(topic1, "key", "value", Serde.string, Serde.string).schedule(Schedule.recurs(100))
39+
producer <- ZIO.service[Producer]
40+
continuations <- ZIO.replicateZIO(100)(producer.produceAsync(topic1, "key", "value", Serde.string, Serde.string))
41+
_ <- ZIO.foreachDiscard(continuations)(identity)
4142
} yield ()
4243
}
4344

@@ -56,8 +57,9 @@ class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer
5657
def produceChunkSeqAsync(): Any = runZIO {
5758
// Produce 30 chunks sequentially
5859
for {
59-
producer <- ZIO.service[Producer]
60-
_ <- producer.produceChunkAsync(records, Serde.string, Serde.string).schedule(Schedule.recurs(30))
60+
producer <- ZIO.service[Producer]
61+
continuations <- ZIO.replicateZIO(30)(producer.produceChunkAsync(records, Serde.string, Serde.string))
62+
_ <- ZIO.foreachDiscard(continuations)(identity)
6163
} yield ()
6264
}
6365

0 commit comments

Comments
 (0)