Commit dc4c623

KafkaProducer with sendAsync method (#21)

* KafkaProducer sendAsync method

  Modifications:
  * specified minimum OS versions in Package.swift to support new Swift concurrency
  * made it possible to test the KafkaProducer inside of Docker
  * KafkaProducerMessage
  * KafkaConsumerMessage
  * KafkaProducer with sendAsync method
  * initialize KafkaError from rd_kafka_resp_err_t

  Result: The user is able to produce messages to the Kafka ecosystem and get a delivery report via callback.

* Isolate topicHandles and messageToCallBack using actors

  Motivation: For concurrently running producers, we want to ensure that no race conditions occur when accessing the topic handle cache or the callback storage.

  Modifications:
  * replaced the non-thread-safe dictionary messageToCallBack with an actor
  * replaced the non-thread-safe dictionary topicHandles with the actor KafkaTopicHandles
  * refactored KafkaProducerTests
  * added flush to KafkaProducer deinit

* Refactoring async KafkaProducer

  Modifications:
  * refactored the `KafkaTopicHandles` `actor` into the `KafkaProducer` `actor`
  * deleted the `KafkaProducerCallbacks` `actor` in favor of the `KafkaProducer` `actor`
  * added a `createDuplicatePointer` method to `KafkaConfig` and `KafkaTopicConfig` that enables us to pass configs to rd_kafka_new without losing the object in memory
  * exposed an `AsyncStream` `acknowledgements` in `KafkaProducer` that enables the user to listen for acknowledgements

* Bug fix: implemented copy-on-write for KafkaConfig.setMessageCallback

* Update comments

* Refactoring KafkaConsumerMessage

  Modifications:
  * made swift-nio a dependency
  * replaced unsafe assumingMemoryBound method calls
  * renamed KafkaConsumerMessage to KafkaAckedMessage
  * changed KafkaConsumerMessage's underlying raw data type from ContiguousBytes to NIOCore.ByteBuffer
  * removed the keyString and valueString computed properties from KafkaConsumerMessage (aka KafkaAckedMessage)

* Refactor pass of KafkaProducer pointer to message callback

  Modifications:
  * gave KafkaAckedMessage an optional id property
  * made KafkaAckedMessage conform to Hashable, Equatable
  * made KafkaAckedMessage non-throwing and introduced an optional initializer instead
  * replaced KafkaProducer message UUIDs with a simple counter
  * made it possible to pass an opaque pointer to a KafkaConfig that can then be accessed from delivery report callbacks
  * updated KafkaProducerTests
  * refactored deliverReportCallback
  * renamed KafkaProducer.close to KafkaProducer.shutdownGracefully

* New AsyncSequence implementation for KafkaProducer

  Modifications:
  * changed the swift-nio dependency to branch main
  * exposed an AcknowledgedMessagesAsyncSequence in KafkaProducer that is backed by NIOAsyncSequenceProducer

* Made KafkaError non-public and ran SwiftFormat

* Replaced ContiguousBytes with ByteBuffer in KafkaProducerMessage

* sendAsync(): return message identifier

* Changes from Franz

  Modifications:
  * removed TODOs
  * renamed setCallBackOpaque -> setOpaque
  * renamed KafkaAckedMessage -> KafkaAcknowledgedMessage
  * improved DocC comments
  * wrapped NIOAsyncSequenceProducer.Iterator instead of exposing it directly
  * added state to KafkaProducer to avoid that a shut-down producer is used
  * replaced ByteBuffer.getBytes with readBytes in KafkaProducer

* Changes Franz 2

  Modifications:
  * refactored the NIOAsyncSequenceProducer underlying KafkaProducer.acknowledgedMessages
  * refactored KafkaProducer's state management
  * made KafkaAcknowledgedMessage's initializer non-optional and throwing
  * return a Result type in the KafkaProducer.acknowledgedMessages sequence instead of messages and silent errors
  * introduced the KafkaAcknowledgedMessageError type
  * executed SwiftFormat

* Added missing error-handling clause in producer callback

* Changes Franz 3
* Changes Cory
* Changes Cory 2
* Hide callbackOpaque memory management in KafkaConfig
* Make setCallbackOpaque generic
* Simplified Swift closure to C closure conversion
* Make pointer property of _Internal config classes private
* Changes Franz 4
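The acknowledgement flow described above (a counter-based message identifier returned by sendAsync, plus an async sequence of acknowledgements the user can iterate) can be sketched with plain Swift concurrency. This is an illustrative toy, not the package's API: an `AsyncStream` stands in for the `NIOAsyncSequenceProducer` backing, and the names `ToyProducer`/`AckedMessage` are hypothetical.

```swift
// Hypothetical stand-in types; the real package exposes KafkaAcknowledgedMessage
// through an AcknowledgedMessagesAsyncSequence, not this simplified model.
struct AckedMessage: Hashable {
    let id: UInt
    let topic: String
}

final class ToyProducer {
    // Simple monotonically increasing counter instead of UUIDs,
    // mirroring the "replaced message UUIDs with simple counter" change.
    private var nextID: UInt = 0
    private let continuation: AsyncStream<AckedMessage>.Continuation

    /// Async sequence of acknowledgements the user can `for await` over.
    let acknowledgements: AsyncStream<AckedMessage>

    init() {
        var cont: AsyncStream<AckedMessage>.Continuation!
        self.acknowledgements = AsyncStream { cont = $0 }
        self.continuation = cont
    }

    // Returns the identifier so callers can correlate a sent message
    // with its acknowledgement later.
    func sendAsync(topic: String) -> UInt {
        nextID += 1
        let id = nextID
        // Simulate the broker acknowledging the message immediately.
        continuation.yield(AckedMessage(id: id, topic: topic))
        return id
    }

    func shutdownGracefully() {
        continuation.finish()
    }
}
```

In the real producer the counter lives behind actor isolation and acknowledgements are yielded from the librdkafka delivery report callback; the shape of the user-facing API is the same.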
1 parent a646b37 commit dc4c623

14 files changed (+756, -54 lines)

Package.swift

Lines changed: 8 additions & 0 deletions

@@ -17,20 +17,28 @@ import PackageDescription
 
 let package = Package(
     name: "swift-kafka-gsoc",
+    platforms: [
+        .macOS(.v10_15),
+        .iOS(.v13),
+        .watchOS(.v6),
+        .tvOS(.v13),
+    ],
     products: [
         .library(
             name: "SwiftKafka",
             targets: ["SwiftKafka"]
         ),
     ],
     dependencies: [
+        .package(url: "https://github.com/apple/swift-nio.git", branch: "main"),
         .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
     ],
     targets: [
         .target(
             name: "SwiftKafka",
             dependencies: [
                 "Crdkafka",
+                .product(name: "NIOCore", package: "swift-nio"),
                 .product(name: "Logging", package: "swift-log"),
             ]
         ),
Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Crdkafka
+import NIOCore
+
+/// A message produced by the client and acknowledged by the Kafka cluster.
+public struct KafkaAcknowledgedMessage: Hashable {
+    /// The unique identifier assigned by the ``KafkaProducer`` when the message was sent to Kafka.
+    /// The same identifier is returned by ``KafkaProducer/sendAsync(message:)`` and can be used to correlate
+    /// a sent message and an acknowledged message.
+    public var id: UInt
+    /// The topic that the message was sent to.
+    public var topic: String
+    /// The partition that the message was sent to.
+    public var partition: Int32
+    /// The key of the message.
+    public var key: ByteBuffer?
+    /// The body of the message.
+    public var value: ByteBuffer
+    /// The offset of the message in its partition.
+    public var offset: Int64
+
+    /// Initialize `KafkaAcknowledgedMessage` from an `rd_kafka_message_t` pointer.
+    init(messagePointer: UnsafePointer<rd_kafka_message_t>, id: UInt) throws {
+        self.id = id
+
+        let rdKafkaMessage = messagePointer.pointee
+
+        let valueBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.payload, count: rdKafkaMessage.len)
+        self.value = ByteBuffer(bytes: valueBufferPointer)
+
+        guard rdKafkaMessage.err.rawValue == 0 else {
+            var errorStringBuffer = self.value
+            let errorString = errorStringBuffer.readString(length: errorStringBuffer.readableBytes)
+
+            throw KafkaAcknowledgedMessageError(
+                rawValue: rdKafkaMessage.err.rawValue,
+                description: errorString,
+                messageID: self.id
+            )
+        }
+
+        guard let topic = String(validatingUTF8: rd_kafka_topic_name(rdKafkaMessage.rkt)) else {
+            fatalError("Received topic name that is non-valid UTF-8")
+        }
+        self.topic = topic
+
+        self.partition = rdKafkaMessage.partition
+
+        if let keyPointer = rdKafkaMessage.key {
+            let keyBufferPointer = UnsafeRawBufferPointer(
+                start: keyPointer,
+                count: rdKafkaMessage.key_len
+            )
+            self.key = .init(bytes: keyBufferPointer)
+        } else {
+            self.key = nil
+        }
+
+        self.offset = Int64(rdKafkaMessage.offset)
+    }
+}
Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+/// Error caused by the Kafka cluster when trying to process a message produced by ``KafkaProducer``.
+public struct KafkaAcknowledgedMessageError: Error {
+    /// A raw value representing the error code.
+    public var rawValue: Int32
+    /// A string describing the error.
+    public var description: String?
+    /// Identifier of the message that caused the error.
+    public var messageID: UInt
+}

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 28 additions & 13 deletions

@@ -24,14 +24,14 @@ final class KafkaClient {
     /// A logger.
     private let logger: Logger
 
-    /// A client is either a `.producer` or a `.consumer`
+    /// A client is either a `.producer` or a `.consumer`.
     private let clientType: rd_kafka_type_t
-    /// The configuration object of the client
+    /// The configuration object of the client.
     private let config: KafkaConfig
-    /// Handle for the C library's Kafka instance
+    /// Handle for the C library's Kafka instance.
     private let kafkaHandle: OpaquePointer
 
-    /// Determines if client is a producer or a consumer
+    /// Determines if client is a producer or a consumer.
     enum `Type` {
         case producer
         case consumer

@@ -45,19 +45,34 @@ final class KafkaClient {
         let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
         defer { errorChars.deallocate() }
 
-        guard let handle = rd_kafka_new(
-            clientType,
-            config.pointer,
-            errorChars,
-            KafkaClient.stringSize
-        ) else {
-            let errorString = String(cString: errorChars)
-            throw KafkaError(description: errorString)
+        self.kafkaHandle = try self.config.withDuplicatePointer { [clientType] duplicateConfig in
+            // Duplicate because rd_kafka_new takes ownership of the pointer and frees it upon success.
+            guard let handle = rd_kafka_new(
+                clientType,
+                duplicateConfig,
+                errorChars,
+                KafkaClient.stringSize
+            ) else {
+                // rd_kafka_new only frees the duplicate pointer upon success.
+                rd_kafka_conf_destroy(duplicateConfig)
+
+                let errorString = String(cString: errorChars)
+                throw KafkaError(description: errorString)
+            }
+
+            return handle
         }
-        self.kafkaHandle = handle
     }
 
     deinit {
         rd_kafka_destroy(kafkaHandle)
     }
+
+    /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
+    /// - Warning: Do not escape the pointer from the closure for later use.
+    /// - Parameter body: The closure will use the Kafka handle pointer.
+    @discardableResult
+    func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
+        return try body(self.kafkaHandle)
+    }
 }
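The `withKafkaHandlePointer` accessor in the diff above is an instance of the standard scoped-access pattern: the pointer is only handed out for the duration of a closure, so it cannot outlive the owning object. A minimal stand-alone sketch of the same idea (`Resource` is a hypothetical type, not part of SwiftKafka):

```swift
final class Resource {
    private var buffer = [Int32](repeating: 0, count: 4)

    /// Scoped accessor: the buffer pointer is only valid inside `body`,
    /// mirroring KafkaClient.withKafkaHandlePointer. Do not escape it.
    @discardableResult
    func withBufferPointer<T>(_ body: (UnsafeMutableBufferPointer<Int32>) throws -> T) rethrows -> T {
        // The pointer's lifetime is bounded by this call.
        return try buffer.withUnsafeMutableBufferPointer { try body($0) }
    }
}
```

`@discardableResult` lets callers invoke the accessor purely for its side effects without a compiler warning, just as the real method does.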

Sources/SwiftKafka/KafkaConfig.swift

Lines changed: 92 additions & 14 deletions

@@ -21,18 +21,37 @@ import Crdkafka
 /// For more information on how to configure Kafka, see
 /// [all available configurations](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
 public struct KafkaConfig: Hashable, Equatable {
-    private final class _Internal: Hashable, Equatable {
-        /// Pointer to the `rd_kafka_conf_t` object managed by `librdkafka`
-        private(set) var pointer: OpaquePointer
+    private final class CapturedClosure {
+        typealias Closure = (UnsafePointer<rd_kafka_message_t>?) -> Void
+        let closure: Closure
 
-        /// Initialize internal `KafkaConfig` object with default configuration
-        init() {
-            self.pointer = rd_kafka_conf_new()
+        init(_ closure: @escaping Closure) {
+            self.closure = closure
         }
+    }
+
+    private final class _Internal: Hashable, Equatable {
+        /// Pointer to the `rd_kafka_conf_t` object managed by `librdkafka`.
+        private var pointer: OpaquePointer
+
+        /// References the opaque object passed to the config to ensure ARC retains it as long as the config exists.
+        private var opaque: CapturedClosure?
 
-        /// Initialize internal `KafkaConfig` object through a given `rd_kafka_conf_t` pointer
-        init(pointer: OpaquePointer) {
+        /// Initialize internal `KafkaConfig` object through a given `rd_kafka_conf_t` pointer.
+        init(
+            pointer: OpaquePointer,
+            opaque: CapturedClosure?
+        ) {
             self.pointer = pointer
+            self.opaque = opaque
+        }
+
+        /// Initialize internal `KafkaConfig` object with default configuration.
+        convenience init() {
+            self.init(
+                pointer: rd_kafka_conf_new(),
+                opaque: nil
+            )
         }
 
         deinit {

@@ -75,9 +94,51 @@ public struct KafkaConfig: Hashable, Equatable {
             }
         }
 
+        func setDeliveryReportCallback(
+            callback: @escaping (UnsafePointer<rd_kafka_message_t>?) -> Void
+        ) {
+            let capturedClosure = CapturedClosure(callback)
+
+            // Pass the captured closure to the C closure as an opaque object
+            let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
+            rd_kafka_conf_set_opaque(
+                self.pointer,
+                opaquePointer
+            )
+
+            // Create a C closure that calls the captured closure
+            let callbackWrapper: (
+                @convention(c) (OpaquePointer?, UnsafePointer<rd_kafka_message_t>?, UnsafeMutableRawPointer?) -> Void
+            ) = { _, messagePointer, opaquePointer in
+
+                guard let opaquePointer = opaquePointer else {
+                    fatalError("Could not resolve reference to KafkaProducer instance")
+                }
+                let opaque = Unmanaged<CapturedClosure>.fromOpaque(opaquePointer).takeUnretainedValue()
+
+                let actualCallback = opaque.closure
+                actualCallback(messagePointer)
+            }
+
+            rd_kafka_conf_set_dr_msg_cb(
+                self.pointer,
+                callbackWrapper
+            )
+
+            // Retain captured closure in this config.
+            // This shall only happen after rd_kafka_conf_set_dr_msg_cb to avoid potential race conditions.
+            self.opaque = capturedClosure
+        }
+
+        func createDuplicatePointer() -> OpaquePointer {
+            rd_kafka_conf_dup(self.pointer)
+        }
+
         func createDuplicate() -> _Internal {
-            let duplicatePointer: OpaquePointer = rd_kafka_conf_dup(self.pointer)
-            return .init(pointer: duplicatePointer)
+            return .init(
+                pointer: self.createDuplicatePointer(),
+                opaque: self.opaque
+            )
         }
 
         // MARK: Hashable

@@ -99,10 +160,6 @@ public struct KafkaConfig: Hashable, Equatable {
         self._internal = .init()
     }
 
-    var pointer: OpaquePointer {
-        return self._internal.pointer
-    }
-
     /// Retrieve value of configuration property for `key`
     public func value(forKey key: String) -> String? {
         return self._internal.value(forKey: key)

@@ -117,4 +174,25 @@ public struct KafkaConfig: Hashable, Equatable {
 
         try self._internal.set(value, forKey: key)
     }
+
+    /// Define a function that is called upon every message acknowledgement.
+    /// - Parameter callback: A closure that is invoked upon message acknowledgement.
+    mutating func setDeliveryReportCallback(
+        callback: @escaping (UnsafePointer<rd_kafka_message_t>?) -> Void
+    ) {
+        // Copy-on-write mechanism
+        if !isKnownUniquelyReferenced(&(self._internal)) {
+            self._internal = self._internal.createDuplicate()
+        }
+
+        self._internal.setDeliveryReportCallback(callback: callback)
+    }
+
+    /// Create a duplicate configuration object in memory and access it through a scoped accessor.
+    /// - Warning: Do not escape the pointer from the closure for later use.
+    /// - Parameter body: The closure will use the `OpaquePointer` to the duplicate `rd_kafka_conf_t` object in memory.
+    @discardableResult
+    func withDuplicatePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
+        return try body(self._internal.createDuplicatePointer())
+    }
 }
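The copy-on-write guard that KafkaConfig places in front of `setDeliveryReportCallback` can be shown in isolation. A minimal sketch with a hypothetical `Box`/`CoWConfig` pair standing in for `_Internal`/`KafkaConfig`:

```swift
// Reference-typed backing storage, analogous to KafkaConfig._Internal.
final class Box {
    var value: Int
    init(value: Int) { self.value = value }
}

struct CoWConfig {
    private var storage = Box(value: 0)

    var value: Int { storage.value }

    // Mutate through a uniqueness check: copy the backing object
    // only when it is shared, exactly as KafkaConfig does before
    // installing a delivery report callback.
    mutating func set(_ newValue: Int) {
        if !isKnownUniquelyReferenced(&storage) {
            storage = Box(value: storage.value)
        }
        storage.value = newValue
    }
}
```

This preserves value semantics for the public struct while keeping mutations cheap when the storage is not shared; without the check, mutating one copy of the config would silently mutate every other copy.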

Sources/SwiftKafka/KafkaError.swift

Lines changed: 20 additions & 2 deletions

@@ -12,7 +12,25 @@
 //
 //===----------------------------------------------------------------------===//
 
-public struct KafkaError: Error {
+import Crdkafka
+
+struct KafkaError: Error {
     // Preliminary Implementation
-    public let description: String
+    var rawValue: Int32
+    var description: String
+
+    init(rawValue: Int32) {
+        self.rawValue = rawValue
+        self.description = "" // TODO: https://github.com/swift-server/swift-kafka-gsoc/issues/4
+    }
+
+    init(description: String) {
+        self.rawValue = -1
+        self.description = description
+    }
+
+    init(error: rd_kafka_resp_err_t) {
+        self.rawValue = error.rawValue
+        self.description = String(cString: rd_kafka_err2str(error))
+    }
 }

Sources/SwiftKafka/KafkaPartition.swift

Lines changed: 1 addition & 1 deletion

@@ -16,7 +16,7 @@ import Crdkafka
 
 /// Type for representing the number of a Kafka Partition.
 public struct KafkaPartition: RawRepresentable {
-    public let rawValue: Int32
+    public var rawValue: Int32
 
     public init(rawValue: Int32) {
         self.rawValue = rawValue
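The "Simplified Swift Closure to C Closure conversion" named in the commit message (visible in KafkaConfig's `setDeliveryReportCallback` above) rests on one trick: a `@convention(c)` function pointer cannot capture Swift state, so the captured Swift closure travels through the C side as an opaque pointer and is recovered via `Unmanaged`. The pattern can be reproduced without librdkafka; every name below is illustrative:

```swift
// Analogous to KafkaConfig.CapturedClosure: a class wrapper so the
// closure has a stable object identity to point at.
final class CapturedClosure {
    let closure: (Int32) -> Void
    init(_ closure: @escaping (Int32) -> Void) { self.closure = closure }
}

// Stand-in for a C API taking (callback, opaque), in the spirit of
// rd_kafka_conf_set_dr_msg_cb + rd_kafka_conf_set_opaque.
func cStyleAPI(
    _ callback: @convention(c) (Int32, UnsafeMutableRawPointer?) -> Void,
    _ opaque: UnsafeMutableRawPointer?
) {
    callback(42, opaque) // the "C library" fires the callback
}

var received: Int32 = 0
let captured = CapturedClosure { received = $0 }

// passUnretained is safe here because `captured` outlives the call;
// the real KafkaConfig retains it in the config object for the same reason.
let opaque = Unmanaged.passUnretained(captured).toOpaque()
cStyleAPI({ value, opaque in
    // The C closure may not capture context: recover the Swift
    // closure from the opaque pointer instead.
    guard let opaque = opaque else { return }
    Unmanaged<CapturedClosure>.fromOpaque(opaque).takeUnretainedValue().closure(value)
}, opaque)
```

The ownership subtlety is the part the commit iterates on: something must keep the `CapturedClosure` alive for as long as the C side may call back, which is why the config stores it in its `opaque` property.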
