Commit 27cf519
KafkaConsumerConfig add SwiftKafka options (#54)
* `KafkaConsumerConfig` add `SwiftKafka` options

Motivation:

We want to use the `KafkaConsumerConfig` to determine if the `KafkaConsumer` is part of a consumer group (subscription) or assigned to a single partition of a topic (assignment). This should also be implemented in a way that errors are caught at compile time. Furthermore, we want users to be able to determine a `BackPressureStrategy` for their respective `KafkaClient`.

Modifications:

* add two new configuration options to `KafkaConsumerConfig`: `ConsumptionStrategy` and `BackPressureStrategy`
* remove `KafkaConsumerConfig`'s `groupID` option; this is now taken care of by the `consumptionStrategy` option, which is better as it ensures at compile time that no groupID mismatch errors occur
* rename `KafkaProducerConfig` back to `ConfigEnums` as it is not really meant for shared configurations, rather as a namespace for _all_ configuration options
* update README
* move setting group.id to `KafkaConsumerConfig`
* fix soundness
* rename `ConfigEnums` -> `KafkaSharedConfiguration`
* Review Franz

Modifications:

* fix wrong Docc comments
* remove implicitly unwrapped optional
* rename `highLowWatermark(lowWatermark:,highWatermark:)` to `watermark(low:,high:)`
* `ConsumptionStrategy`: remove *Based suffixes
* replace type of `KafkaConsumer.offset`: `Int64` -> `Int`
* fix docc nits
1 parent 4470f69 commit 27cf519
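The central claim in the commit message is that moving the group ID into the consumption strategy catches mismatches at compile time. A standalone sketch of why the new shape makes a groupID/topics mismatch unrepresentable (the types below are simplified stand-ins, not the shipped SwiftKafka API):

```swift
// Simplified stand-ins for the commit's types; illustrative only.
enum ConsumptionStrategy: Hashable {
    // Assignment: a single topic-partition pair, no consumer group involved.
    case partition(topic: String, partition: Int)
    // Subscription: the group ID and its topics travel together in one value,
    // so they can no longer be configured in two different places.
    case group(groupID: String, topics: [String])
}

struct ConsumerConfig {
    var consumptionStrategy: ConsumptionStrategy
    var bootstrapServers: [String]
}

let config = ConsumerConfig(
    consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
    bootstrapServers: ["localhost:9092"]
)

// The consumer now derives both pieces from the same value:
if case let .group(groupID, topics) = config.consumptionStrategy {
    assert(groupID == "example-group")
    assert(topics == ["topic-name"])
}
```

Under the old API the group ID lived on the config while the topics were passed to the consumer separately, so the two could silently disagree; here the compiler forces them into one value.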

File tree

8 files changed (+188, -103 lines)


README.md

Lines changed: 9 additions & 7 deletions
@@ -36,11 +36,15 @@ await producer.shutdownGracefully()
 After initializing the `KafkaConsumer` with a topic-partition pair to read from, messages can be consumed using the `messages` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence).
 
 ```swift
-let config = KafkaConsumerConfig(bootstrapServers: ["localhost:9092"])
+let config = KafkaConsumerConfig(
+    consumptionStrategy: .partition(
+        topic: "topic-name",
+        partition: KafkaPartition(rawValue: 0)
+    ),
+    bootstrapServers: ["localhost:9092"]
+)
 
 let consumer = try KafkaConsumer(
-    topic: "topic-name",
-    partition: KafkaPartition(rawValue: 0),
     config: config,
     logger: .kafkaTest // Your logger here
 )
@@ -61,12 +65,11 @@ SwiftKafka also allows users to subscribe to an array of topics as part of a consumer group
 
 ```swift
 let config = KafkaConsumerConfig(
-    groupID: "example-group",
+    consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
     bootstrapServers: ["localhost:9092"]
 )
 
 let consumer = try KafkaConsumer(
-    topics: ["topic-name"],
     config: config,
     logger: .kafkaTest // Your logger here
 )
@@ -87,13 +90,12 @@ By default, the `KafkaConsumer` automatically commits message offsets after receiving them
 
 ```swift
 let config = KafkaConsumerConfig(
-    groupID: "example-group",
+    consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
     enableAutoCommit: false,
     bootstrapServers: ["localhost:9092"]
 )
 
 let consumer = try KafkaConsumer(
-    topics: ["topic-name"],
     config: config,
     logger: .kafkaTest // Your logger here
 )

Sources/SwiftKafka/Configuration/KafkaConsumerConfig.swift

Lines changed: 110 additions & 12 deletions
@@ -12,17 +12,50 @@
 //
 //===----------------------------------------------------------------------===//
 
+import Crdkafka
+import struct Foundation.UUID
+
 public struct KafkaConsumerConfig: Hashable, Equatable {
-    var dictionary: [String: String] = [:]
+    // MARK: - SwiftKafka-specific Config properties
 
-    // MARK: - Consumer-specific Config Properties
+    /// The backpressure strategy to be used for message consumption.
+    public var backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark(
+        low: 10,
+        high: 50
+    )
+
+    // This backs the consumptionStrategy computed property.
+    private var _consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy
 
-    /// Client group id string. All clients sharing the same group.id belong to the same group.
-    public var groupID: String {
-        get { self.dictionary["group.id"] ?? "" }
-        set { self.dictionary["group.id"] = String(newValue) }
+    /// The strategy used for consuming messages.
+    /// See ``KafkaSharedConfiguration/ConsumptionStrategy`` for more information.
+    public var consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy {
+        get { self._consumptionStrategy }
+        set {
+            self._consumptionStrategy = newValue
+
+            // We do not expose the group.id option to the user
+            // but rather set it ourselves as part of our much safer
+            // consumptionStrategy option.
+            switch newValue._internal {
+            case .partition:
+                // Although an assignment is not related to a consumer group,
+                // librdkafka requires us to set a `group.id`.
+                // This is a known issue:
+                // https://github.com/edenhill/librdkafka/issues/3261
+                self.dictionary["group.id"] = UUID().uuidString
+            case .group(groupID: let groupID, topics: _):
+                self.dictionary["group.id"] = groupID
+            }
+        }
     }
 
+    // MARK: - librdkafka Config properties
+
+    var dictionary: [String: String] = [:]
+
+    // MARK: - Consumer-specific Config Properties
+
     /// Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms. Also see max.poll.interval.ms.
     public var sessionTimeoutMs: UInt {
         get { self.dictionary.getUInt("session.timeout.ms") ?? 45000 }
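The hunk above routes every strategy change through the computed property's setter, so librdkafka's `group.id` entry can never drift from the chosen strategy. A reduced, self-contained sketch of that pattern (the `MiniConfig` type and its `String`-free `Strategy` enum are hypothetical stand-ins, not the shipped code):

```swift
import Foundation // UUID

// Reduced sketch of the "computed property over private storage" trick.
struct MiniConfig {
    enum Strategy {
        case partition
        case group(groupID: String)
    }

    var dictionary: [String: String] = [:]
    private var _strategy: Strategy

    var strategy: Strategy {
        get { self._strategy }
        set {
            self._strategy = newValue
            switch newValue {
            case .partition:
                // librdkafka requires a group.id even for plain assignments,
                // so a random one is generated (see librdkafka issue #3261).
                self.dictionary["group.id"] = UUID().uuidString
            case .group(let groupID):
                self.dictionary["group.id"] = groupID
            }
        }
    }

    init(strategy: Strategy) {
        self._strategy = strategy
        self.strategy = strategy // re-assign to run the setter once, as the commit's init does
    }
}

var config = MiniConfig(strategy: .group(groupID: "example-group"))
assert(config.dictionary["group.id"] == "example-group")
```

The re-assignment in `init` mirrors the commit's own comment ("used to invoke set { } method"): Swift does not run a computed property's setter on initial storage assignment, so the setter is invoked explicitly once.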
@@ -53,7 +86,7 @@ public struct KafkaConsumerConfig: Hashable, Equatable {
         set { self.dictionary["auto.commit.interval.ms"] = String(newValue) }
     }
 
-    /// Action to take when there is no initial offset in offset store or the desired offset is out of range. See ``ConfigEnums/AutoOffsetReset`` for more information.
+    /// Action to take when there is no initial offset in offset store or the desired offset is out of range. See ``KafkaSharedConfiguration/AutoOffsetReset`` for more information.
     public var autoOffsetReset: KafkaSharedConfiguration.AutoOffsetReset {
         get { self.getAutoOffsetReset() ?? .largest }
         set { self.dictionary["auto.offset.reset"] = newValue.description }
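Properties like `autoOffsetReset` above all follow the same scheme throughout the config types: a typed computed property over the `[String: String]` dictionary that librdkafka ultimately consumes, with the librdkafka default as the getter's fallback. A minimal sketch of that scheme (hypothetical `MiniConfig`; assumes the real type's `getUInt` helper simply parses the stored string):

```swift
struct MiniConfig {
    var dictionary: [String: String] = [:]

    // Typed view over the string-keyed store; 45000 is librdkafka's default.
    var sessionTimeoutMs: UInt {
        get { self.dictionary["session.timeout.ms"].flatMap(UInt.init) ?? 45000 }
        set { self.dictionary["session.timeout.ms"] = String(newValue) }
    }
}

var config = MiniConfig()
assert(config.sessionTimeoutMs == 45000)                    // default when unset
config.sessionTimeoutMs = 60000
assert(config.dictionary["session.timeout.ms"] == "60000")  // stored as a string
```

Keeping the dictionary as the single source of truth means the whole config can be handed to librdkafka as key-value pairs without a separate serialization step.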
@@ -205,7 +238,7 @@ public struct KafkaConsumerConfig: Hashable, Equatable {
         set { self.dictionary["broker.address.ttl"] = String(newValue) }
     }
 
-    /// Allowed broker ``ConfigEnums/IPAddressFamily``.
+    /// Allowed broker ``KafkaSharedConfiguration/IPAddressFamily``.
     public var brokerAddressFamily: KafkaSharedConfiguration.IPAddressFamily {
         get { self.getIPAddressFamily() ?? .any }
         set { self.dictionary["broker.address.family"] = newValue.description }
@@ -223,7 +256,7 @@ public struct KafkaConsumerConfig: Hashable, Equatable {
         set { self.dictionary["reconnect.backoff.max.ms"] = String(newValue) }
     }
 
-    /// ``ConfigEnums/SecurityProtocol`` used to communicate with brokers.
+    /// ``KafkaSharedConfiguration/SecurityProtocol`` used to communicate with brokers.
     public var securityProtocol: KafkaSharedConfiguration.SecurityProtocol {
         get { self.getSecurityProtocol() ?? .plaintext }
         set { self.dictionary["security.protocol"] = newValue.description }
@@ -302,7 +335,8 @@ public struct KafkaConsumerConfig: Hashable, Equatable {
     }
 
     public init(
-        groupID: String = "",
+        consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy,
+        backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark(low: 10, high: 50),
         sessionTimeoutMs: UInt = 45000,
         heartbeatIntervalMs: UInt = 3000,
         maxPollInvervalMs: UInt = 300_000,
@@ -347,7 +381,10 @@ public struct KafkaConsumerConfig: Hashable, Equatable {
         saslUsername: String? = nil,
         saslPassword: String? = nil
     ) {
-        self.groupID = groupID
+        self._consumptionStrategy = consumptionStrategy
+        self.consumptionStrategy = consumptionStrategy // used to invoke set { } method
+        self.backPressureStrategy = backPressureStrategy
+
         self.sessionTimeoutMs = sessionTimeoutMs
         self.heartbeatIntervalMs = heartbeatIntervalMs
         self.maxPollInvervalMs = maxPollInvervalMs
@@ -433,9 +470,70 @@ public struct KafkaConsumerConfig: Hashable, Equatable {
     }
 }
 
-// MARK: - ConfigEnums + AutoOffsetReset
+// MARK: - KafkaSharedConfiguration + Consumer Additions
 
 extension KafkaSharedConfiguration {
+    /// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
+    public struct BackPressureStrategy: Hashable, Equatable {
+        enum _BackPressureStrategy: Hashable, Equatable {
+            case watermark(low: Int, high: Int)
+        }
+
+        let _internal: _BackPressureStrategy
+
+        private init(backPressureStrategy: _BackPressureStrategy) {
+            self._internal = backPressureStrategy
+        }
+
+        /// A back pressure strategy based on high and low watermarks.
+        ///
+        /// The consumer maintains a buffer size between a low watermark and a high watermark
+        /// to control the flow of incoming messages.
+        ///
+        /// - Parameter low: The lower threshold for the buffer size (low watermark).
+        /// - Parameter high: The upper threshold for the buffer size (high watermark).
+        public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
+            return .init(backPressureStrategy: .watermark(low: low, high: high))
+        }
+    }
+
+    /// A struct representing the different Kafka message consumption strategies.
+    public struct ConsumptionStrategy: Hashable, Equatable {
+        enum _ConsumptionStrategy: Hashable, Equatable {
+            case partition(topic: String, partition: KafkaPartition, offset: Int)
+            case group(groupID: String, topics: [String])
+        }
+
+        let _internal: _ConsumptionStrategy
+
+        private init(consumptionStrategy: _ConsumptionStrategy) {
+            self._internal = consumptionStrategy
+        }
+
+        /// A consumption strategy based on partition assignment.
+        /// The consumer reads from a specific partition of a topic at a given offset.
+        ///
+        /// - Parameter topic: The name of the Kafka topic.
+        /// - Parameter partition: The partition of the topic to consume from.
+        /// - Parameter offset: The offset to start consuming from.
+        public static func partition(
+            topic: String,
+            partition: KafkaPartition,
+            offset: Int = Int(RD_KAFKA_OFFSET_END)
+        ) -> ConsumptionStrategy {
+            return .init(consumptionStrategy: .partition(topic: topic, partition: partition, offset: offset))
+        }
+
+        /// A consumption strategy based on consumer group membership.
+        /// The consumer joins a consumer group identified by a group ID and consumes from multiple topics.
+        ///
+        /// - Parameter groupID: The ID of the consumer group to join.
+        /// - Parameter topics: An array of topic names to consume from.
+        public static func group(groupID: String, topics: [String]) -> ConsumptionStrategy {
+            return .init(consumptionStrategy: .group(groupID: groupID, topics: topics))
+        }
+    }
+
     /// Available actions to take when there is no initial offset in offset store / offset is out of range.
     public struct AutoOffsetReset: Hashable, Equatable, CustomStringConvertible {
         public let description: String
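The `watermark(low:high:)` strategy added above only carries the two thresholds; how a consumer acts on them is not part of this diff. A minimal sketch of the usual high/low watermark discipline (illustrative, not SwiftKafka's implementation): stop fetching once the buffer reaches the high watermark and resume only after it drains back down to the low one, so fetching does not flap on every message.

```swift
// Hypothetical gate tracking whether the consumer should keep fetching.
struct WatermarkGate {
    let low: Int
    let high: Int
    private(set) var fetching = true

    // Called whenever the number of buffered messages changes.
    mutating func update(buffered: Int) {
        if buffered >= high {
            fetching = false      // buffer full: apply backpressure
        } else if buffered <= low {
            fetching = true       // buffer drained: resume fetching
        }
        // Between the watermarks the previous state is kept (hysteresis).
    }
}

var gate = WatermarkGate(low: 10, high: 50)
gate.update(buffered: 50)
assert(gate.fetching == false)
gate.update(buffered: 30)         // still between the watermarks
assert(gate.fetching == false)
gate.update(buffered: 10)
assert(gate.fetching == true)
```

The gap between the two thresholds is what prevents rapid on/off toggling of the fetch loop; with `low == high` the gate would flip on every crossing.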

Sources/SwiftKafka/Configuration/KafkaProducerConfig.swift

Lines changed: 2 additions & 2 deletions
@@ -199,7 +199,7 @@ public struct KafkaProducerConfig: Hashable, Equatable {
         set { self.dictionary["broker.address.ttl"] = String(newValue) }
     }
 
-    /// Allowed broker ``ConfigEnums/IPAddressFamily``.
+    /// Allowed broker ``KafkaSharedConfiguration/IPAddressFamily``.
     public var brokerAddressFamily: KafkaSharedConfiguration.IPAddressFamily {
         get { self.getIPAddressFamily() ?? .any }
         set { self.dictionary["broker.address.family"] = newValue.description }
@@ -217,7 +217,7 @@ public struct KafkaProducerConfig: Hashable, Equatable {
         set { self.dictionary["reconnect.backoff.max.ms"] = String(newValue) }
     }
 
-    /// ``ConfigEnums/SecurityProtocol`` used to communicate with brokers.
+    /// ``KafkaSharedConfiguration/SecurityProtocol`` used to communicate with brokers.
     public var securityProtocol: KafkaSharedConfiguration.SecurityProtocol {
         get { self.getSecurityProtocol() ?? .plaintext }
         set { self.dictionary["security.protocol"] = newValue.description }

Sources/SwiftKafka/Configuration/KafkaTopicConfig.swift

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ public struct KafkaTopicConfig: Hashable, Equatable {
         set { self.dictionary["message.timeout.ms"] = String(newValue) }
     }
 
-    /// Partitioner. See ``ConfigEnums/Partitioner`` for more information.
+    /// Partitioner. See ``KafkaSharedConfiguration/Partitioner`` for more information.
     public var partitioner: KafkaSharedConfiguration.Partitioner {
         get { self.getPartitioner() ?? .consistentRandom }
         set { self.dictionary["partitioner"] = newValue.description }

Sources/SwiftKafka/KafkaAcknowledgedMessage.swift

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ public struct KafkaAcknowledgedMessage: Hashable {
     /// The body of the message.
     public var value: ByteBuffer
     /// The offset of the message in its partition.
-    public var offset: Int64
+    public var offset: Int
 
     /// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
     /// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages.
@@ -63,6 +63,6 @@ public struct KafkaAcknowledgedMessage: Hashable {
             self.key = nil
         }
 
-        self.offset = Int64(rdKafkaMessage.offset)
+        self.offset = Int(rdKafkaMessage.offset)
     }
 }
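The `Int64` to `Int` change above is lossless on the 64-bit platforms Swift on the server targets, where `Int` is itself 64 bits wide, so the conversion from librdkafka's `int64_t` offset cannot truncate there. A quick check of that assumption:

```swift
// On 64-bit platforms Swift's Int covers the full range of Int64.
assert(MemoryLayout<Int>.size == MemoryLayout<Int64>.size)

// librdkafka reports offsets as int64_t; the widest value round-trips:
let rdKafkaOffset: Int64 = Int64.max
let offset = Int(rdKafkaOffset)   // would trap only on a 32-bit platform
assert(Int64(offset) == rdKafkaOffset)
```

Using `Int` keeps the public API idiomatic Swift while deferring range concerns to the (unsupported) 32-bit case.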

0 commit comments