@@ -16,27 +16,15 @@ import Crdkafka
import Logging
import NIOCore

-/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
-struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
-    func didYield(bufferDepth: Int) -> Bool { true }
-    func didConsume(bufferDepth: Int) -> Bool { true }
-}
-
-/// `NIOAsyncSequenceProducerDelegate` that does nothing.
-struct NoDelegate: NIOAsyncSequenceProducerDelegate {
-    func produceMore() {}
-    func didTerminate() {}
-}
-
/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
-public struct AcknowledgedMessagesAsyncSequence: AsyncSequence {
+public struct KafkaMessageAcknowledgements: AsyncSequence {
    public typealias Element = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
-    typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, NoDelegate>
+    typealias WrappedSequence = AsyncStream<Element>
    let wrappedSequence: WrappedSequence

    /// `AsyncIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
    public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol {
-        let wrappedIterator: NIOAsyncSequenceProducer<Element, NoBackPressure, NoDelegate>.AsyncIterator
+        var wrappedIterator: AsyncStream<Element>.AsyncIterator

        public mutating func next() async -> Element? {
            await self.wrappedIterator.next()
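
Not part of the diff: a minimal sketch of how a consumer would iterate the new AsyncStream-backed sequence. The `for await` loop and `Result` handling follow directly from the `Element` type above; the wrapper function and log messages are illustrative assumptions.

    // Hypothetical consumer of the acknowledgements sequence
    // (see the factory methods further down in this diff).
    func logAcknowledgements(_ acknowledgements: KafkaMessageAcknowledgements, logger: Logger) async {
        for await result in acknowledgements {
            switch result {
            case .success(let message):
                logger.info("Message acknowledged: \(message)")
            case .failure(let error):
                logger.error("Acknowledgement failed: \(error)")
            }
        }
    }
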
@@ -77,65 +65,110 @@ public actor KafkaProducer {
    /// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer.
    private var topicHandles: [String: OpaquePointer]

-    // We use implicitly unwrapped optionals here as these properties need to access self upon initialization
    /// Used for handling the connection to the Kafka cluster.
-    private var client: KafkaClient!
-    /// Task that polls the Kafka cluster for updates periodically.
-    private var pollTask: Task<Void, Never>!
-
-    /// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been
-    /// acknowledged by the Kafka cluster.
-    public nonisolated let acknowledgements: AcknowledgedMessagesAsyncSequence
-    nonisolated let acknowlegdementsSource: AcknowledgedMessagesAsyncSequence.WrappedSequence.Source
-    private typealias Acknowledgement = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
+    private let client: KafkaClient

+    // Private initializer, use factory methods to create KafkaProducer
    /// Initialize a new ``KafkaProducer``.
    /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
    /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
    /// - Parameter logger: A logger.
-    /// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
-    public init(
-        config: KafkaProducerConfig = KafkaProducerConfig(),
-        topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    private init(
+        client: KafkaClient,
+        topicConfig: KafkaTopicConfig,
        logger: Logger
    ) async throws {
+        self.client = client
        self.topicConfig = topicConfig
-        self.logger = logger
        self.topicHandles = [:]
+        self.logger = logger
        self.state = .started
+    }

-        // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
-        // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
-        // The source MUST be held by the caller and used to signal new elements or finish.
-        // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
-        // This is due to the fact that deiniting the sequence is used as part of a trigger to
-        // terminate the underlying source.
-        let acknowledgementsSourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
-            elementType: Acknowledgement.self,
-            backPressureStrategy: NoBackPressure(),
-            delegate: NoDelegate()
+    /// Initialize a new ``KafkaProducer``.
+    ///
+    /// This factory method creates a producer without message acknowledgements.
+    ///
+    /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
+    /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
+    /// - Parameter logger: A logger.
+    /// - Returns: The newly created ``KafkaProducer``.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public static func makeProducer(
+        config: KafkaProducerConfig = KafkaProducerConfig(),
+        topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
+        logger: Logger
+    ) async throws -> KafkaProducer {
+        let client = try RDKafka.createClient(
+            type: .producer,
+            configDictionary: config.dictionary,
+            // Having no callback will discard any incoming acknowledgement messages
+            // Ref: rdkafka_broker.c:rd_kafka_dr_msgq
+            callback: nil,
+            logger: logger
        )
-        self.acknowlegdementsSource = acknowledgementsSourceAndSequence.source
-        self.acknowledgements = AcknowledgedMessagesAsyncSequence(
-            wrappedSequence: acknowledgementsSourceAndSequence.sequence
+
+        let producer = try await KafkaProducer(
+            client: client,
+            topicConfig: topicConfig,
+            logger: logger
        )

-        self.client = try RDKafka.createClient(
+        return producer
+    }
+
+    /// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence.
+    ///
+    /// Use the asynchronous sequence to consume message acknowledgements.
+    ///
+    /// - Important: When the asynchronous sequence is deinited, the producer will be shut down.
+    ///
+    /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
+    /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
+    /// - Parameter logger: A logger.
+    /// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
+    ///   `AsyncSequence` used for receiving message acknowledgements.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public static func makeProducerWithAcknowledgements(
+        config: KafkaProducerConfig = KafkaProducerConfig(),
+        topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
+        logger: Logger
+    ) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
+        var streamContinuation: AsyncStream<Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>>.Continuation?
+        let stream = AsyncStream { continuation in
+            streamContinuation = continuation
+        }
+
+        let client = try RDKafka.createClient(
            type: .producer,
            configDictionary: config.dictionary,
-            callback: self.deliveryReportCallback,
-            logger: self.logger
+            callback: { [logger, streamContinuation] messageResult in
+                guard let messageResult else {
+                    logger.error("Could not resolve acknowledged message")
+                    return
+                }
+
+                // Ignore YieldResult as we don't support back pressure in KafkaProducer
+                streamContinuation?.yield(messageResult)
+            },
+            logger: logger
        )

-        // Poll Kafka every millisecond
-        self.pollTask = Task { [client] in
-            while !Task.isCancelled {
-                client?.withKafkaHandlePointer { handle in
-                    rd_kafka_poll(handle, 0)
-                }
-                try? await Task.sleep(nanoseconds: 1_000_000)
+        let producer = try await KafkaProducer(
+            client: client,
+            topicConfig: topicConfig,
+            logger: logger
+        )
+
+        streamContinuation?.onTermination = { [producer] _ in
+            Task {
+                await producer.shutdownGracefully()
            }
        }
+
+        let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: stream)
+        return (producer, acknowlegementsSequence)
    }

    /// Method to shutdown the ``KafkaProducer``.
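
Not part of the diff: a short usage sketch of the two factory methods added above, assuming an async calling context and a swift-log `Logger`; the default values for `config` and `topicConfig` come from the signatures in this diff.

    import Logging

    // Sketch: create both producer variants via the new factory methods.
    func makeProducers() async throws -> (KafkaProducer, KafkaProducer, KafkaMessageAcknowledgements) {
        let logger = Logger(label: "kafka.producer.example")

        // Variant without acknowledgements: delivery reports are discarded (callback: nil).
        let plainProducer = try await KafkaProducer.makeProducer(logger: logger)

        // Variant with acknowledgements: the producer shuts itself down once the
        // returned sequence is deinited, via the stream's onTermination handler.
        let (ackProducer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(logger: logger)

        return (plainProducer, ackProducer, acknowledgements)
    }
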
@@ -155,7 +188,7 @@ public actor KafkaProducer {

    private func _shutDownGracefully(timeout: Int32) async {
        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
-            // Wait 10 seconds for outstanding messages to be sent and callbacks to be called
+            // Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called
            self.client.withKafkaHandlePointer { handle in
                rd_kafka_flush(handle, timeout)
                continuation.resume()
@@ -165,11 +198,22 @@ public actor KafkaProducer {
        for (_, topicHandle) in self.topicHandles {
            rd_kafka_topic_destroy(topicHandle)
        }
-        self.pollTask.cancel()

        self.state = .shutDown
    }

+    /// Start polling Kafka for acknowledged messages.
+    ///
+    /// - Parameter pollInterval: The desired time interval between two consecutive polls.
+    /// - Returns: An awaitable task representing the execution of the poll loop.
+    public func run(pollInterval: Duration = .milliseconds(100)) async throws {
+        // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
+        while self.state == .started {
+            self.client.poll(timeout: 0)
+            try await Task.sleep(for: pollInterval)
+        }
+    }
+
    /// Send messages to the Kafka cluster asynchronously, aka "fire and forget".
    /// This function is non-blocking.
    /// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
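
Not part of the diff: since `run(pollInterval:)` loops only while the producer's state is `.started`, the poll loop ends after a graceful shutdown. A sketch of driving it from a task group; the task-group structure and the point where messages would be sent are assumptions, only `run()` and `shutdownGracefully()` come from this diff.

    // Sketch: run the poll loop in a child task and stop it by shutting the producer down.
    func runProducer(_ producer: KafkaProducer) async throws {
        try await withThrowingTaskGroup(of: Void.self) { group in
            group.addTask {
                // Polls librdkafka every 100 ms (the default pollInterval) until shutdown.
                try await producer.run()
            }

            // ... send messages here ...

            // Flush outstanding messages; the poll loop then observes the state change and exits.
            await producer.shutdownGracefully()
            try await group.waitForAll()
        }
    }
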
@@ -220,29 +264,6 @@ public actor KafkaProducer {
        return self.messageIDCounter
    }

-    // Closure that is executed when a message has been acknowledged by Kafka
-    private lazy var deliveryReportCallback: (UnsafePointer<rd_kafka_message_t>?) -> Void = { [logger, acknowlegdementsSource] messagePointer in
-        guard let messagePointer = messagePointer else {
-            logger.error("Could not resolve acknowledged message")
-            return
-        }
-
-        let messageID = UInt(bitPattern: messagePointer.pointee._private)
-
-        do {
-            let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer, id: messageID)
-            _ = acknowlegdementsSource.yield(.success(message))
-        } catch {
-            guard let error = error as? KafkaAcknowledgedMessageError else {
-                fatalError("Caught error that is not of type \(KafkaAcknowledgedMessageError.self)")
-            }
-            _ = acknowlegdementsSource.yield(.failure(error))
-        }
-
-        // The messagePointer is automatically destroyed by librdkafka
-        // For safety reasons, we only use it inside of this closure
-    }
-
    /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed.
    /// - Parameter topic: The name of the topic that is addressed.
    private func createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? {