Skip to content

Commit 20c7f54

Browse files
KafkaProducer.makeProducerWithEvents() avoid fatalError (#117)
* `KafkaProducer.makeeProducerWithEvents()` avoid `fatalError` Motivation: `KafkaProducer.makeeProducerWithEvents()`: initializing the `NIOAsyncSequenceProducer Modifications: * make designated initializer of `KafkaProducer` non-throwing * `KafkaProducer.makeProducerWithEvents`: * initialize `client` before calling `NIOAsyncSequenceProducer.makeSequence` -> if initializing `client` fails the appropiate error gets thrown instead of triggering a fatalError because `KafkaProducerCloseOnTerminate` gets triggered in an invalid state Result: When the initialization `KafkaProducer.makeProducerWithEvents` fails an error is thrown instead of triggering the `fatalError` in `KafkaProducerCloseOnTerminate`. * Add order comment to KafkaConsumer.makeConsumerWithEvents
1 parent 2a1ced0 commit 20c7f54

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

Sources/Kafka/KafkaConsumer.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,11 @@ public final class KafkaConsumer: Sendable, Service {
256256
logger: logger
257257
)
258258

259+
// Note:
260+
// It's crucial to initialize the `sourceAndSequence` variable AFTER `client`.
261+
// This order is important to prevent the accidental triggering of `KafkaConsumerCloseOnTerminate.didTerminate()`.
262+
// If this order is not met and `RDKafkaClient.makeClient()` fails,
263+
// it leads to a call to `stateMachine.messageSequenceTerminated()` while it's still in the `.uninitialized` state.
259264
let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
260265
elementType: KafkaConsumerEvent.self,
261266
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),

Sources/Kafka/KafkaProducer.swift

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public final class KafkaProducer: Service, Sendable {
9595
stateMachine: NIOLockedValueBox<KafkaProducer.StateMachine>,
9696
configuration: KafkaProducerConfiguration,
9797
topicConfiguration: KafkaTopicConfiguration
98-
) throws {
98+
) {
9999
self.stateMachine = stateMachine
100100
self.configuration = configuration
101101
self.topicConfiguration = topicConfiguration
@@ -130,7 +130,7 @@ public final class KafkaProducer: Service, Sendable {
130130
)
131131
}
132132

133-
try self.init(
133+
self.init(
134134
stateMachine: stateMachine,
135135
configuration: configuration,
136136
topicConfiguration: configuration.topicConfiguration
@@ -156,30 +156,34 @@ public final class KafkaProducer: Service, Sendable {
156156
) throws -> (KafkaProducer, KafkaProducerEvents) {
157157
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
158158

159-
let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
160-
elementType: KafkaProducerEvent.self,
161-
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
162-
delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine)
163-
)
164-
let source = sourceAndSequence.source
165-
166159
let client = try RDKafkaClient.makeClient(
167160
type: .producer,
168161
configDictionary: configuration.dictionary,
169162
events: [.log, .deliveryReport],
170163
logger: logger
171164
)
172165

173-
let producer = try KafkaProducer(
166+
let producer = KafkaProducer(
174167
stateMachine: stateMachine,
175168
configuration: configuration,
176169
topicConfiguration: configuration.topicConfiguration
177170
)
178171

172+
// Note:
173+
// It's crucial to initialize the `sourceAndSequence` variable AFTER `client`.
174+
// This order is important to prevent the accidental triggering of `KafkaProducerCloseOnTerminate.didTerminate()`.
175+
// If this order is not met and `RDKafkaClient.makeClient()` fails,
176+
// it leads to a call to `stateMachine.stopConsuming()` while it's still in the `.uninitialized` state.
177+
let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
178+
elementType: KafkaProducerEvent.self,
179+
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
180+
delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine)
181+
)
182+
179183
stateMachine.withLockedValue {
180184
$0.initialize(
181185
client: client,
182-
source: source
186+
source: sourceAndSequence.source
183187
)
184188
}
185189

0 commit comments

Comments
 (0)