Skip to content

Commit b0bc014

Browse files
KafkaConsumer Refactoring (#66)
* `KafkaConsumer` `run()` method Modifications: * `KafkaConsumer`: remove `NIOAsyncSequenceProducer` and use own implementation of `AsyncSequence` instead * write async wrapper method `KafkaClient.consumerPoll(timeout:)` * update memory leak tests for `KafkaConsumer` and `KafkaProducer` * update tests * KafkaConsumer: Replace Serial Queue Modifications: * replace `serialQueue` in `KafkaConsumer` with `StateMachine` that encapsulates all variables and can also be accessed from the `ConsumerMessagesAsyncSequence` * close `KafkaConsumer` when `for await` loop of `AsyncSequence` is exited * make `ConsumerMessagesAsyncIterator` a class-backed `struct` * make `KafkaConsumer.shutdownGracefully` `public` * * remove KafkaConsumerTests * * remove unused NIOCore imports * * remove backpressure strategy from KafkaConsumerConfig * * KafkaConsumer: inject poll interval through initializer (temporary) * Move polling to dedicated run() method Modifications: * add `run()` method to `KafkaConsumer` * rename `ConsumerMessagesAsyncSequence` to `KafkaConsumerMessages` * `KafkaConsumer`: put back `NIOAsyncSequenceProducer` * update `README` * use `Duration` type for `pollInterval` * Review Franz Modifications: * `KafkaClient.consumerPoll`: remove `timeout` as this method should only be used with `timeout = 0` * `KafkaClient.consumerPoll`: remove continuation as it has no benefit * Review Franz Modifications: * make `KafkaConsumer.shutdownGracefully()` `private` * `KafkaConsumer`: create `librdkafka` wrapping methods in `KafkaClient` instead of using `KafkaClient.withKafkaHandlePointer` * `KafkaConsumer`: rename `.killPollLoop` -> `.terminatePollLoop` * `KafkaConsumer.StateMachine`: move `logger` out of `State` * `KafkaConsumer`: move `pollInterval` out of `StateMachine` * Wrapper type for rd_kafka_topic_partition_list_t Modifications: * create new class `RDKafkaTopicPartitionList` wrapping a `rd_kafka_topic_partition_list_t` * KafkaConsumer: Remove unnecessary references to 
subscribedTopicsPointer * Review Franz Modifications: * make `RDKafkaTopicPartitionList` a `final class` * make `KafkaConsumer.messages` `AsyncSequence` return `KafkaConsumerMessage` instead of `Result<,>` type * `KafkaConsumer` fails immediately when `rd_kafka_message_t` with error payload is received * move `KafkaConsumer.pollInterval` to `KafkaConsumerConfiguration` * move `KafkaProducer.pollInterval` to `KafkaProducerConfiguration` * update `README`
1 parent 51987ec commit b0bc014

File tree

8 files changed

+596
-324
lines changed

8 files changed

+596
-324
lines changed

README.md

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,18 @@ let consumer = try KafkaConsumer(
7979
logger: .kafkaTest // Your logger here
8080
)
8181

82-
for await messageResult in consumer.messages {
83-
switch messageResult {
84-
case .success(let message):
85-
// Do something with message
86-
case .failure(let error):
87-
// Handle error
82+
await withThrowingTaskGroup(of: Void.self) { group in
83+
84+
// Run Task
85+
group.addTask {
86+
try await consumer.run()
87+
}
88+
89+
// Task receiving messages
90+
group.addTask {
91+
for await message in consumer.messages {
92+
// Do something with message
93+
}
8894
}
8995
}
9096
```
@@ -104,12 +110,18 @@ let consumer = try KafkaConsumer(
104110
logger: .kafkaTest // Your logger here
105111
)
106112

107-
for await messageResult in consumer.messages {
108-
switch messageResult {
109-
case .success(let message):
110-
// Do something with message
111-
case .failure(let error):
112-
// Handle error
113+
await withThrowingTaskGroup(of: Void.self) { group in
114+
115+
// Run Task
116+
group.addTask {
117+
try await consumer.run()
118+
}
119+
120+
// Task receiving messages
121+
group.addTask {
122+
for await message in consumer.messages {
123+
// Do something with message
124+
}
113125
}
114126
}
115127
```
@@ -130,13 +142,20 @@ let consumer = try KafkaConsumer(
130142
logger: .kafkaTest // Your logger here
131143
)
132144

133-
for await messageResult in consumer.messages {
134-
switch messageResult {
135-
case .success(let message):
136-
// Do something with message
137-
try await consumer.commitSync(message)
138-
case .failure(let error):
139-
// Handle error
145+
await withThrowingTaskGroup(of: Void.self) { group in
146+
147+
// Run Task
148+
group.addTask {
149+
try await consumer.run()
150+
}
151+
152+
// Task receiving messages
153+
group.addTask {
154+
for await message in consumer.messages {
155+
// Do something with message
156+
// ...
157+
try await consumer.commitSync(message)
158+
}
140159
}
141160
}
142161
```

Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@ import struct Foundation.UUID
1818
public struct KafkaConsumerConfiguration: Hashable {
1919
// MARK: - SwiftKafka-specific Config properties
2020

21-
/// The backpressure strategy to be used for message consumption.
22-
public var backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark(
23-
low: 10,
24-
high: 50
25-
)
21+
/// The time between two consecutive polls.
22+
/// Effectively controls the rate at which incoming events and messages are consumed.
23+
public var pollInterval: Duration
2624

2725
// This backs the consumptionStrategy computed property.
2826
private var _consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy
@@ -335,8 +333,8 @@ public struct KafkaConsumerConfiguration: Hashable {
335333
}
336334

337335
public init(
336+
pollInterval: Duration = .milliseconds(100),
338337
consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy,
339-
backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark(low: 10, high: 50),
340338
sessionTimeoutMs: UInt = 45000,
341339
heartbeatIntervalMs: UInt = 3000,
342340
maxPollInvervalMs: UInt = 300_000,
@@ -381,9 +379,9 @@ public struct KafkaConsumerConfiguration: Hashable {
381379
saslUsername: String? = nil,
382380
saslPassword: String? = nil
383381
) {
382+
self.pollInterval = pollInterval
384383
self._consumptionStrategy = consumptionStrategy
385384
self.consumptionStrategy = consumptionStrategy // used to invoke set { } method
386-
self.backPressureStrategy = backPressureStrategy
387385

388386
self.sessionTimeoutMs = sessionTimeoutMs
389387
self.heartbeatIntervalMs = heartbeatIntervalMs
@@ -473,30 +471,6 @@ public struct KafkaConsumerConfiguration: Hashable {
473471
// MARK: - KafkaSharedConfiguration + Consumer Additions
474472

475473
extension KafkaSharedConfiguration {
476-
/// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
477-
public struct BackPressureStrategy: Hashable {
478-
enum _BackPressureStrategy: Hashable {
479-
case watermark(low: Int, high: Int)
480-
}
481-
482-
let _internal: _BackPressureStrategy
483-
484-
private init(backPressureStrategy: _BackPressureStrategy) {
485-
self._internal = backPressureStrategy
486-
}
487-
488-
/// A back pressure strategy based on high and low watermarks.
489-
///
490-
/// The consumer maintains a buffer size between a low watermark and a high watermark
491-
/// to control the flow of incoming messages.
492-
///
493-
/// - Parameter low: The lower threshold for the buffer size (low watermark).
494-
/// - Parameter high: The upper threshold for the buffer size (high watermark).
495-
public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
496-
return .init(backPressureStrategy: .watermark(low: low, high: high))
497-
}
498-
}
499-
500474
/// A struct representing the different Kafka message consumption strategies.
501475
public struct ConsumptionStrategy: Hashable {
502476
enum _ConsumptionStrategy: Hashable {

Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
public struct KafkaProducerConfiguration: Hashable {
16+
// MARK: - SwiftKafka-specific Config properties
17+
18+
/// The time between two consecutive polls.
19+
/// Effectively controls the rate at which incoming events and acknowledgements are consumed.
20+
public var pollInterval: Duration
21+
22+
// MARK: - librdkafka Config properties
23+
1624
var dictionary: [String: String] = [:]
1725

1826
// MARK: - Producer-specific Config Properties
@@ -296,6 +304,7 @@ public struct KafkaProducerConfiguration: Hashable {
296304
}
297305

298306
public init(
307+
pollInterval: Duration = .milliseconds(100),
299308
transactionalID: String = "",
300309
transactionalTimeoutMs: UInt = 60000,
301310
enableIdempotence: Bool = false,
@@ -340,6 +349,8 @@ public struct KafkaProducerConfiguration: Hashable {
340349
saslUsername: String? = nil,
341350
saslPassword: String? = nil
342351
) {
352+
self.pollInterval = pollInterval
353+
343354
self.transactionalID = transactionalID
344355
self.transactionTimeoutMs = transactionalTimeoutMs
345356
self.enableIdempotence = enableIdempotence

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,76 @@ final class KafkaClient {
5656
return rd_kafka_poll(self.kafkaHandle, timeout)
5757
}
5858

59+
/// Redirect the main ``KafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s
60+
/// queue (``KafkaClient/consumerPoll``).
61+
///
62+
/// Events that would be triggered by ``KafkaClient/poll(timeout:)``
63+
/// are now triggered by ``KafkaClient/consumerPoll``.
64+
///
65+
/// - Warning: It is not allowed to call ``KafkaClient/poll(timeout:)`` after ``KafkaClient/pollSetConsumer``.
66+
func pollSetConsumer() throws {
67+
let result = rd_kafka_poll_set_consumer(self.kafkaHandle)
68+
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
69+
throw KafkaError.rdKafkaError(wrapping: result)
70+
}
71+
}
72+
73+
/// Request a new message from the Kafka cluster.
74+
///
75+
/// - Important: This method should only be invoked from ``KafkaConsumer``.
76+
///
77+
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
78+
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
79+
func consumerPoll() throws -> KafkaConsumerMessage? {
80+
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else {
81+
// No error, there might be no more messages
82+
return nil
83+
}
84+
85+
defer {
86+
// Destroy message otherwise poll() will block forever
87+
rd_kafka_message_destroy(messagePointer)
88+
}
89+
90+
// Reached the end of the topic+partition queue on the broker
91+
if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
92+
return nil
93+
}
94+
95+
let message = try KafkaConsumerMessage(messagePointer: messagePointer)
96+
return message
97+
}
98+
99+
/// Subscribe to topic set using balanced consumer groups.
100+
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
101+
func subscribe(topicPartitionList: RDKafkaTopicPartitionList) throws {
102+
try topicPartitionList.withListPointer { pointer in
103+
let result = rd_kafka_subscribe(self.kafkaHandle, pointer)
104+
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
105+
throw KafkaError.rdKafkaError(wrapping: result)
106+
}
107+
}
108+
}
109+
110+
/// Atomic assignment of partitions to consume.
111+
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
112+
func assign(topicPartitionList: RDKafkaTopicPartitionList) throws {
113+
try topicPartitionList.withListPointer { pointer in
114+
let result = rd_kafka_assign(self.kafkaHandle, pointer)
115+
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
116+
throw KafkaError.rdKafkaError(wrapping: result)
117+
}
118+
}
119+
}
120+
121+
/// Close the consumer.
122+
func consumerClose() throws {
123+
let result = rd_kafka_consumer_close(self.kafkaHandle)
124+
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
125+
throw KafkaError.rdKafkaError(wrapping: result)
126+
}
127+
}
128+
59129
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
60130
/// - Warning: Do not escape the pointer from the closure for later use.
61131
/// - Parameter body: The closure will use the Kafka handle pointer.

0 commit comments

Comments
 (0)