Commit f69a0e7

KafkaConsumer: drop messages on sequence termination (#85)
* `KafkaConsumer`: drop msgs on sequence termination

Motivation:

We want to avoid running out of memory when the user stops reading the `KafkaConsumer.messages` `AsyncSequence`. Therefore we drop any incoming messages that we receive after the `AsyncSequence` was terminated.

Modifications:

* `KafkaConsumer`
    * enter an idle polling state when the `KafkaConsumer.messages` `AsyncSequence` was terminated, to ensure that `func run() async` does not return early and cause an error in its `ServiceGroup`
* `KafkaConsumerConfiguration`:
    * proxy `enable.auto.commit` and make it `false` by default. Reason: our library should be in charge of deciding when a message offset is to be committed, because only we know whether our library user received the message through the `KafkaConsumer.messages` `AsyncSequence`
* `KafkaSharedConfiguration.partition`: document default value of parameter `offset`
* add new test `SwiftKafkaTests.testCommittedOffsetsAreCorrect()`

* KafkaConsumerCloseOnTerminate: make 'checked' `Sendable`

* Review Franz

Motivation:

Instead of storing a message's offset after every yield to the `messages` `AsyncSequence`, we want to do that after every invocation of the `messages` `AsyncSequence` iterator's `next()` method, to ensure that the user has actually received the message. Yielding only enqueued a message but did not ensure that the user actually consumed it through the `AsyncSequence`.

Modifications:

* `KafkaConsumerConfig`
    * revert to plain `librdkafka` `enable.auto.commit` implementation
* `KafkaConsumer`
    * use `rd_kafka_offsets_store()` instead of `rd_kafka_commit` to store a message's offset after it has been consumed through the `messages` `AsyncSequence`. This is more lightweight than `rd_kafka_commit`, and we can still leverage `librdkafka`'s auto-commit logic
    * commit message offsets on `ConsumerMessagesAsyncIterator.next()` instead of after yielding (explanation in the Motivation section)

* Review Franz & George

Modifications:

* make `KafkaConsumerMessages` `AsyncSequence` throwing
* remove associated `Producer.Source` from `KafkaConsumer.StateMachine.State.consumptionStopped`
* rename `KafkaConsumer.StateMachine.stopConsuming()` -> `messageSequenceTerminated()`

* Review Franz

Modifications:

* `KafkaConsumerMessages` `AsyncSequence`: deallocate `wrappedIterator` when sequence terminates (abruptly)

* Remove unused try
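The difference between storing an offset on yield and storing it on `next()` can be sketched with a plain synchronous iterator. This is a simplified illustration, not the library's code: `SketchMessage`, `OffsetStoringIterator`, and the `storeOffset` callback are hypothetical names, and the real implementation wraps an asynchronous, throwing iterator instead.

```swift
/// Hypothetical stand-in for `KafkaConsumerMessage`; only the offset matters here.
struct SketchMessage: Equatable {
    let offset: Int
}

/// Iterator wrapper that records an offset only when an element is handed to the caller.
struct OffsetStoringIterator<Base: IteratorProtocol>: IteratorProtocol
    where Base.Element == SketchMessage
{
    // Optional so the wrapped iterator can be released once the sequence ends.
    private var base: Base?
    // Hypothetical callback standing in for the client's offset store.
    private let storeOffset: (Int) -> Void

    init(base: Base, storeOffset: @escaping (Int) -> Void) {
        self.base = base
        self.storeOffset = storeOffset
    }

    mutating func next() -> SketchMessage? {
        guard let message = self.base?.next() else {
            // Sequence finished: drop the wrapped iterator.
            self.base = nil
            return nil
        }
        // The caller is about to receive the message, so its offset may be recorded now.
        self.storeOffset(message.offset)
        return message
    }
}

// Three messages are queued, but the caller only pulls two of them.
var stored: [Int] = []
let queued = [SketchMessage(offset: 0), SketchMessage(offset: 1), SketchMessage(offset: 2)]
var iterator = OffsetStoringIterator(base: queued.makeIterator()) { stored.append($0) }
let first = iterator.next()
let second = iterator.next()
```

In this sketch the message with offset 2 is queued but never pulled, so its offset is never recorded; storing on yield would have recorded all three.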
1 parent fe3c9fc commit f69a0e7

7 files changed

+322
-38
lines changed

README.md

Lines changed: 3 additions & 3 deletions
@@ -99,7 +99,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
 
     // Task receiving messages
     group.addTask {
-        for await message in consumer.messages {
+        for try await message in consumer.messages {
             // Do something with message
         }
     }
@@ -135,7 +135,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
 
     // Task receiving messages
     group.addTask {
-        for await message in consumer.messages {
+        for try await message in consumer.messages {
             // Do something with message
         }
     }
@@ -172,7 +172,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
 
     // Task receiving messages
     group.addTask {
-        for await message in consumer.messages {
+        for try await message in consumer.messages {
             // Do something with message
             // ...
             try await consumer.commitSync(message)

Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift

Lines changed: 1 addition & 0 deletions
@@ -498,6 +498,7 @@ extension KafkaSharedConfiguration {
     /// - Parameter topic: The name of the Kafka topic.
     /// - Parameter partition: The partition of the topic to consume from.
     /// - Parameter offset: The offset to start consuming from.
+    /// Defaults to the end of the Kafka partition queue (meaning wait for next produced message).
     public static func partition(
         topic: String,
         partition: KafkaPartition,

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 27 additions & 1 deletion
@@ -277,7 +277,33 @@ final class KafkaClient: Sendable {
         }
     }
 
-    /// Non-blocking commit of the `message`'s offset to Kafka.
+    /// Store `message`'s offset for next auto-commit.
+    ///
+    /// - Important: `enable.auto.offset.store` must be set to `false` when using this API.
+    func storeMessageOffset(_ message: KafkaConsumerMessage) throws {
+        // The offset committed is always the offset of the next requested message.
+        // Thus, we increase the offset of the current message by one before committing it.
+        // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
+        let changesList = RDKafkaTopicPartitionList()
+        changesList.setOffset(
+            topic: message.topic,
+            partition: message.partition,
+            offset: Int64(message.offset + 1)
+        )
+
+        let error = changesList.withListPointer { listPointer in
+            rd_kafka_offsets_store(
+                self.kafkaHandle,
+                listPointer
+            )
+        }
+
+        if error != RD_KAFKA_RESP_ERR_NO_ERROR {
+            throw KafkaError.rdKafkaError(wrapping: error)
+        }
+    }
+
+    /// Non-blocking **awaitable** commit of the `message`'s offset to Kafka.
     ///
     /// - Parameter message: Last received message that shall be marked as read.
     func commitSync(_ message: KafkaConsumerMessage) async throws {
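
The "offset plus one" rule in `storeMessageOffset` can be illustrated without `librdkafka`. The following is a minimal sketch under stated assumptions: `offsetToStore(afterConsuming:)` is a hypothetical helper, and the array stands in for a partition log whose indices play the role of offsets.

```swift
/// Hypothetical helper: the value to store after a message has been consumed.
/// The stored offset names the next message to read, not the one just read.
func offsetToStore(afterConsuming messageOffset: Int) -> Int64 {
    return Int64(messageOffset) + 1
}

// A toy partition log; array indices stand in for Kafka offsets.
let log = ["a", "b", "c", "d"]

// The consumer has read offsets 0 and 1, so the last consumed offset is 1.
let lastConsumed = 1
let resumeAt = Int(offsetToStore(afterConsuming: lastConsumed))

// A consumer that restarts from the stored offset continues with "c".
let remaining = Array(log[resumeAt...])
```

Storing the unmodified offset instead would make a restarted consumer read the last message a second time.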

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 135 additions & 30 deletions
@@ -18,45 +18,86 @@ import NIOConcurrencyHelpers
 import NIOCore
 import ServiceLifecycle
 
-// MARK: - NoDelegate
+// MARK: - KafkaConsumerCloseOnTerminate
 
-// `NIOAsyncSequenceProducerDelegate` that does nothing.
-internal struct NoDelegate: NIOAsyncSequenceProducerDelegate {
-    func produceMore() {}
-    func didTerminate() {}
+/// `NIOAsyncSequenceProducerDelegate` that closes the producer when
+/// `didTerminate()` is invoked.
+internal struct KafkaConsumerCloseOnTerminate: Sendable {
+    let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
+}
+
+extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
+    func produceMore() {
+        return // No back pressure
+    }
+
+    func didTerminate() {
+        self.stateMachine.withLockedValue { $0.messageSequenceTerminated() }
+    }
 }
 
 // MARK: - KafkaConsumerMessages
 
 /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
 public struct KafkaConsumerMessages: Sendable, AsyncSequence {
+    let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
+
     public typealias Element = KafkaConsumerMessage
     typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
-    typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, NoDelegate>
+    typealias WrappedSequence = NIOThrowingAsyncSequenceProducer<
+        Element,
+        Error,
+        BackPressureStrategy,
+        KafkaConsumerCloseOnTerminate
+    >
     let wrappedSequence: WrappedSequence
 
     /// `AsyncIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
     public struct ConsumerMessagesAsyncIterator: AsyncIteratorProtocol {
-        var wrappedIterator: WrappedSequence.AsyncIterator
+        let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
+        var wrappedIterator: WrappedSequence.AsyncIterator?
 
-        public mutating func next() async -> Element? {
-            await self.wrappedIterator.next()
+        public mutating func next() async throws -> Element? {
+            guard let element = try await self.wrappedIterator?.next() else {
+                self.deallocateIterator()
+                return nil
+            }
+
+            let action = self.stateMachine.withLockedValue { $0.storeOffset() }
+            switch action {
+            case .storeOffset(let client):
+                do {
+                    try client.storeMessageOffset(element)
+                } catch {
+                    self.deallocateIterator()
+                    throw error
+                }
+            }
+            return element
+        }
+
+        private mutating func deallocateIterator() {
+            self.wrappedIterator = nil
         }
     }
 
     public func makeAsyncIterator() -> ConsumerMessagesAsyncIterator {
-        return ConsumerMessagesAsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator())
+        return ConsumerMessagesAsyncIterator(
+            stateMachine: self.stateMachine,
+            wrappedIterator: self.wrappedSequence.makeAsyncIterator()
+        )
     }
 }
 
 // MARK: - KafkaConsumer
 
 /// Receive messages from the Kafka cluster.
 public final class KafkaConsumer: Sendable, Service {
-    typealias Producer = NIOAsyncSequenceProducer<
+    typealias Producer = NIOThrowingAsyncSequenceProducer<
         KafkaConsumerMessage,
+        Error,
         NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
-        NoDelegate
+        KafkaConsumerCloseOnTerminate
     >
     /// The configuration object of the consumer client.
     private let config: KafkaConsumerConfiguration
@@ -90,13 +131,14 @@ public final class KafkaConsumer: Sendable, Service {
 
         self.stateMachine = NIOLockedValueBox(StateMachine(logger: self.logger))
 
-        let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
+        let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
             elementType: KafkaConsumerMessage.self,
             backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
-            delegate: NoDelegate()
+            delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine)
         )
 
         self.messages = KafkaConsumerMessages(
+            stateMachine: self.stateMachine,
             wrappedSequence: sourceAndSequence.sequence
         )
 
@@ -141,7 +183,8 @@ public final class KafkaConsumer: Sendable, Service {
     /// Assign the ``KafkaConsumer`` to a specific `partition` of a `topic`.
     /// - Parameter topic: Name of the topic that this ``KafkaConsumer`` will read from.
     /// - Parameter partition: Partition that this ``KafkaConsumer`` will read from.
-    /// - Parameter offset: The topic offset where reading begins. Defaults to the offset of the last read message.
+    /// - Parameter offset: The offset to start consuming from.
+    /// Defaults to the end of the Kafka partition queue (meaning wait for next produced message).
     /// - Throws: A ``KafkaError`` if the consumer could not be assigned to the topic + partition pair.
     private func assign(
         topic: String,
@@ -190,10 +233,11 @@ public final class KafkaConsumer: Sendable, Service {
                     }
                 }
                 try await Task.sleep(for: self.config.pollInterval)
-            case .pollUntilClosed(let client):
-                // Ignore poll result, we are closing down and just polling to commit
-                // outstanding consumer state
-                let _ = client.eventPoll()
+            case .pollWithoutYield(let client):
+                // Ignore poll result.
+                // We are just polling to serve any remaining events queued inside of `librdkafka`.
+                // All remaining queued consumer messages will get dropped and not be committed (marked as read).
+                _ = client.eventPoll()
                 try await Task.sleep(for: self.config.pollInterval)
             case .terminatePollLoop:
                 return
@@ -228,24 +272,26 @@ public final class KafkaConsumer: Sendable, Service {
     private func triggerGracefulShutdown() {
         let action = self.stateMachine.withLockedValue { $0.finish() }
         switch action {
+        case .triggerGracefulShutdown(let client):
+            self._triggerGracefulShutdown(
+                client: client,
+                logger: self.logger
+            )
         case .triggerGracefulShutdownAndFinishSource(let client, let source):
-            self._triggerGracefulShutdownAndFinishSource(
+            source.finish()
+            self._triggerGracefulShutdown(
                 client: client,
-                source: source,
                 logger: self.logger
             )
         case .none:
             return
         }
     }
 
-    private func _triggerGracefulShutdownAndFinishSource(
+    private func _triggerGracefulShutdown(
         client: KafkaClient,
-        source: Producer.Source,
         logger: Logger
     ) {
-        source.finish()
-
         do {
             try client.consumerClose()
         } catch {
@@ -288,6 +334,11 @@ extension KafkaConsumer {
                 client: KafkaClient,
                 source: Producer.Source
             )
+            /// Consumer is still running but the messages asynchronous sequence was terminated.
+            /// All incoming messages will be dropped.
+            ///
+            /// - Parameter client: Client used for handling the connection to the Kafka cluster.
+            case consumptionStopped(client: KafkaClient)
             /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
             /// We are now in the process of committing our last state to the broker.
             ///
@@ -325,11 +376,12 @@ extension KafkaConsumer {
                 client: KafkaClient,
                 source: Producer.Source
             )
-            /// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
-            /// to commit its state to the broker.
+            /// The ``KafkaConsumer`` stopped consuming messages or
+            /// is in the process of shutting down.
+            /// Poll to serve any queued events and commit outstanding state to the broker.
             ///
             /// - Parameter client: Client used for handling the connection to the Kafka cluster.
-            case pollUntilClosed(client: KafkaClient)
+            case pollWithoutYield(client: KafkaClient)
             /// Terminate the poll loop.
             case terminatePollLoop
         }
@@ -346,12 +398,14 @@ extension KafkaConsumer {
                 fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
             case .consuming(let client, let source):
                 return .pollForAndYieldMessage(client: client, source: source)
+            case .consumptionStopped(let client):
+                return .pollWithoutYield(client: client)
             case .finishing(let client):
                 if client.isConsumerClosed {
                     self.state = .finished
                     return .terminatePollLoop
                 } else {
-                    return .pollUntilClosed(client: client)
+                    return .pollWithoutYield(client: client)
                 }
             case .finished:
                 return .terminatePollLoop
@@ -378,11 +432,51 @@ extension KafkaConsumer {
                     source: source
                 )
                 return .setUpConnection(client: client)
-            case .consuming, .finishing, .finished:
+            case .consuming, .consumptionStopped, .finishing, .finished:
                 fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer")
             }
         }
 
+        /// The messages asynchronous sequence was terminated.
+        /// All incoming messages will be dropped.
+        mutating func messageSequenceTerminated() {
+            switch self.state {
+            case .uninitialized:
+                fatalError("\(#function) invoked while still in state \(self.state)")
+            case .initializing:
+                fatalError("Call to \(#function) before setUpConnection() was invoked")
+            case .consumptionStopped:
+                fatalError("messageSequenceTerminated() must not be invoked more than once")
+            case .consuming(let client, _):
+                self.state = .consumptionStopped(client: client)
+            case .finishing, .finished:
+                break
+            }
+        }
+
+        /// Action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
+        enum StoreOffsetAction {
+            /// Store the message offset with the given `client`.
+            /// - Parameter client: Client used for handling the connection to the Kafka cluster.
+            case storeOffset(client: KafkaClient)
+        }
+
+        /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
+        func storeOffset() -> StoreOffsetAction {
+            switch self.state {
+            case .uninitialized:
+                fatalError("\(#function) invoked while still in state \(self.state)")
+            case .initializing:
+                fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
+            case .consumptionStopped:
+                fatalError("Cannot store offset when consumption has been stopped")
+            case .consuming(let client, _):
+                return .storeOffset(client: client)
+            case .finishing, .finished:
+                fatalError("\(#function) invoked while still in state \(self.state)")
+            }
+        }
+
         /// Action to be taken when wanting to do a synchronous commit.
         enum CommitSyncAction {
             /// Do a synchronous commit.
@@ -405,6 +499,8 @@ extension KafkaConsumer {
                 fatalError("\(#function) invoked while still in state \(self.state)")
             case .initializing:
                 fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
+            case .consumptionStopped:
+                fatalError("Cannot commit when consumption has been stopped")
             case .consuming(let client, _):
                 return .commitSync(client: client)
             case .finishing, .finished:
@@ -414,6 +510,12 @@ extension KafkaConsumer {
 
         /// Action to be taken when wanting to close the consumer.
         enum FinishAction {
+            /// Shut down the ``KafkaConsumer``.
+            ///
+            /// - Parameter client: Client used for handling the connection to the Kafka cluster.
+            case triggerGracefulShutdown(
+                client: KafkaClient
+            )
             /// Shut down the ``KafkaConsumer`` and finish the given `source` object.
             ///
             /// - Parameter client: Client used for handling the connection to the Kafka cluster.
@@ -440,6 +542,9 @@ extension KafkaConsumer {
                     client: client,
                     source: source
                 )
+            case .consumptionStopped(let client):
+                self.state = .finishing(client: client)
+                return .triggerGracefulShutdown(client: client)
             case .finishing, .finished:
                 return nil
             }
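
The state transitions introduced in this file can be summarised in a reduced, dependency-free sketch. It is not the library's `StateMachine`: `ConsumerStateSketch` is a hypothetical type, the `uninitialized` and `initializing` states are left out, the associated `client` and `source` values are dropped, and the `consumerClosed` flag plus `consumerDidClose()` are assumptions that stand in for querying `client.isConsumerClosed`.

```swift
/// Reduced sketch of the consumer state machine (hypothetical type).
struct ConsumerStateSketch {
    enum State: Equatable {
        case consuming
        case consumptionStopped
        case finishing(consumerClosed: Bool)
        case finished
    }

    enum PollLoopAction: Equatable {
        case pollForAndYieldMessage
        case pollWithoutYield
        case terminatePollLoop
    }

    private(set) var state: State = .consuming

    /// Decide what the poll loop should do next.
    mutating func nextPollLoopAction() -> PollLoopAction {
        switch self.state {
        case .consuming:
            return .pollForAndYieldMessage
        case .consumptionStopped:
            // Keep polling so the run loop stays alive, but drop messages.
            return .pollWithoutYield
        case .finishing(let consumerClosed):
            if consumerClosed {
                self.state = .finished
                return .terminatePollLoop
            }
            return .pollWithoutYield
        case .finished:
            return .terminatePollLoop
        }
    }

    /// The messages sequence was terminated by its consumer.
    mutating func messageSequenceTerminated() {
        switch self.state {
        case .consuming:
            self.state = .consumptionStopped
        case .consumptionStopped:
            preconditionFailure("messageSequenceTerminated() must not be invoked more than once")
        case .finishing, .finished:
            break
        }
    }

    /// Begin a graceful shutdown.
    mutating func finish() {
        switch self.state {
        case .consuming, .consumptionStopped:
            self.state = .finishing(consumerClosed: false)
        case .finishing, .finished:
            break
        }
    }

    /// Assumption of this sketch: the caller reports that the consumer has closed.
    mutating func consumerDidClose() {
        if case .finishing = self.state {
            self.state = .finishing(consumerClosed: true)
        }
    }
}

var machine = ConsumerStateSketch()
let whileConsuming = machine.nextPollLoopAction()   // .pollForAndYieldMessage
machine.messageSequenceTerminated()
let afterTermination = machine.nextPollLoopAction() // .pollWithoutYield
machine.finish()
let whileClosing = machine.nextPollLoopAction()     // .pollWithoutYield
machine.consumerDidClose()
let afterClose = machine.nextPollLoopAction()       // .terminatePollLoop
```

The point of the extra `consumptionStopped` state is visible in the second step: terminating the sequence does not end the poll loop, it only stops messages from being yielded.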

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 1 addition & 1 deletion
@@ -425,7 +425,7 @@ extension KafkaProducer {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
             case .consumptionStopped:
-                fatalError("stopConsuming() must not be invoked more than once")
+                fatalError("messageSequenceTerminated() must not be invoked more than once")
             case .started(let client, _, let source, _):
                 self.state = .consumptionStopped(client: client)
                 return .finishSource(source: source)

Sources/SwiftKafka/RDKafka/RDKafka.swift

Lines changed: 6 additions & 0 deletions
@@ -33,7 +33,13 @@ struct RDKafka {
         let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
 
         let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary)
+        // Manually override some of the configuration options
+        // Handle logs in event queue
         try RDKafkaConfig.set(configPointer: rdConfig, key: "log.queue", value: "true")
+        // KafkaConsumer is manually storing read offsets
+        if type == .consumer {
+            try RDKafkaConfig.set(configPointer: rdConfig, key: "enable.auto.offset.store", value: "false")
+        }
         RDKafkaConfig.setEvents(configPointer: rdConfig, events: events)
 
         let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
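
The override logic above amounts to forcing two keys on top of whatever the user configured. A rough, dictionary-based sketch follows; `ClientTypeSketch` and `effectiveConfig(_:for:)` are hypothetical names, and the real code sets these keys on a `librdkafka` configuration pointer instead of a Swift dictionary.

```swift
/// Hypothetical stand-in for the client type used in `RDKafka`.
enum ClientTypeSketch {
    case producer
    case consumer
}

/// Apply the library-enforced overrides on top of the user's configuration.
func effectiveConfig(
    _ userConfig: [String: String],
    for type: ClientTypeSketch
) -> [String: String] {
    var config = userConfig
    // Logs are handled through the event queue for both client types.
    config["log.queue"] = "true"
    // Only the consumer stores offsets manually, so only it disables the automatic store.
    if type == .consumer {
        config["enable.auto.offset.store"] = "false"
    }
    return config
}

let consumerConfig = effectiveConfig(["group.id": "example-group"], for: .consumer)
let producerConfig = effectiveConfig([:], for: .producer)
```

Because the overrides are written after the user's values are copied, a user-supplied `enable.auto.offset.store` would be replaced for consumers in this sketch.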
