Skip to content

Commit f5bce5d

Browse files
Public API v1 (swift-server#104)
* Public API v1 Modifications: * updated public API for: * `KafkaProducer` * `KafkaProducerMessage` * `KafkaContiguousBytes` * `KafkaProducerMessageID` * `KafkaProducerConfiguration` * `KafkaDeliveryReport` * `KafkaAcknowledgedMessage` * `KafkaConsumer` * `KafkaConsumerMessage` * `KafkaConsumerConfiguration` * `KafkaTopicConfiguration` * `KafkaPartition` * `KafkaConfiguration` * `KafkaConfiguration+Security` * Renaming Franz * Compression.Level: add precondition for range * DocC: replace rdkafka config names with own names * DocC fixes * KafkaOffset type * Update commitSync docc * Remove Hashable from Kafka[Producer|Consumer|Topic]Configuration * Move specialised KafkaConfiguration extensions * Review Franz Modifications: * `KafkaBrokerAddress`: revert back to have an `init` with parameters * Review Franz Modifications: * make `bootstrapBrokerAddresses` `init` parameter of `Kafka[Producer|Consumer]Configuration` * Update license headers to new repo name * Fix CI Failure Modifications: * `Foundation.Data` is not `Sendable` in Swift `<5.9` -> import `Foundation` as `@preconcurrency` when swift `<5.9` * retroactive `Sendable` conformance for `Data` when swift `<5.9` * Rename config -> configuration * `KafkaProducerMessage`: conditional conformances Motivation: * we cannot guarantee that the types implementing `KafkaContiguousBytes` are `Sendable` etc. (e.g. `Foundation.Data` for Swift <5.9) Modifications: * `KafkaProducerMessage`: * conform to `Sendable` if `Key` and `Value` are `Sendable` * conform to `Hashable` if `Key` and `Value` are `Hashable` * conform to `Equatable` if `Key` and `Value` are `Equatable`
1 parent 5e8963e commit f5bce5d

26 files changed

+987
-572
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ The `send(_:)` method of `KafkaProducer` returns a message-id that can later be
3434
```swift
3535
let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
3636
var config = KafkaProducerConfiguration()
37-
config.bootstrapServers = [broker]
37+
config.bootstrapBrokerAddresses = [broker]
3838

3939
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
4040
config: config,
@@ -86,7 +86,7 @@ var config = KafkaConsumerConfiguration(
8686
topic: "topic-name"
8787
)
8888
)
89-
config.bootstrapServers = [broker]
89+
config.bootstrapBrokerAddresses = [broker]
9090

9191
let consumer = try KafkaConsumer(
9292
config: config,
@@ -123,7 +123,7 @@ let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
123123
var config = KafkaConsumerConfiguration(
124124
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"])
125125
)
126-
config.bootstrapServers = [broker]
126+
config.bootstrapBrokerAddresses = [broker]
127127

128128
let consumer = try KafkaConsumer(
129129
config: config,
@@ -161,7 +161,7 @@ var config = KafkaConsumerConfiguration(
161161
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"])
162162
)
163163
config.enableAutoCommit = false,
164-
config.bootstrapServers = [broker]
164+
config.bootstrapBrokerAddresses = [broker]
165165

166166
let consumer = try KafkaConsumer(
167167
config: config,

Sources/Kafka/Configuration/KafkaConfiguration+Security.swift

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ extension KafkaConfiguration {
1717

1818
/// Use to configure an TLS connection.
1919
public struct TLSConfiguration: Sendable, Hashable {
20-
/// Certificate chain consisting of one leaf certificate and potenentially multiple intermediate certificates.
20+
/// Certificate chain consisting of one leaf certificate and potentially multiple intermediate certificates.
2121
/// The public key of the leaf certificate will be used for authentication.
2222
public struct LeafAndIntermediates: Sendable, Hashable {
2323
internal enum _Key: Sendable, Hashable {
@@ -27,7 +27,7 @@ extension KafkaConfiguration {
2727

2828
let _internal: _Key
2929

30-
/// Read certificate chain from file.
30+
/// Read certificate chain from a file.
3131
public static func file(location: String) -> LeafAndIntermediates {
3232
return LeafAndIntermediates(
3333
_internal: .file(location: location)
@@ -42,32 +42,32 @@ extension KafkaConfiguration {
4242
}
4343
}
4444

45-
public struct RootCertificate: Sendable, Hashable {
46-
internal enum _RootCertificate: Sendable, Hashable {
45+
public struct Root: Sendable, Hashable {
46+
internal enum _Root: Sendable, Hashable {
4747
case probe
4848
case disableBrokerVerification
4949
case file(location: String)
5050
case pem(String)
5151
}
5252

53-
let _internal: _RootCertificate
53+
let _internal: _Root
5454

5555
/// A list of standard paths will be probed and the first one found will be used as the default root certificate location path.
56-
public static let probe = RootCertificate(_internal: .probe)
56+
public static let probe = Root(_internal: .probe)
5757

58-
/// Disable OpenSSL's builtin broker (server) certificate verification.
59-
public static let disableBrokerVerification = RootCertificate(_internal: .disableBrokerVerification)
58+
/// Disable OpenSSL's built-in broker (server) certificate verification.
59+
public static let disableBrokerVerification = Root(_internal: .disableBrokerVerification)
6060

6161
/// File or directory path to root certificate(s) for verifying the broker's key.
62-
public static func file(location: String) -> RootCertificate {
63-
return RootCertificate(
62+
public static func file(location: String) -> Root {
63+
return Root(
6464
_internal: .file(location: location)
6565
)
6666
}
6767

6868
/// Root certificate String for verifying the broker's key.
69-
public static func pem(_ pem: String) -> RootCertificate {
70-
return RootCertificate(
69+
public static func pem(_ pem: String) -> Root {
70+
return Root(
7171
_internal: .pem(pem)
7272
)
7373
}
@@ -126,12 +126,12 @@ extension KafkaConfiguration {
126126
case keyPair(
127127
privateKey: PrivateKey,
128128
publicKeyCertificate: LeafAndIntermediates,
129-
caCertificate: RootCertificate,
129+
caCertificate: Root,
130130
crlLocation: String?
131131
)
132132
case keyStore(
133133
keyStore: KeyStore,
134-
caCertificate: RootCertificate,
134+
caCertificate: Root,
135135
crlLocation: String?
136136
)
137137
}
@@ -145,11 +145,11 @@ extension KafkaConfiguration {
145145
/// - privateKey: The client's private key (PEM) used for authentication.
146146
/// - publicKeyCertificate: The client's public key (PEM) used for authentication.
147147
/// - caCertificate: File or directory path to CA certificate(s) for verifying the broker's key.
148-
/// - crlocation: Path to CRL for verifying broker's certificate validity.
148+
/// - crLocation: Path to CRL for verifying broker's certificate validity.
149149
public static func keyPair(
150150
privateKey: PrivateKey,
151151
publicKeyCertificate: LeafAndIntermediates,
152-
caCertificate: RootCertificate = .probe,
152+
caCertificate: Root = .probe,
153153
crlLocation: String?
154154
) -> TLSConfiguration {
155155
return TLSConfiguration(
@@ -167,10 +167,10 @@ extension KafkaConfiguration {
167167
///
168168
/// - keyStore: The client's keystore (PKCS#12) used for authentication.
169169
/// - caCertificate: File or directory path to CA certificate(s) for verifying the broker's key.
170-
/// - crlocation: Path to CRL for verifying broker's certificate validity.
170+
/// - crlLocation: Path to CRL for verifying broker's certificate validity.
171171
public static func keyStore(
172172
keyStore: KeyStore,
173-
caCertificate: RootCertificate = .probe,
173+
caCertificate: Root = .probe,
174174
crlLocation: String?
175175
) -> TLSConfiguration {
176176
return TLSConfiguration(
@@ -240,27 +240,51 @@ extension KafkaConfiguration {
240240
public struct SASLMechanism: Sendable, Hashable {
241241
/// Used to configure Kerberos.
242242
public struct KerberosConfiguration: Sendable, Hashable {
243-
/// Kerberos p rincipal name that Kafka runs as, not including `/hostname@REALM`.
243+
/// Kerberos principal name that Kafka runs as, not including `/hostname@REALM`.
244244
/// Default: `"kafka"`
245245
public var serviceName: String = "kafka"
246246
/// This client's Kerberos principal name. (Not supported on Windows, will use the logon user's principal).
247247
/// Default: `"kafkaclient"`
248248
public var principal: String = "kafkaclient"
249249
/// Shell command to refresh or acquire the client's Kerberos ticket.
250-
/// This command is executed on client creation and every sasl.kerberos.min.time.before.relogin (0=disable).
250+
/// This command is executed on client creation and every ``KafkaConfiguration/SASLMechanism/KerberosConfiguration/minTimeBeforeRelogin``.
251251
/// %{config.prop.name} is replaced by corresponding config object value.
252252
/// Default: `kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}"`.
253253
public var kinitCommand: String = """
254254
kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || \
255255
kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}"
256256
"""
257257
/// Path to Kerberos keytab file.
258-
/// This configuration property is only used as a variable in sasl.kerberos.kinit.cmd as ... -t "%{sasl.kerberos.keytab}".
258+
/// This configuration property is only used as a variable in ``KafkaConfiguration/SASLMechanism/KerberosConfiguration/kinitCommand``
259+
/// as ... -t "%{sasl.kerberos.keytab}".
259260
public var keytab: String
260-
/// Minimum time in milliseconds between key refresh attempts.
261+
262+
/// Minimum time between key refresh attempts.
263+
public struct KeyRefreshAttempts: Sendable, Hashable {
264+
internal let rawValue: UInt
265+
266+
private init(rawValue: UInt) {
267+
self.rawValue = rawValue
268+
}
269+
270+
/// (Lowest granularity is milliseconds)
271+
public static func value(_ value: Duration) -> KeyRefreshAttempts {
272+
precondition(
273+
value.canBeRepresentedAsMilliseconds,
274+
"Lowest granularity is milliseconds"
275+
)
276+
return .init(rawValue: UInt(value.inMilliseconds))
277+
}
278+
279+
/// Disable automatic key refresh by setting this property.
280+
public static let disable: KeyRefreshAttempts = .init(rawValue: 0)
281+
}
282+
283+
/// Minimum time in between key refresh attempts.
261284
/// Disable automatic key refresh by setting this property to 0.
262-
/// Default: `60000`
263-
public var minTimeBeforeRelogin: Int = 60000
285+
/// (Lowest granularity is milliseconds)
286+
/// Default: `.value(.milliseconds(60000))`
287+
public var minTimeBeforeRelogin: KeyRefreshAttempts = .value(.milliseconds(60000))
264288

