Skip to content

Commit cd0dc2e

Browse files
Strongly-typed Configurations (#42)
1 parent f3ca724 commit cd0dc2e

10 files changed

+1220
-0
lines changed

Sources/SwiftKafka/KafkaConfig.swift

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,30 @@ public struct KafkaConfig: Hashable, Equatable {
4646
self.opaque = opaque
4747
}
4848

49+
/// Initialize internal `KafkaConfig` object from a ``ProducerConfig`` provided by the new API.
50+
convenience init(producerConfig: ProducerConfig) throws {
51+
self.init(
52+
pointer: rd_kafka_conf_new(),
53+
opaque: nil
54+
)
55+
56+
try producerConfig.dictionary.forEach { key, value in
57+
try self.set(value, forKey: key)
58+
}
59+
}
60+
61+
/// Initialize internal `KafkaConfig` object from a ``ConsumerConfig`` provided by the new API.
62+
convenience init(consumerConfig: ConsumerConfig) throws {
63+
self.init(
64+
pointer: rd_kafka_conf_new(),
65+
opaque: nil
66+
)
67+
68+
try consumerConfig.dictionary.forEach { key, value in
69+
try self.set(value, forKey: key)
70+
}
71+
}
72+
4973
/// Initialize internal `KafkaConfig` object with default configuration.
5074
convenience init() {
5175
self.init(
@@ -160,6 +184,16 @@ public struct KafkaConfig: Hashable, Equatable {
160184
self._internal = .init()
161185
}
162186

187+
/// Initialize a legacy ``KafkaConfig`` from a ``ProducerConfig`` provided by the new API.
188+
init(producerConfig: ProducerConfig) throws {
189+
self._internal = try .init(producerConfig: producerConfig)
190+
}
191+
192+
/// Initialize a legacy ``KafkaConfig`` from a ``ConsumerConfig`` provided by the new API.
193+
init(consumerConfig: ConsumerConfig) throws {
194+
self._internal = try .init(consumerConfig: consumerConfig)
195+
}
196+
163197
/// Retrieve value of configuration property for `key`
164198
public func value(forKey key: String) -> String? {
165199
return self._internal.value(forKey: key)

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,37 @@ public final class KafkaConsumer {
193193
)
194194
}
195195

196+
// MARK: - Initialisers with new config
197+
198+
public convenience init(
199+
topics: [String],
200+
config: ConsumerConfig = ConsumerConfig(),
201+
logger: Logger
202+
) throws {
203+
try self.init(
204+
topics: topics,
205+
groupID: config.groupID,
206+
config: KafkaConfig(consumerConfig: config),
207+
logger: logger
208+
)
209+
}
210+
211+
public convenience init(
212+
topic: String,
213+
partition: KafkaPartition,
214+
offset: Int64,
215+
config: ConsumerConfig = ConsumerConfig(),
216+
logger: Logger
217+
) throws {
218+
try self.init(
219+
topic: topic,
220+
partition: partition,
221+
offset: offset,
222+
config: KafkaConfig(consumerConfig: config),
223+
logger: logger
224+
)
225+
}
226+
196227
/// Subscribe to the given list of `topics`.
197228
/// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
198229
/// - Parameter topics: An array of topic names to subscribe to.

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,20 @@ public actor KafkaProducer {
136136
}
137137
}
138138

139+
// MARK: - Initialiser with new config
140+
141+
public init(
142+
config: ProducerConfig = ProducerConfig(),
143+
topicConfig: TopicConfig = TopicConfig(),
144+
logger: Logger
145+
) async throws {
146+
try await self.init(
147+
config: KafkaConfig(producerConfig: config),
148+
topicConfig: KafkaTopicConfig(topicConfig: topicConfig),
149+
logger: logger
150+
)
151+
}
152+
139153
/// Method to shutdown the ``KafkaProducer``.
140154
///
141155
/// This method flushes any buffered messages and waits until a callback is received for all of them.

Sources/SwiftKafka/KafkaTopicConfig.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ public struct KafkaTopicConfig: Hashable, Equatable {
3434
self.pointer = pointer
3535
}
3636

37+
/// Initialize internal `KafkaTopicConfig` object from a ``TopicConfig`` provided by the new API.
38+
convenience init(topicConfig: TopicConfig) throws {
39+
self.init()
40+
41+
try topicConfig.dictionary.forEach { key, value in
42+
try self.set(value, forKey: key)
43+
}
44+
}
45+
3746
deinit {
3847
rd_kafka_topic_conf_destroy(pointer)
3948
}
@@ -101,6 +110,11 @@ public struct KafkaTopicConfig: Hashable, Equatable {
101110
self._internal = .init()
102111
}
103112

113+
/// Initialize a legacy ``KafkaTopicConfig`` from a ``TopicConfig`` provided by the new API.
114+
init(topicConfig: TopicConfig) throws {
115+
self._internal = try .init(topicConfig: topicConfig)
116+
}
117+
104118
/// Retrieve value of topic configuration property for `key`
105119
public func value(forKey key: String) -> String? {
106120
return self._internal.value(forKey: key)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
/// Collection of `enum` types used in the configuration structs this library provides.
16+
public struct ConfigEnums {
17+
/// Available debug contexts to enable.
18+
public struct DebugOption: Hashable, Equatable, CustomStringConvertible {
19+
public let description: String
20+
21+
public static let generic = DebugOption(description: "generic")
22+
public static let broker = DebugOption(description: "broker")
23+
public static let topic = DebugOption(description: "topic")
24+
public static let metadata = DebugOption(description: "metadata")
25+
public static let feature = DebugOption(description: "feature")
26+
public static let queue = DebugOption(description: "queue")
27+
public static let msg = DebugOption(description: "msg")
28+
public static let `protocol` = DebugOption(description: "protocol")
29+
public static let cgrp = DebugOption(description: "cgrp")
30+
public static let security = DebugOption(description: "security")
31+
public static let fetch = DebugOption(description: "fetch")
32+
public static let interceptor = DebugOption(description: "interceptor")
33+
public static let plugin = DebugOption(description: "plugin")
34+
public static let consumer = DebugOption(description: "consumer")
35+
public static let admin = DebugOption(description: "admin")
36+
public static let eos = DebugOption(description: "eos")
37+
public static let all = DebugOption(description: "all")
38+
}
39+
40+
/// Available IP address families.
41+
public struct IPAddressFamily: Hashable, Equatable, CustomStringConvertible {
42+
public let description: String
43+
44+
/// Use any IP address family.
45+
public static let any = IPAddressFamily(description: "any")
46+
/// Use the IPv4 address family.
47+
public static let v4 = IPAddressFamily(description: "v4")
48+
/// Use the IPv6 address family.
49+
public static let v6 = IPAddressFamily(description: "v6")
50+
}
51+
52+
/// Protocol used to communicate with brokers.
53+
public struct SecurityProtocol: Hashable, Equatable, CustomStringConvertible {
54+
public let description: String
55+
56+
/// Send messages as plaintext (no security protocol used).
57+
public static let plaintext = SecurityProtocol(description: "plaintext")
58+
/// Use the Secure Sockets Layer (SSL) protocol.
59+
public static let ssl = SecurityProtocol(description: "ssl")
60+
/// Use the Simple Authentication and Security Layer (SASL).
61+
public static let saslPlaintext = SecurityProtocol(description: "sasl_plaintext")
62+
/// Use the Simple Authentication and Security Layer (SASL) with SSL.
63+
public static let saslSSL = SecurityProtocol(description: "sasl_ssl")
64+
}
65+
66+
/// Available SASL mechanisms that can be used for authentication.
67+
public struct SASLMechanism: Hashable, Equatable, CustomStringConvertible {
68+
public let description: String
69+
70+
/// Use the GSSAPI mechanism.
71+
public static let gssapi = SASLMechanism(description: "GSSAPI")
72+
/// Use the PLAIN mechanism.
73+
public static let plain = SASLMechanism(description: "PLAIN")
74+
/// Use the SCRAM-SHA-256 mechanism.
75+
public static let scramSHA256 = SASLMechanism(description: "SCRAM-SHA-256")
76+
/// Use the SCRAM-SHA-512 mechanism.
77+
public static let scramSHA512 = SASLMechanism(description: "SCRAM-SHA-512")
78+
/// Use the OAUTHBEARER mechanism.
79+
public static let oauthbearer = SASLMechanism(description: "OAUTHBEARER")
80+
}
81+
}

0 commit comments

Comments
 (0)