
Commit 89ac807

KafkaConsumer.commitAsync (#126)
* `KafkaConsumer.commitAsync`

  Motivation: Having `KafkaConsumer.commitSync` be `async` is not always convenient, as it suspends the `KafkaConsumer.messages` read loop and can therefore lower throughput. This PR introduces a new method `KafkaConsumer.commitAsync` that allows users who don't care about the result of the `commit` to commit in a "fire-and-forget" manner.

  Modifications:
  * new method `KafkaConsumer.commitAsync`
  * rename `KafkaConsumer.StateMachine.commitSync` to `KafkaConsumer.StateMachine.commit` to serve both `commitSync` and `commitAsync`
  * add new test for `KafkaConsumer.commitAsync`

* Review Franz

  Modifications:
  * `KafkaConsumer`:
    * rename `commitSync(_:)` -> `commit(_:)`
    * rename `commitAsync(_:)` -> `scheduleCommit(_:)`
  * `RDKafkaClient`:
    * rename `commitSync(_:)` -> `commit(_:)`
    * rename `commitAsync(_:)` -> `scheduleCommit(_:)`
1 parent ee7b7c6 commit 89ac807
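
In practice the two commit styles end up looking as follows in a consume loop (a minimal sketch based on the integration tests below; the `consume(from:)` wrapper is hypothetical, and the consumer is assumed to be configured with `isAutoCommitEnabled = false`, since both methods throw otherwise):

    import Kafka

    // Sketch only: `consumer` is assumed to be a running KafkaConsumer whose
    // configuration has `isAutoCommitEnabled = false`.
    func consume(from consumer: KafkaConsumer) async throws {
        for try await message in consumer.messages {
            // Fire-and-forget: schedules the offset commit and returns immediately,
            // so the read loop is not suspended. Errors encountered after
            // scheduling are discarded.
            try consumer.scheduleCommit(message)

            // Awaitable alternative: suspends the loop until the commit
            // succeeds or throws.
            // try await consumer.commit(message)
        }
    }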

3 files changed (+136 / -16 lines)

Sources/Kafka/KafkaConsumer.swift

Lines changed: 41 additions & 10 deletions
@@ -359,6 +359,8 @@ public final class KafkaConsumer: Sendable, Service {
     }

     /// Mark all messages up to the passed message in the topic as read.
+    /// Schedules a commit and returns immediately.
+    /// Any errors encountered after scheduling the commit will be discarded.
     ///
     /// This method is only used for manual offset management.
     ///
@@ -367,17 +369,46 @@ public final class KafkaConsumer: Sendable, Service {
     /// - Parameters:
     ///     - message: Last received message that shall be marked as read.
     /// - Throws: A ``KafkaError`` if committing failed.
+    public func scheduleCommit(_ message: KafkaConsumerMessage) throws {
+        let action = self.stateMachine.withLockedValue { $0.commit() }
+        switch action {
+        case .throwClosedError:
+            throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer")
+        case .commit(let client):
+            guard self.configuration.isAutoCommitEnabled == false else {
+                throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
+            }
+
+            try client.scheduleCommit(message)
+        }
+    }
+
+    @available(*, deprecated, renamed: "commit")
     public func commitSync(_ message: KafkaConsumerMessage) async throws {
-        let action = self.stateMachine.withLockedValue { $0.commitSync() }
+        try await self.commit(message)
+    }
+
+    /// Mark all messages up to the passed message in the topic as read.
+    /// Awaits until the commit succeeds or an error is encountered.
+    ///
+    /// This method is only used for manual offset management.
+    ///
+    /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
+    ///
+    /// - Parameters:
+    ///     - message: Last received message that shall be marked as read.
+    /// - Throws: A ``KafkaError`` if committing failed.
+    public func commit(_ message: KafkaConsumerMessage) async throws {
+        let action = self.stateMachine.withLockedValue { $0.commit() }
         switch action {
         case .throwClosedError:
             throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer")
-        case .commitSync(let client):
+        case .commit(let client):
             guard self.configuration.isAutoCommitEnabled == false else {
                 throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
             }

-            try await client.commitSync(message)
+            try await client.commit(message)
         }
     }

@@ -596,23 +627,23 @@ extension KafkaConsumer {
             }
         }

-        /// Action to be taken when wanting to do a synchronous commit.
-        enum CommitSyncAction {
-            /// Do a synchronous commit.
+        /// Action to be taken when wanting to do a commit.
+        enum CommitAction {
+            /// Do a commit.
             ///
             /// - Parameter client: Client used for handling the connection to the Kafka cluster.
-            case commitSync(
+            case commit(
                 client: RDKafkaClient
             )
             /// Throw an error. The ``KafkaConsumer`` is closed.
             case throwClosedError
         }

-        /// Get action to be taken when wanting to do a synchronous commit.
+        /// Get action to be taken when wanting to do a commit.
         /// - Returns: The action to be taken.
         ///
         /// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
-        func commitSync() -> CommitSyncAction {
+        func commit() -> CommitAction {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
@@ -621,7 +652,7 @@ extension KafkaConsumer {
             case .consumptionStopped:
                 fatalError("Cannot commit when consumption has been stopped")
             case .consuming(let client, _):
-                return .commitSync(client: client)
+                return .commit(client: client)
             case .finishing, .finished:
                 return .throwClosedError
             }
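
Since `commitSync(_:)` is kept as a deprecated shim that forwards to `commit(_:)`, existing call sites keep compiling and only pick up a deprecation warning. A sketch of the migration (assuming `consumer` and `message` come from a loop over `consumer.messages`):

    // Before this commit (now emits a deprecation warning and forwards to commit):
    try await consumer.commitSync(message)

    // Preferred spelling after this commit:
    try await consumer.commit(message)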

Sources/Kafka/RDKafka/RDKafkaClient.swift

Lines changed: 34 additions & 3 deletions
@@ -532,15 +532,46 @@ final class RDKafkaClient: Sendable {
         }
     }

-    /// Non-blocking **awaitable** commit of a the `message`'s offset to Kafka.
+    /// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka.
+    /// Schedules a commit and returns immediately.
+    /// Any errors encountered after scheduling the commit will be discarded.
     ///
     /// - Parameter message: Last received message that shall be marked as read.
-    func commitSync(_ message: KafkaConsumerMessage) async throws {
+    /// - Throws: A ``KafkaError`` if scheduling the commit failed.
+    func scheduleCommit(_ message: KafkaConsumerMessage) throws {
+        // The offset committed is always the offset of the next requested message.
+        // Thus, we increase the offset of the current message by one before committing it.
+        // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
+        let changesList = RDKafkaTopicPartitionList()
+        changesList.setOffset(
+            topic: message.topic,
+            partition: message.partition,
+            offset: Int64(message.offset.rawValue + 1)
+        )
+
+        let error = changesList.withListPointer { listPointer in
+            return rd_kafka_commit(
+                self.kafkaHandle,
+                listPointer,
+                1 // async = true
+            )
+        }
+
+        if error != RD_KAFKA_RESP_ERR_NO_ERROR {
+            throw KafkaError.rdKafkaError(wrapping: error)
+        }
+    }
+
+    /// Non-blocking **awaitable** commit of a `message`'s offset to Kafka.
+    ///
+    /// - Parameter message: Last received message that shall be marked as read.
+    /// - Throws: A ``KafkaError`` if the commit failed.
+    func commit(_ message: KafkaConsumerMessage) async throws {
         // Declare captured closure outside of withCheckedContinuation.
         // We do that because do an unretained pass of the captured closure to
         // librdkafka which means we have to keep a reference to the closure
         // ourselves to make sure it does not get deallocated before
-        // commitSync returns.
+        // commit returns.
         var capturedClosure: CapturedCommitCallback!
         try await withCheckedThrowingContinuation { continuation in
             capturedClosure = CapturedCommitCallback { result in
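
For clarity, a small illustration of the `offset + 1` convention referenced in the comment above: the value committed is the offset of the next message to fetch, not of the last message read (the concrete numbers below are hypothetical):

    // The consumer has just processed the message at offset 41 of a partition.
    let lastProcessedOffset: Int64 = 41
    // scheduleCommit/commit store lastProcessedOffset + 1, i.e. 42, so a consumer
    // that rejoins the group resumes at offset 42 and does not re-read offset 41.
    let committedOffset = lastProcessedOffset + 1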

Tests/IntegrationTests/KafkaTests.swift

Lines changed: 61 additions & 3 deletions
@@ -214,7 +214,7 @@ final class KafkaTests: XCTestCase {
         }
     }

-    func testProduceAndConsumeWithCommitSync() async throws {
+    func testProduceAndConsumeWithScheduleCommit() async throws {
         let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
         let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)

@@ -254,7 +254,65 @@ final class KafkaTests: XCTestCase {
                 var consumedMessages = [KafkaConsumerMessage]()
                 for try await message in consumer.messages {
                     consumedMessages.append(message)
-                    try await consumer.commitSync(message)
+                    try consumer.scheduleCommit(message)
+
+                    if consumedMessages.count >= testMessages.count {
+                        break
+                    }
+                }
+
+                XCTAssertEqual(testMessages.count, consumedMessages.count)
+            }
+
+            // Wait for Producer Task and Consumer Task to complete
+            try await group.next()
+            try await group.next()
+            // Shutdown the serviceGroup
+            await serviceGroup.triggerGracefulShutdown()
+        }
+    }
+
+    func testProduceAndConsumeWithCommit() async throws {
+        let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
+
+        var consumerConfig = KafkaConsumerConfiguration(
+            consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]),
+            bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
+        )
+        consumerConfig.isAutoCommitEnabled = false
+        consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning
+        consumerConfig.broker.addressFamily = .v4
+
+        let consumer = try KafkaConsumer(
+            configuration: consumerConfig,
+            logger: .kafkaTest
+        )
+
+        let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer, consumer], logger: .kafkaTest)
+        let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            // Consumer Run Task
+            group.addTask {
+                try await serviceGroup.run()
+            }
+
+            // Producer Task
+            group.addTask {
+                try await Self.sendAndAcknowledgeMessages(
+                    producer: producer,
+                    events: events,
+                    messages: testMessages
+                )
+            }
+
+            // Consumer Task
+            group.addTask {
+                var consumedMessages = [KafkaConsumerMessage]()
+                for try await message in consumer.messages {
+                    consumedMessages.append(message)
+                    try await consumer.commit(message)

                     if consumedMessages.count >= testMessages.count {
                         break
@@ -323,7 +381,7 @@ final class KafkaTests: XCTestCase {
                         continue
                     }
                     consumedMessages.append(message)
-                    try await consumer.commitSync(message)
+                    try await consumer.commit(message)

                     if consumedMessages.count >= testMessages.count {
                         break