265289
public init(keytab: String) {
266290
self.keytab = keytab
@@ -292,7 +316,7 @@ extension KafkaConfiguration {
292316
/// The format is implementation-dependent and must be parsed accordingly.
293317
/// The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds.
294318
/// The default value for principalClaimName is "sub", the default value for scopeClaimName is "scope", and the default value for lifeSeconds is 3600.
295-
/// The scope value is CSV format with the default value being no/empty scope.
319+
/// The scope value is in CSV format with the default value being no/empty scope.
296320
/// For example: `principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifeSeconds=600`.
297321
/// In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`.
298322
/// For example: `principal=admin extension_traceId=123`
@@ -307,16 +331,16 @@ extension KafkaConfiguration {
307331
/// - configuration: SASL/OAUTHBEARER configuration.
308332
/// The format is implementation-dependent and must be parsed accordingly.
309333
/// The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds.
310-
/// The default value for principalClaimName is "sub", the default value for scopeClaimName is "scope", and the default value for lifeSeconds is 3600.
311-
/// The scope value is CSV format with the default value being no/empty scope.
334+
/// The default value for principalClaimName is "sub", the default value for scopeClaimName is "scope", and the default value for lifeSeconds is 3600.
335+
/// The scope value is in CSV format with the default value being no/empty scope.
312336
/// For example: `principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifeSeconds=600`.
313337
/// In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`.
314338
/// For example: `principal=admin extension_traceId=123`
315339
/// - clientID: Public identifier for the application. Must be unique across all clients that the authorization server handles.
316340
/// - clientSecret: Client secret only known to the application and the authorization server.
317341
/// This should be a sufficiently random string that is not guessable.
318342
/// - tokenEndPointURL: OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token.
319-
/// - scope: Client use this to specify the scope of the access request to the broker.
343+
/// - scope: The client uses this to specify the scope of the access request to the broker.
320344
/// - extensions: Allow additional information to be provided to the broker.
321345
/// Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".
322346
public static func oidc(
@@ -397,7 +421,7 @@ extension KafkaConfiguration {
397421
resultDict["sasl.kerberos.principal"] = kerberosConfiguration.principal
398422
resultDict["sasl.kerberos.kinit.cmd"] = kerberosConfiguration.kinitCommand
399423
resultDict["sasl.kerberos.keytab"] = kerberosConfiguration.keytab
400-
resultDict["sasl.kerberos.min.time.before.relogin"] = String(kerberosConfiguration.minTimeBeforeRelogin)
424+
resultDict["sasl.kerberos.min.time.before.relogin"] = String(kerberosConfiguration.minTimeBeforeRelogin.rawValue)
401425
case .plain(let username, let password):
402426
resultDict["sasl.mechanism"] = "PLAIN"
403427
resultDict["sasl.username"] = username

0 commit comments

Comments
 (0)