
Commit 0dc11d6

Refactor `*Configuration` types (#87)

* Refactor `*Configuration` types

  Modifications:
  * `*Configuration`:
    * remove initializer
    * replace computed properties with normal `var`s
    * make `dictionary` a computed property that uses `self` to create a `librdkafka`-compatible String-based configuration dictionary
  * add missing `Sendable` conformance to some types
  * rename `KafkaSharedConfiguration` -> `KafkaConfiguration`
  * replace `Ms` -> `Milliseconds`
  * replace `UInt` -> `Int` in `*Configuration` types

* `KafkaConsumerConfiguration`: change `init`

  Modifications:
  * `KafkaConsumerConfiguration`: add `consumptionStrategy` as an `init` parameter instead of providing a default value (a default value makes no sense here)
  * update README

* Align `*Config` with API Review Changes

  Modifications:
  * align `*Config` with API Review Changes
  * `KafkaConsumerConfiguration`: remove `enable.auto.offset.store` option (we want to handle storing offsets ourselves)
  * update README

* Review George

  Motivation: Auto-generated `struct` initializers are not `public` in Swift.

  Modifications:
  * add `public init`ializers to `KafkaConfiguration.*` types
  * fix typo `Mac OSX` -> `macOS`
  * `KafkaConfiguration`: update year in copyright notice
  * `*Configuration` types: document default values
  * `soundness.sh`: make 2023 a valid copyright date

* Review Franz

  Modifications:
  * `KafkaConfiguration.Broker`: improve documentation
  * `Kafka*Configuration`:
    * rename `CALocation` -> `caLocation`
    * rename `CRLLocation` -> `crlLocation`
1 parent f69a0e7 commit 0dc11d6

11 files changed: +684 −1052 lines changed
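The commit message describes replacing initializer-based configuration with plain mutable `var` properties plus a computed `dictionary` that renders a `librdkafka`-compatible String-based configuration dictionary from `self`. A minimal sketch of that pattern follows; the type, property, and key names here are illustrative stand-ins, not the library's actual API:

```swift
// Sketch of the pattern described in the commit message: plain `var`
// properties, plus a computed `dictionary` rendered on demand from `self`,
// so mutating a property is reflected in the next read.
// `ExampleProducerConfiguration` and its keys are hypothetical.
struct ExampleProducerConfiguration {
    var bootstrapServers: [String] = []
    var messageMaxBytes: Int = 1_000_000
    var enableIdempotence: Bool = false

    // librdkafka takes its configuration as String key/value pairs.
    var dictionary: [String: String] {
        var dict = [String: String]()
        dict["bootstrap.servers"] = self.bootstrapServers.joined(separator: ",")
        dict["message.max.bytes"] = String(self.messageMaxBytes)
        dict["enable.idempotence"] = String(self.enableIdempotence)
        return dict
    }
}

var config = ExampleProducerConfiguration()
config.bootstrapServers = ["localhost:9092"]
print(config.dictionary["bootstrap.servers"] ?? "") // prints "localhost:9092"
```

Because `dictionary` is computed rather than stored, there is no risk of the rendered configuration drifting out of sync with the typed properties.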

README.md

Lines changed: 19 additions & 14 deletions

````diff
@@ -23,7 +23,7 @@ Finally, add `import SwiftKafka` to your source code.
 
 ## Usage
 
-`SwiftKafka` should be used within a [`Swift Service Lifecycle`](https://github.com/swift-server/swift-service-lifecycle)
+`SwiftKafka` should be used within a [`Swift Service Lifecycle`](https://github.com/swift-server/swift-service-lifecycle)
 [`ServiceGroup`](https://swiftpackageindex.com/swift-server/swift-service-lifecycle/main/documentation/servicelifecycle/servicegroup) for proper startup and shutdown handling.
 Both the `KafkaProducer` and the `KafkaConsumer` implement the [`Service`](https://swiftpackageindex.com/swift-server/swift-service-lifecycle/main/documentation/servicelifecycle/service) protocol.
@@ -32,7 +32,9 @@ Both the `KafkaProducer` and the `KafkaConsumer` implement the [`Service`](https
 The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
 
 ```swift
-let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])
+let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
+var config = KafkaProducerConfiguration()
+config.bootstrapServers = [broker]
 
 let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
     config: config,
@@ -72,13 +74,14 @@ await withThrowingTaskGroup(of: Void.self) { group in
 After initializing the `KafkaConsumer` with a topic-partition pair to read from, messages can be consumed using the `messages` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence).
 
 ```swift
-let config = KafkaConsumerConfiguration(
+let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
+var config = KafkaConsumerConfiguration(
     consumptionStrategy: .partition(
-        topic: "topic-name",
-        partition: KafkaPartition(rawValue: 0)
-    ),
-    bootstrapServers: ["localhost:9092"]
+        KafkaPartition(rawValue: 0),
+        topic: "topic-name"
+    )
 )
+config.bootstrapServers = [broker]
 
 let consumer = try KafkaConsumer(
     config: config,
@@ -111,10 +114,11 @@ await withThrowingTaskGroup(of: Void.self) { group in
 SwiftKafka also allows users to subscribe to an array of topics as part of a consumer group.
 
 ```swift
-let config = KafkaConsumerConfiguration(
-    consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
-    bootstrapServers: ["localhost:9092"]
+let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
+var config = KafkaConsumerConfiguration(
+    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"])
 )
+config.bootstrapServers = [broker]
 
 let consumer = try KafkaConsumer(
     config: config,
@@ -147,11 +151,12 @@ await withThrowingTaskGroup(of: Void.self) { group in
 By default, the `KafkaConsumer` automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.
 
 ```swift
-let config = KafkaConsumerConfiguration(
-    consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
-    enableAutoCommit: false,
-    bootstrapServers: ["localhost:9092"]
+let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
+var config = KafkaConsumerConfiguration(
+    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"])
 )
+config.enableAutoCommit = false
+config.bootstrapServers = [broker]
 
 let consumer = try KafkaConsumer(
     config: config,
````
Lines changed: 274 additions & 0 deletions

```swift
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// Collection of types used in the configuration structs this library provides.
public enum KafkaConfiguration {
    /// A Kafka broker to connect to.
    public struct Broker: Sendable, Hashable, CustomStringConvertible {
        /// The host component of the broker to connect to.
        public var host: String

        /// The port to connect to.
        public var port: Int

        public var description: String {
            "\(self.host):\(self.port)"
        }

        public init(host: String, port: Int) {
            self.host = host
            self.port = port
        }
    }

    /// Message options.
    public struct MessageOptions: Sendable, Hashable {
        /// Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions, the producer is unable to reliably enforce a strict maximum message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests; the broker will enforce the topic's max.message.bytes limit (see the Apache Kafka documentation).
        /// Default: `1_000_000`
        public var maxBytes: Int = 1_000_000

        /// Maximum size for a message to be copied to the buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs.
        /// Default: `65535`
        public var copyMaxBytes: Int = 65535

        public init(
            maxBytes: Int = 1_000_000,
            copyMaxBytes: Int = 65535
        ) {
            self.maxBytes = maxBytes
            self.copyMaxBytes = copyMaxBytes
        }
    }

    /// Topic metadata options.
    public struct TopicMetadataOptions: Sendable, Hashable {
        /// Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription and no assignment), then only the broker list will be refreshed every interval, but no more often than every 10s.
        /// Default: `300_000`
        public var refreshIntervalMilliseconds: Int = 300_000

        /// When a topic loses its leader, a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers.
        /// Default: `250`
        public var refreshFastIntervalMilliseconds: Int = 250

        /// Sparse metadata requests (consumes less network bandwidth).
        /// Default: `true`
        public var refreshSparse: Bool = true

        /// Apache Kafka topic creation is asynchronous, and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with ERR__UNKNOWN_TOPIC. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on `send()`.
        /// Default: `30000`
        public var propagationMaxMilliseconds: Int = 30000

        public init(
            refreshIntervalMilliseconds: Int = 300_000,
            refreshFastIntervalMilliseconds: Int = 250,
            refreshSparse: Bool = true,
            propagationMaxMilliseconds: Int = 30000
        ) {
            self.refreshIntervalMilliseconds = refreshIntervalMilliseconds
            self.refreshFastIntervalMilliseconds = refreshFastIntervalMilliseconds
            self.refreshSparse = refreshSparse
            self.propagationMaxMilliseconds = propagationMaxMilliseconds
        }
    }

    /// Socket options.
    public struct SocketOptions: Sendable, Hashable {
        /// Default timeout for network requests. Producer: ProduceRequests will use the lesser value of socket.timeout.ms and the remaining message.timeout.ms for the first message in the batch. Consumer: FetchRequests will use fetch.wait.max.ms + socket.timeout.ms.
        /// Default: `60000`
        public var timeoutMilliseconds: Int = 60000

        /// Broker socket send buffer size. The system default is used if 0.
        /// Default: `0`
        public var sendBufferBytes: Int = 0

        /// Broker socket receive buffer size. The system default is used if 0.
        /// Default: `0`
        public var receiveBufferBytes: Int = 0

        /// Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets.
        /// Default: `false`
        public var keepaliveEnable: Bool = false

        /// Disable the Nagle algorithm (TCP_NODELAY) on broker sockets.
        /// Default: `false`
        public var nagleDisable: Bool = false

        /// Disconnect from the broker when this number of send failures (e.g., timed-out requests) is reached. Disable with 0. WARNING: It is highly recommended to leave this setting at its default value of 1 to avoid the client and broker becoming desynchronized in case of request timeouts. NOTE: The connection is automatically re-established.
        /// Default: `1`
        public var maxFails: Int = 1

        /// Maximum time allowed for broker connection setup (TCP connection setup as well as SSL and SASL handshake). If the connection to the broker is not fully functional after this, the connection will be closed and retried.
        /// Default: `30000`
        public var connectionSetupTimeoutMilliseconds: Int = 30000

        public init(
            timeoutMilliseconds: Int = 60000,
            sendBufferBytes: Int = 0,
            receiveBufferBytes: Int = 0,
            keepaliveEnable: Bool = false,
            nagleDisable: Bool = false,
            maxFails: Int = 1,
            connectionSetupTimeoutMilliseconds: Int = 30000
        ) {
            self.timeoutMilliseconds = timeoutMilliseconds
            self.sendBufferBytes = sendBufferBytes
            self.receiveBufferBytes = receiveBufferBytes
            self.keepaliveEnable = keepaliveEnable
            self.nagleDisable = nagleDisable
            self.maxFails = maxFails
            self.connectionSetupTimeoutMilliseconds = connectionSetupTimeoutMilliseconds
        }
    }

    /// Broker options.
    public struct BrokerOptions: Sendable, Hashable {
        /// How long to cache the broker address resolving results (milliseconds).
        /// Default: `1000`
        public var addressTTL: Int = 1000

        /// Allowed broker ``KafkaConfiguration/IPAddressFamily``.
        /// Default: `.any`
        public var addressFamily: KafkaConfiguration.IPAddressFamily = .any

        public init(
            addressTTL: Int = 1000,
            addressFamily: KafkaConfiguration.IPAddressFamily = .any
        ) {
            self.addressTTL = addressTTL
            self.addressFamily = addressFamily
        }
    }

    /// Reconnect options.
    public struct ReconnectOptions: Sendable, Hashable {
        /// The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until reconnect.backoff.max.ms is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.
        /// Default: `100`
        public var backoffMilliseconds: Int = 100

        /// The maximum time to wait before reconnecting to a broker after the connection has been closed.
        /// Default: `10000`
        public var backoffMaxMilliseconds: Int = 10000

        public init(
            backoffMilliseconds: Int = 100,
            backoffMaxMilliseconds: Int = 10000
        ) {
            self.backoffMilliseconds = backoffMilliseconds
            self.backoffMaxMilliseconds = backoffMaxMilliseconds
        }
    }

    /// SSL options.
    public struct SSLOptions: Sendable, Hashable {
        /// Path to the client's private key (PEM) used for authentication.
        public var keyLocation: String = ""

        /// Private key passphrase (for use with ssl.key.location).
        public var keyPassword: String = ""

        /// Path to the client's public key (PEM) used for authentication.
        public var certificateLocation: String = ""

        /// File or directory path to CA certificate(s) for verifying the broker's key. Defaults: On Windows, the system's CA certificates are automatically looked up in the Windows Root certificate store. On macOS, this configuration defaults to probe; it is recommended to install OpenSSL using Homebrew to provide CA certificates. On Linux, install the distribution's ca-certificates package. If OpenSSL is statically linked or ssl.ca.location is set to probe, a list of standard paths will be probed and the first one found will be used as the default CA certificate location path. If OpenSSL is dynamically linked, the OpenSSL library's default path will be used (see OPENSSLDIR in `openssl version -a`).
        public var caLocation: String = ""

        /// Path to the CRL for verifying the broker's certificate validity.
        public var crlLocation: String = ""

        /// Path to the client's keystore (PKCS#12) used for authentication.
        public var keystoreLocation: String = ""

        /// Client's keystore (PKCS#12) password.
        public var keystorePassword: String = ""
    }

    /// SASL options.
    public struct SASLOptions: Sendable, Hashable {
        /// SASL mechanism to use for authentication.
        public var mechanism: KafkaConfiguration.SASLMechanism?

        /// SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms.
        public var username: String?

        /// SASL password for use with the PLAIN and SASL-SCRAM-.. mechanisms.
        public var password: String?
    }

    // MARK: - Enum-like Option types

    /// Available debug contexts to enable.
    public struct DebugOption: Sendable, Hashable, CustomStringConvertible {
        public let description: String

        public static let generic = DebugOption(description: "generic")
        public static let broker = DebugOption(description: "broker")
        public static let topic = DebugOption(description: "topic")
        public static let metadata = DebugOption(description: "metadata")
        public static let feature = DebugOption(description: "feature")
        public static let queue = DebugOption(description: "queue")
        public static let msg = DebugOption(description: "msg")
        public static let `protocol` = DebugOption(description: "protocol")
        public static let cgrp = DebugOption(description: "cgrp")
        public static let security = DebugOption(description: "security")
        public static let fetch = DebugOption(description: "fetch")
        public static let interceptor = DebugOption(description: "interceptor")
        public static let plugin = DebugOption(description: "plugin")
        public static let consumer = DebugOption(description: "consumer")
        public static let admin = DebugOption(description: "admin")
        public static let eos = DebugOption(description: "eos")
        public static let all = DebugOption(description: "all")
    }

    /// Available IP address families.
    public struct IPAddressFamily: Sendable, Hashable, CustomStringConvertible {
        public let description: String

        /// Use any IP address family.
        public static let any = IPAddressFamily(description: "any")
        /// Use the IPv4 address family.
        public static let v4 = IPAddressFamily(description: "v4")
        /// Use the IPv6 address family.
        public static let v6 = IPAddressFamily(description: "v6")
    }

    /// Protocol used to communicate with brokers.
    public struct SecurityProtocol: Sendable, Hashable, CustomStringConvertible {
        public let description: String

        /// Send messages as plaintext (no security protocol used).
        public static let plaintext = SecurityProtocol(description: "plaintext")
        /// Use the Secure Sockets Layer (SSL) protocol.
        public static let ssl = SecurityProtocol(description: "ssl")
        /// Use the Simple Authentication and Security Layer (SASL).
        public static let saslPlaintext = SecurityProtocol(description: "sasl_plaintext")
        /// Use the Simple Authentication and Security Layer (SASL) with SSL.
        public static let saslSSL = SecurityProtocol(description: "sasl_ssl")
    }

    /// Available SASL mechanisms that can be used for authentication.
    public struct SASLMechanism: Sendable, Hashable, CustomStringConvertible {
        public let description: String

        /// Use the GSSAPI mechanism.
        public static let gssapi = SASLMechanism(description: "GSSAPI")
        /// Use the PLAIN mechanism.
        public static let plain = SASLMechanism(description: "PLAIN")
        /// Use the SCRAM-SHA-256 mechanism.
        public static let scramSHA256 = SASLMechanism(description: "SCRAM-SHA-256")
        /// Use the SCRAM-SHA-512 mechanism.
        public static let scramSHA512 = SASLMechanism(description: "SCRAM-SHA-512")
        /// Use the OAUTHBEARER mechanism.
        public static let oauthbearer = SASLMechanism(description: "OAUTHBEARER")
    }
}
```
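The enum-like option types above use `CustomStringConvertible` so that each value renders directly to the string `librdkafka` expects, and `Broker` renders to the `"host:port"` form. A small self-contained sketch, reproducing trimmed copies of `Broker` and `SecurityProtocol` from the diff (the comma-joining of a broker list into a single `bootstrap.servers` value is an assumption about how the library serializes, shown here only for illustration):

```swift
// Trimmed copies of two types from the diff above, plus usage.
public struct Broker: Sendable, Hashable, CustomStringConvertible {
    public var host: String
    public var port: Int

    // Renders to the "host:port" form librdkafka expects.
    public var description: String {
        "\(self.host):\(self.port)"
    }

    public init(host: String, port: Int) {
        self.host = host
        self.port = port
    }
}

public struct SecurityProtocol: Sendable, Hashable, CustomStringConvertible {
    public let description: String

    public static let plaintext = SecurityProtocol(description: "plaintext")
    public static let saslSSL = SecurityProtocol(description: "sasl_ssl")
}

// A broker list joined into a single comma-separated value
// (illustrative serialization, not necessarily the library's).
let brokers = [
    Broker(host: "localhost", port: 9092),
    Broker(host: "kafka-2", port: 9093),
]
let bootstrapServers = brokers.map(\.description).joined(separator: ",")
print(bootstrapServers) // prints "localhost:9092,kafka-2:9093"
```

Compared with raw `enum`s, the struct-with-static-members pattern lets new option values be added later without breaking exhaustive `switch` statements in client code.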
