@@ -18,13 +18,37 @@ import NIOConcurrencyHelpers
18
18
import NIOCore
19
19
import ServiceLifecycle
20
20
21
+ // MARK: - KafkaProducerCloseOnTerminate
22
+
23
+ /// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when
24
+ /// `didTerminate()` is invoked.
25
+ internal struct KafkaProducerCloseOnTerminate : Sendable {
26
+ let stateMachine : NIOLockedValueBox < KafkaProducer . StateMachine >
27
+ }
28
+
29
+ extension KafkaProducerCloseOnTerminate : NIOAsyncSequenceProducerDelegate {
30
+ func produceMore( ) {
31
+ return // No back pressure
32
+ }
33
+
34
+ func didTerminate( ) {
35
+ let action = self . stateMachine. withLockedValue { $0. stopConsuming ( ) }
36
+ switch action {
37
+ case . finishSource( let source) :
38
+ source? . finish ( )
39
+ case . none:
40
+ break
41
+ }
42
+ }
43
+ }
44
+
21
45
// MARK: - KafkaMessageAcknowledgements
22
46
23
47
/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
24
48
public struct KafkaMessageAcknowledgements : AsyncSequence {
25
49
public typealias Element = Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError >
26
50
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure
27
- typealias WrappedSequence = NIOAsyncSequenceProducer < Element , BackPressureStrategy , NoDelegate >
51
+ typealias WrappedSequence = NIOAsyncSequenceProducer < Element , BackPressureStrategy , KafkaProducerCloseOnTerminate >
28
52
let wrappedSequence : WrappedSequence
29
53
30
54
/// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
@@ -51,7 +75,7 @@ public final class KafkaProducer: Service, Sendable {
51
75
typealias Producer = NIOAsyncSequenceProducer <
52
76
Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError > ,
53
77
NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
54
- NoDelegate
78
+ KafkaProducerCloseOnTerminate
55
79
>
56
80
57
81
/// State of the ``KafkaProducer``.
@@ -122,7 +146,8 @@ public final class KafkaProducer: Service, Sendable {
122
146
///
123
147
/// Use the asynchronous sequence to consume message acknowledgements.
124
148
///
125
- /// - Important: When the asynchronous sequence is deinited the producer will be shutdown.
149
+ /// - Important: When the asynchronous sequence is deinited the producer will be shutdown and disallow sending more messages.
150
+ /// Additionally, make sure to consume the asynchronous sequence otherwise the acknowledgements will be buffered in memory indefinitely.
126
151
///
127
152
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
128
153
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
@@ -140,7 +165,7 @@ public final class KafkaProducer: Service, Sendable {
140
165
let sourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
141
166
elementType: Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError > . self,
142
167
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
143
- delegate: NoDelegate ( )
168
+ delegate: KafkaProducerCloseOnTerminate ( stateMachine : stateMachine )
144
169
)
145
170
let source = sourceAndSequence. source
146
171
@@ -183,7 +208,10 @@ public final class KafkaProducer: Service, Sendable {
183
208
while !Task. isCancelled {
184
209
let nextAction = self . stateMachine. withLockedValue { $0. nextPollLoopAction ( ) }
185
210
switch nextAction {
186
- case . poll( let client, let source) :
211
+ case . pollWithoutYield( let client) :
212
+ // Drop any incoming acknowledgments
213
+ let _ = client. eventPoll ( )
214
+ case . pollAndYield( let client, let source) :
187
215
let events = client. eventPoll ( )
188
216
for event in events {
189
217
switch event {
@@ -258,6 +286,11 @@ extension KafkaProducer {
258
286
source: Producer . Source ? ,
259
287
topicHandles: RDKafkaTopicHandles
260
288
)
289
+ /// Producer is still running but the acknowledgement asynchronous sequence was terminated.
290
+ /// All incoming acknowledgements will be dropped.
291
+ ///
292
+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
293
+ case consumptionStopped( client: KafkaClient )
261
294
/// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing
262
295
/// any messages that wait to be sent and serve any remaining queued callbacks.
263
296
///
@@ -293,11 +326,15 @@ extension KafkaProducer {
293
326
294
327
/// Action to be taken when wanting to poll.
295
328
enum PollLoopAction {
296
- /// Poll client for new consumer messages.
329
+ /// Poll client.
330
+ ///
331
+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
332
+ case pollWithoutYield( client: KafkaClient )
333
+ /// Poll client and yield acknowledgments if any received.
297
334
///
298
335
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
299
336
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
300
- case poll ( client: KafkaClient , source: Producer . Source ? )
337
+ case pollAndYield ( client: KafkaClient , source: Producer . Source ? )
301
338
/// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
302
339
///
303
340
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
@@ -315,10 +352,12 @@ extension KafkaProducer {
315
352
case . uninitialized:
316
353
fatalError ( " \( #function) invoked while still in state \( self . state) " )
317
354
case . started( let client, _, let source, _) :
318
- return . poll( client: client, source: source)
355
+ return . pollAndYield( client: client, source: source)
356
+ case . consumptionStopped( let client) :
357
+ return . pollWithoutYield( client: client)
319
358
case . flushing( let client, let source) :
320
359
if client. outgoingQueueSize > 0 {
321
- return . poll ( client: client, source: source)
360
+ return . pollAndYield ( client: client, source: source)
322
361
} else {
323
362
self . state = . finished
324
363
return . terminatePollLoopAndFinishSource( source: source)
@@ -360,13 +399,44 @@ extension KafkaProducer {
360
399
newMessageID: newMessageID,
361
400
topicHandles: topicHandles
362
401
)
402
+ case . consumptionStopped:
403
+ throw KafkaError . connectionClosed ( reason: " Sequence consuming acknowledgements was abruptly terminated, producer closed " )
363
404
case . flushing:
364
405
throw KafkaError . connectionClosed ( reason: " Producer in the process of flushing and shutting down " )
365
406
case . finished:
366
407
throw KafkaError . connectionClosed ( reason: " Tried to produce a message with a closed producer " )
367
408
}
368
409
}
369
410
411
+ /// Action to take after invoking ``KafkaProducer/StateMachine/stopConsuming()``.
412
+ enum StopConsumingAction {
413
+ /// Finish the given `NIOAsyncSequenceProducerSource`.
414
+ ///
415
+ /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
416
+ case finishSource( source: Producer . Source ? )
417
+ }
418
+
419
+ /// The acknowledgements asynchronous sequence was terminated.
420
+ /// All incoming acknowledgements will be dropped.
421
+ mutating func stopConsuming( ) -> StopConsumingAction ? {
422
+ switch self . state {
423
+ case . uninitialized:
424
+ fatalError ( " \( #function) invoked while still in state \( self . state) " )
425
+ case . consumptionStopped:
426
+ fatalError ( " stopConsuming() must not be invoked more than once " )
427
+ case . started( let client, _, let source, _) :
428
+ self . state = . consumptionStopped( client: client)
429
+ return . finishSource( source: source)
430
+ case . flushing( let client, let source) :
431
+ // Setting source to nil to prevent incoming acknowledgements from buffering in `source`
432
+ self . state = . flushing( client: client, source: nil )
433
+ return . finishSource( source: source)
434
+ case . finished:
435
+ break
436
+ }
437
+ return nil
438
+ }
439
+
370
440
/// Get action to be taken when wanting to do close the producer.
371
441
///
372
442
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
@@ -376,6 +446,8 @@ extension KafkaProducer {
376
446
fatalError ( " \( #function) invoked while still in state \( self . state) " )
377
447
case . started( let client, _, let source, _) :
378
448
self . state = . flushing( client: client, source: source)
449
+ case . consumptionStopped( let client) :
450
+ self . state = . flushing( client: client, source: nil )
379
451
case . flushing, . finished:
380
452
break
381
453
}
0 commit comments