
Commit 6e53e52

New KafkaConfig Refactoring (#48)
* Refactor KafkaClient to use new Kafka configs

  Motivation: Currently, the new strongly-typed config structs such as ProducerConfig are just used to create a new KafkaConfig. We want to get rid of KafkaConfig entirely and only create librdkafka configs right before rd_kafka_new() is called, to avoid having to bother with the memory management of a rd_kafka_conf_t object.

  Modifications:
  * deprecate KafkaConfig in favour of ProducerConfig, ConsumerConfig etc.

  Result:
  * everything should work as before, just with a slightly nicer syntax

* Temporarily remove socket.connection.setup.timeout.ms

  Motivation:
  * This option is only available with newer librdkafka versions, not including the librdkafka package used in our Docker image

  Modifications:
  * Temporarily commented out the option to pass unit tests on Linux

* Remove KafkaConfig

  Modifications:
  * remove KafkaConfig
  * remove unused tests
  * update DocC to use ConsumerConfig and ProducerConfig

* Create internal wrapper for rd_kafka_conf funcs

  Modifications:
  * new struct `RDKafkaConfig` containing helper functions wrapping `rd_kafka_conf*` functions for easier use inside of `KafkaClient`

* Replace KafkaTopicConfig with new TopicConfig

  Motivation:
  * similar to the Producer/Consumer configs, the librdkafka rd_kafka_topic_conf_t config shall only be created when needed by rd_kafka_topic_new; for all other cases a lightweight Swift struct TopicConfig shall be used

  Modifications:
  * delete `KafkaTopicConfig` and `KafkaTopicConfigTests`
  * create a new wrapper `RDKafkaTopicConfig` that wraps the common `rd_kafka_topic_conf_*` functions
  * refactor `KafkaProducer` into using the new `TopicConfig` type

* Remove TODOs

* Rearrange folders + rename config structs

  Motivation:
  * create a clearer file structure
  * add `Kafka*` prefix to configs to have a clear Kafka namespace and avoid name collisions when used in other projects

  Modifications:
  * created folders for Configuration, RDKafka and Utilities
  * rename ConsumerConfig -> KafkaConsumerConfig
  * rename ProducerConfig -> KafkaProducerConfig
  * rename TopicConfig -> KafkaTopicConfig
  * rename ConfigEnums -> KafkaConfigEnums
  * RDKafkaConfig.setDeliveryCallback -> .setDeliveryReportCallback

* Update README to use new config types

* Remove unused DocC comment

* Re-enable socketConnectionSetupTimeoutMs configuration option in Kafka(Consumer|Producer)Config

* Review Franz

  Modifications:
  * rename `KafkaConfigEnums` to `KafkaSharedConfiguration`
  * remove `protocol StringDictionaryRepresentable`

* KafkaProducer: ensure deliveryCallback is retained

  Motivation: With the current code architecture, `KafkaClient` has a var `opaque` that references a `CapturedClosure`, which in turn captures the delivery callback that gets invoked by Kafka upon successful message delivery. This callback has to be retained in memory while our producer is running.

  Modifications:
  * make `KafkaClient` a `class` in which the captured closure is a `let` and not a `var`
  * create a factory method called `RDKafka.createClient` that creates a new `KafkaClient` for a given set of parameters
1 parent f09a67e commit 6e53e52

20 files changed: +345 additions, -823 deletions
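The delivery-callback change described at the end of the commit message above is easiest to see in isolation. The following is a minimal, self-contained sketch of that ownership pattern; the names (`CapturedClosure`, `Client`, `simulateDeliveryReport`) are illustrative stand-ins, not the package's actual API.

```swift
// Minimal sketch of the retention pattern: librdkafka only stores an opaque pointer to
// the captured closure, so the Swift side must keep the wrapper alive for as long as the
// producer runs. Holding it as a `let` on a class ties the closure's lifetime to the
// client object. All names here are illustrative, not SwiftKafka API.
final class CapturedClosure {
    let callback: (UInt) -> Void
    init(_ callback: @escaping (UInt) -> Void) {
        self.callback = callback
    }
}

final class Client {
    // Retained for the lifetime of the client, mirroring the "captured closure is a
    // `let` and not a `var`" modification from the commit message.
    private let capturedClosure: CapturedClosure

    init(deliveryReportCallback: @escaping (UInt) -> Void) {
        self.capturedClosure = CapturedClosure(deliveryReportCallback)
    }

    // Stand-in for librdkafka invoking the registered callback through its opaque pointer.
    func simulateDeliveryReport(messageID: UInt) {
        self.capturedClosure.callback(messageID)
    }
}

let client = Client { messageID in
    print("Message \(messageID) was acknowledged")
}
client.simulateDeliveryReport(messageID: 42)
```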

README.md

Lines changed: 11 additions & 11 deletions
@@ -9,8 +9,7 @@ SwiftKafka is a Swift Package in development that provides a convenient way to c
 The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
 
 ```swift
-var config = KafkaConfig()
-try config.set("localhost:9092", forKey: "bootstrap.servers")
+let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"])
 
 let producer = try await KafkaProducer(
     config: config,
@@ -37,8 +36,7 @@ await producer.shutdownGracefully()
 After initializing the `KafkaConsumer` with a topic-partition pair to read from, messages can be consumed using the `messages` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence).
 
 ```swift
-var config = KafkaConfig()
-try config.set("localhost:9092", forKey: "bootstrap.servers")
+let config = KafkaConsumerConfig(bootstrapServers: ["localhost:9092"])
 
 let consumer = try KafkaConsumer(
     topic: "topic-name",
@@ -62,12 +60,13 @@ for await messageResult in consumer.messages {
 SwiftKafka also allows users to subscribe to an array of topics as part of a consumer group.
 
 ```swift
-var config = KafkaConfig()
-try config.set("localhost:9092", forKey: "bootstrap.servers")
+let config = KafkaConsumerConfig(
+    groupID: "example-group",
+    bootstrapServers: ["localhost:9092"]
+)
 
 let consumer = try KafkaConsumer(
     topics: ["topic-name"],
-    groupID: "example-group",
     config: config,
     logger: .kafkaTest // Your logger here
 )
@@ -87,13 +86,14 @@ for await messageResult in consumer.messages {
 By default, the `KafkaConsumer` automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.
 
 ```swift
-var config = KafkaConfig()
-try config.set("localhost:9092", forKey: "bootstrap.servers")
-try config.set("false", forKey: "enable.auto.commit")
+let config = KafkaConsumerConfig(
+    groupID: "example-group",
+    enableAutoCommit: false,
+    bootstrapServers: ["localhost:9092"]
+)
 
 let consumer = try KafkaConsumer(
     topics: ["topic-name"],
-    groupID: "example-group",
     config: config,
     logger: .kafkaTest // Your logger here
 )
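The hunk headers above reference a consumption loop (`for await messageResult in consumer.messages`) that the diff itself does not show. A hedged sketch of such a loop follows; treating each element as a `Result` and the payload handling are assumptions based on the `messageResult` name, not the package's confirmed API.

```swift
import SwiftKafka

// Hedged sketch of the consumption loop referenced in the hunk headers above
// ("for await messageResult in consumer.messages"). Treating each element as a Result
// and the exact payload types are assumptions, not the package's confirmed API.
func consume(from consumer: KafkaConsumer) async {
    for await messageResult in consumer.messages {
        switch messageResult {
        case .success(let message):
            print("Received message: \(message)")
        case .failure(let error):
            print("Consumer error: \(error)")
        }
    }
}
```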

Sources/SwiftKafka/New/ConsumerConfig.swift renamed to Sources/SwiftKafka/Configuration/KafkaConsumerConfig.swift

Lines changed: 22 additions & 22 deletions
@@ -12,7 +12,7 @@
 //
 //===----------------------------------------------------------------------===//
 
-public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable {
+public struct KafkaConsumerConfig: Hashable, Equatable {
     var dictionary: [String: String] = [:]
 
     // MARK: - Consumer-specific Config Properties
@@ -54,7 +54,7 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// Action to take when there is no initial offset in offset store or the desired offset is out of range. See ``ConfigEnums/AutoOffsetReset`` for more information.
-    public var autoOffsetReset: ConfigEnums.AutoOffsetReset {
+    public var autoOffsetReset: KafkaSharedConfiguration.AutoOffsetReset {
         get { self.getAutoOffsetReset() ?? .largest }
         set { self.dictionary["auto.offset.reset"] = newValue.description }
     }
@@ -146,7 +146,7 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch.
-    public var debug: [ConfigEnums.DebugOption] {
+    public var debug: [KafkaSharedConfiguration.DebugOption] {
         get { self.getDebugOptions() }
         set {
             if !newValue.isEmpty {
@@ -204,7 +204,7 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// Allowed broker ``ConfigEnums/IPAddressFamily``.
-    public var brokerAddressFamily: ConfigEnums.IPAddressFamily {
+    public var brokerAddressFamily: KafkaSharedConfiguration.IPAddressFamily {
         get { self.getIPAddressFamily() ?? .any }
         set { self.dictionary["broker.address.family"] = newValue.description }
     }
@@ -222,7 +222,7 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// ``ConfigEnums/SecurityProtocol`` used to communicate with brokers.
-    public var securityProtocol: ConfigEnums.SecurityProtocol {
+    public var securityProtocol: KafkaSharedConfiguration.SecurityProtocol {
         get { self.getSecurityProtocol() ?? .plaintext }
         set { self.dictionary["security.protocol"] = newValue.description }
     }
@@ -270,7 +270,7 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// SASL mechanism to use for authentication.
-    public var saslMechanism: ConfigEnums.SASLMechanism? {
+    public var saslMechanism: KafkaSharedConfiguration.SASLMechanism? {
         get { self.getSASLMechanism() }
         set {
             if let newValue {
@@ -307,7 +307,7 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
         enableAutoCommit: Bool = true,
         autoCommitIntervalMs: UInt = 5000,
         enableAutoOffsetStore: Bool = true,
-        autoOffsetReset: ConfigEnums.AutoOffsetReset = .largest,
+        autoOffsetReset: KafkaSharedConfiguration.AutoOffsetReset = .largest,
         allowAutoCreateTopics: Bool = false,
         clientID: String = "rdkafka",
         bootstrapServers: [String] = [],
@@ -321,7 +321,7 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
         topicMetadataRefreshSparse: Bool = true,
         topicMetadataPropagationMaxMs: UInt = 30000,
         topicDenylist: [String] = [],
-        debug: [ConfigEnums.DebugOption] = [],
+        debug: [KafkaSharedConfiguration.DebugOption] = [],
         socketTimeoutMs: UInt = 60000,
         socketSendBufferBytes: UInt = 0,
         socketReceiveBufferBytes: UInt = 0,
@@ -330,18 +330,18 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
         socketMaxFails: UInt = 1,
         socketConnectionSetupTimeoutMs: UInt = 30000,
         brokerAddressTTL: UInt = 1000,
-        brokerAddressFamily: ConfigEnums.IPAddressFamily = .any,
+        brokerAddressFamily: KafkaSharedConfiguration.IPAddressFamily = .any,
         reconnectBackoffMs: UInt = 100,
         reconnectBackoffMaxMs: UInt = 10000,
-        securityProtocol: ConfigEnums.SecurityProtocol = .plaintext,
+        securityProtocol: KafkaSharedConfiguration.SecurityProtocol = .plaintext,
         sslKeyLocation: String = "",
         sslKeyPassword: String = "",
         sslCertificateLocation: String = "",
         sslCALocation: String = "",
         sslCRLLocation: String = "",
         sslKeystoreLocation: String = "",
         sslKeystorePassword: String = "",
-        saslMechanism: ConfigEnums.SASLMechanism? = nil,
+        saslMechanism: KafkaSharedConfiguration.SASLMechanism? = nil,
         saslUsername: String? = nil,
         saslPassword: String? = nil
     ) {
@@ -394,46 +394,46 @@ public struct ConsumerConfig: Hashable, Equatable, StringDictionaryRepresentable
 
     // MARK: - Helpers
 
-    func getDebugOptions() -> [ConfigEnums.DebugOption] {
+    func getDebugOptions() -> [KafkaSharedConfiguration.DebugOption] {
         guard let options = dictionary["debug"] else {
             return []
         }
         return options.components(separatedBy: ",")
-            .map { ConfigEnums.DebugOption(description: $0) }
+            .map { KafkaSharedConfiguration.DebugOption(description: $0) }
     }
 
-    func getIPAddressFamily() -> ConfigEnums.IPAddressFamily? {
+    func getIPAddressFamily() -> KafkaSharedConfiguration.IPAddressFamily? {
         guard let value = dictionary["broker.address.family"] else {
             return nil
         }
-        return ConfigEnums.IPAddressFamily(description: value)
+        return KafkaSharedConfiguration.IPAddressFamily(description: value)
     }
 
-    func getSecurityProtocol() -> ConfigEnums.SecurityProtocol? {
+    func getSecurityProtocol() -> KafkaSharedConfiguration.SecurityProtocol? {
         guard let value = dictionary["security.protocol"] else {
             return nil
         }
-        return ConfigEnums.SecurityProtocol(description: value)
+        return KafkaSharedConfiguration.SecurityProtocol(description: value)
     }
 
-    func getSASLMechanism() -> ConfigEnums.SASLMechanism? {
+    func getSASLMechanism() -> KafkaSharedConfiguration.SASLMechanism? {
         guard let value = dictionary["sasl.mechanism"] else {
             return nil
         }
-        return ConfigEnums.SASLMechanism(description: value)
+        return KafkaSharedConfiguration.SASLMechanism(description: value)
     }
 
-    func getAutoOffsetReset() -> ConfigEnums.AutoOffsetReset? {
+    func getAutoOffsetReset() -> KafkaSharedConfiguration.AutoOffsetReset? {
         guard let value = dictionary["auto.offset.reset"] else {
             return nil
         }
-        return ConfigEnums.AutoOffsetReset(description: value)
+        return KafkaSharedConfiguration.AutoOffsetReset(description: value)
     }
 }
 
 // MARK: - ConfigEnums + AutoOffsetReset
 
-extension ConfigEnums {
+extension KafkaSharedConfiguration {
     /// Available actions to take when there is no initial offset in offset store / offset is out of range.
     public struct AutoOffsetReset: Hashable, Equatable, CustomStringConvertible {
         public let description: String
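The pattern visible throughout this file is a plain `[String: String]` dictionary fronted by typed computed properties, so librdkafka ultimately only sees string key/value pairs. A reduced, self-contained illustration of that pattern (not the package's exact code) is shown below.

```swift
// Reduced illustration of the dictionary-backed property pattern used by
// KafkaConsumerConfig above; the property name and key are taken from the diff,
// but this struct itself is a stand-in, not the package's code.
struct MiniConsumerConfig {
    var dictionary: [String: String] = [:]

    var enableAutoCommit: Bool {
        get { self.dictionary["enable.auto.commit"] == "true" }
        set { self.dictionary["enable.auto.commit"] = String(newValue) }
    }
}

var config = MiniConsumerConfig()
config.enableAutoCommit = false
print(config.dictionary) // ["enable.auto.commit": "false"]
```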

Sources/SwiftKafka/New/ProducerConfig.swift renamed to Sources/SwiftKafka/Configuration/KafkaProducerConfig.swift

Lines changed: 17 additions & 17 deletions
@@ -12,7 +12,7 @@
 //
 //===----------------------------------------------------------------------===//
 
-public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable {
+public struct KafkaProducerConfig: Hashable, Equatable {
     var dictionary: [String: String] = [:]
 
     // MARK: - Producer-specific Config Properties
@@ -134,7 +134,7 @@ public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch.
-    public var debug: [ConfigEnums.DebugOption] {
+    public var debug: [KafkaSharedConfiguration.DebugOption] {
         get { self.getDebugOptions() }
         set {
             if !newValue.isEmpty {
@@ -192,7 +192,7 @@ public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// Allowed broker ``ConfigEnums/IPAddressFamily``.
-    public var brokerAddressFamily: ConfigEnums.IPAddressFamily {
+    public var brokerAddressFamily: KafkaSharedConfiguration.IPAddressFamily {
         get { self.getIPAddressFamily() ?? .any }
         set { self.dictionary["broker.address.family"] = newValue.description }
     }
@@ -210,7 +210,7 @@ public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// ``ConfigEnums/SecurityProtocol`` used to communicate with brokers.
-    public var securityProtocol: ConfigEnums.SecurityProtocol {
+    public var securityProtocol: KafkaSharedConfiguration.SecurityProtocol {
         get { self.getSecurityProtocol() ?? .plaintext }
         set { self.dictionary["security.protocol"] = newValue.description }
     }
@@ -258,7 +258,7 @@ public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable
     }
 
     /// SASL mechanism to use for authentication.
-    public var saslMechanism: ConfigEnums.SASLMechanism? {
+    public var saslMechanism: KafkaSharedConfiguration.SASLMechanism? {
         get { self.getSASLMechanism() }
         set {
             if let newValue {
@@ -307,7 +307,7 @@ public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable
         topicMetadataRefreshSparse: Bool = true,
         topicMetadataPropagationMaxMs: UInt = 30000,
         topicDenylist: [String] = [],
-        debug: [ConfigEnums.DebugOption] = [],
+        debug: [KafkaSharedConfiguration.DebugOption] = [],
         socketTimeoutMs: UInt = 60000,
         socketSendBufferBytes: UInt = 0,
         socketReceiveBufferBytes: UInt = 0,
@@ -316,18 +316,18 @@ public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable
         socketMaxFails: UInt = 1,
         socketConnectionSetupTimeoutMs: UInt = 30000,
         brokerAddressTTL: UInt = 1000,
-        brokerAddressFamily: ConfigEnums.IPAddressFamily = .any,
+        brokerAddressFamily: KafkaSharedConfiguration.IPAddressFamily = .any,
         reconnectBackoffMs: UInt = 100,
         reconnectBackoffMaxMs: UInt = 10000,
-        securityProtocol: ConfigEnums.SecurityProtocol = .plaintext,
+        securityProtocol: KafkaSharedConfiguration.SecurityProtocol = .plaintext,
         sslKeyLocation: String = "",
         sslKeyPassword: String = "",
         sslCertificateLocation: String = "",
         sslCALocation: String = "",
         sslCRLLocation: String = "",
         sslKeystoreLocation: String = "",
         sslKeystorePassword: String = "",
-        saslMechanism: ConfigEnums.SASLMechanism? = nil,
+        saslMechanism: KafkaSharedConfiguration.SASLMechanism? = nil,
         saslUsername: String? = nil,
         saslPassword: String? = nil
     ) {
@@ -377,32 +377,32 @@ public struct ProducerConfig: Hashable, Equatable, StringDictionaryRepresentable
 
     // MARK: - Helpers
 
-    func getDebugOptions() -> [ConfigEnums.DebugOption] {
+    func getDebugOptions() -> [KafkaSharedConfiguration.DebugOption] {
         guard let options = dictionary["debug"] else {
             return []
         }
         return options.components(separatedBy: ",")
-            .map { ConfigEnums.DebugOption(description: $0) }
+            .map { KafkaSharedConfiguration.DebugOption(description: $0) }
     }
 
-    func getIPAddressFamily() -> ConfigEnums.IPAddressFamily? {
+    func getIPAddressFamily() -> KafkaSharedConfiguration.IPAddressFamily? {
         guard let value = dictionary["broker.address.family"] else {
             return nil
         }
-        return ConfigEnums.IPAddressFamily(description: value)
+        return KafkaSharedConfiguration.IPAddressFamily(description: value)
     }
 
-    func getSecurityProtocol() -> ConfigEnums.SecurityProtocol? {
+    func getSecurityProtocol() -> KafkaSharedConfiguration.SecurityProtocol? {
         guard let value = dictionary["security.protocol"] else {
             return nil
         }
-        return ConfigEnums.SecurityProtocol(description: value)
+        return KafkaSharedConfiguration.SecurityProtocol(description: value)
    }
 
-    func getSASLMechanism() -> ConfigEnums.SASLMechanism? {
+    func getSASLMechanism() -> KafkaSharedConfiguration.SASLMechanism? {
         guard let value = dictionary["sasl.mechanism"] else {
             return nil
         }
-        return ConfigEnums.SASLMechanism(description: value)
+        return KafkaSharedConfiguration.SASLMechanism(description: value)
     }
 }
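For reference, the renamed initializer above can be called with only the parameters a caller cares about, since every parameter carries a default. The sketch below uses only member spellings that appear in this diff (`.any`, `.plaintext`); the position of `bootstrapServers` relative to the later parameters is assumed to mirror the consumer initializer.

```swift
import SwiftKafka

// Usage sketch of the renamed KafkaProducerConfig initializer. Only defaults visible in
// this diff (.any, .plaintext) are used; the relative ordering of bootstrapServers is an
// assumption based on the consumer config shown earlier.
let producerConfig = KafkaProducerConfig(
    bootstrapServers: ["localhost:9092"],
    brokerAddressFamily: .any,
    securityProtocol: .plaintext
)
```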

Sources/SwiftKafka/New/ConfigEnums.swift renamed to Sources/SwiftKafka/Configuration/KafkaSharedConfiguration.swift

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
 //===----------------------------------------------------------------------===//
 
 /// Collection of `enum` types used in the configuration structs this library provides.
-public struct ConfigEnums {
+public enum KafkaSharedConfiguration {
     /// Available debug contexts to enable.
     public struct DebugOption: Hashable, Equatable, CustomStringConvertible {
         public let description: String
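The single change in this file, `struct` to caseless `enum`, follows the common Swift pattern of using an uninstantiable enum as a pure namespace for shared option types. A small, self-contained sketch of that pattern is below; the nested members are illustrative, not the package's full set.

```swift
// Sketch of the caseless-enum-as-namespace pattern adopted above: the enum has no cases,
// so it cannot be instantiated and exists only to group shared option types. The nested
// presets here are illustrative stand-ins for what KafkaSharedConfiguration provides.
enum ConfigurationNamespace {
    struct DebugOption: Hashable, CustomStringConvertible {
        let description: String

        static let broker = DebugOption(description: "broker")
        static let topic = DebugOption(description: "topic")
        static let msg = DebugOption(description: "msg")
    }
}

let debugOptions: [ConfigurationNamespace.DebugOption] = [.broker, .topic, .msg]
print(debugOptions.map(\.description).joined(separator: ",")) // "broker,topic,msg"
```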
