Skip to content

Commit 51b5e64

Browse files
Move KafkaClient to RD* namespace (#89)
Motivation: Our `KafkaClient` class has evolved into a wrapper of `rd_kafka_*` C functions. Therefore I thought it is reasonable to move it to the `RDKafka*` namespace (conceptually) and remove the imports of `CRDKafka` in `KafkaConsumer` and `KafkaProducer`. Modifications: * `KafkaConsumer`: remove `CRDKafka` import * `KafkaProducer`: remove `CRDKafka` import * rename `KafkaClient` -> `RDKafkaClient` and move it to the `RDKafka` folder * delete `RDKafka.swift` and move `create/makeClient` factory method to `RDKafkaClient`
1 parent 594487b commit 51b5e64

File tree

9 files changed

+90
-110
lines changed

9 files changed

+90
-110
lines changed

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15-
import Crdkafka
1615
import Logging
1716
import NIOConcurrencyHelpers
1817
import NIOCore
@@ -122,7 +121,7 @@ public final class KafkaConsumer: Sendable, Service {
122121
self.config = config
123122
self.logger = logger
124123

125-
let client = try RDKafka.createClient(
124+
let client = try RDKafkaClient.makeClient(
126125
type: .consumer,
127126
configDictionary: config.dictionary,
128127
events: [.log, .fetch, .offsetCommit],
@@ -149,8 +148,8 @@ public final class KafkaConsumer: Sendable, Service {
149148
)
150149
}
151150

152-
// Events that would be triggered by ``KafkaClient/poll(timeout:)``
153-
// are now triggered by ``KafkaClient/consumerPoll``.
151+
// Events that would be triggered by ``RDKafkaClient/poll(timeout:)``
152+
// are now triggered by ``RDKafkaClient/consumerPoll``.
154153
try client.pollSetConsumer()
155154

156155
switch config.consumptionStrategy._internal {
@@ -289,7 +288,7 @@ public final class KafkaConsumer: Sendable, Service {
289288
}
290289

291290
private func _triggerGracefulShutdown(
292-
client: KafkaClient,
291+
client: RDKafkaClient,
293292
logger: Logger
294293
) {
295294
do {
@@ -323,27 +322,27 @@ extension KafkaConsumer {
323322
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
324323
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
325324
case initializing(
326-
client: KafkaClient,
325+
client: RDKafkaClient,
327326
source: Producer.Source
328327
)
329328
/// The ``KafkaConsumer`` is consuming messages.
330329
///
331330
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
332331
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
333332
case consuming(
334-
client: KafkaClient,
333+
client: RDKafkaClient,
335334
source: Producer.Source
336335
)
337336
/// Consumer is still running but the messages asynchronous sequence was terminated.
338337
/// All incoming messages will be dropped.
339338
///
340339
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
341-
case consumptionStopped(client: KafkaClient)
340+
case consumptionStopped(client: RDKafkaClient)
342341
/// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
343342
/// We are now in the process of committing our last state to the broker.
344343
///
345344
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
346-
case finishing(client: KafkaClient)
345+
case finishing(client: RDKafkaClient)
347346
/// The ``KafkaConsumer`` is closed.
348347
case finished
349348
}
@@ -354,7 +353,7 @@ extension KafkaConsumer {
354353
/// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are
355354
/// not yet available when the normal initialization occurs.
356355
mutating func initialize(
357-
client: KafkaClient,
356+
client: RDKafkaClient,
358357
source: Producer.Source
359358
) {
360359
guard case .uninitialized = self.state else {
@@ -373,15 +372,15 @@ extension KafkaConsumer {
373372
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
374373
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
375374
case pollForAndYieldMessage(
376-
client: KafkaClient,
375+
client: RDKafkaClient,
377376
source: Producer.Source
378377
)
379378
/// The ``KafkaConsumer`` stopped consuming messages or
380379
/// is in the process of shutting down.
381380
/// Poll to serve any queued events and commit outstanding state to the broker.
382381
///
383382
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
384-
case pollWithoutYield(client: KafkaClient)
383+
case pollWithoutYield(client: RDKafkaClient)
385384
/// Terminate the poll loop.
386385
case terminatePollLoop
387386
}
@@ -416,7 +415,7 @@ extension KafkaConsumer {
416415
enum SetUpConnectionAction {
417416
/// Set up the connection through ``subscribe()`` or ``assign()``.
418417
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
419-
case setUpConnection(client: KafkaClient)
418+
case setUpConnection(client: RDKafkaClient)
420419
}
421420

422421
/// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``.
@@ -458,7 +457,7 @@ extension KafkaConsumer {
458457
enum StoreOffsetAction {
459458
/// Store the message offset with the given `client`.
460459
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
461-
case storeOffset(client: KafkaClient)
460+
case storeOffset(client: RDKafkaClient)
462461
}
463462

464463
/// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
@@ -483,7 +482,7 @@ extension KafkaConsumer {
483482
///
484483
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
485484
case commitSync(
486-
client: KafkaClient
485+
client: RDKafkaClient
487486
)
488487
/// Throw an error. The ``KafkaConsumer`` is closed.
489488
case throwClosedError
@@ -514,14 +513,14 @@ extension KafkaConsumer {
514513
///
515514
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
516515
case triggerGracefulShutdown(
517-
client: KafkaClient
516+
client: RDKafkaClient
518517
)
519518
/// Shut down the ``KafkaConsumer`` and finish the given `source` object.
520519
///
521520
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
522521
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
523522
case triggerGracefulShutdownAndFinishSource(
524-
client: KafkaClient,
523+
client: RDKafkaClient,
525524
source: Producer.Source
526525
)
527526
}

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15-
import Crdkafka
1615
import Logging
1716
import NIOConcurrencyHelpers
1817
import NIOCore
@@ -119,7 +118,7 @@ public final class KafkaProducer: Service, Sendable {
119118
) throws -> KafkaProducer {
120119
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
121120

122-
let client = try RDKafka.createClient(
121+
let client = try RDKafkaClient.makeClient(
123122
type: .producer,
124123
configDictionary: config.dictionary,
125124
events: [.log], // No .deliveryReport here!
@@ -169,7 +168,7 @@ public final class KafkaProducer: Service, Sendable {
169168
)
170169
let source = sourceAndSequence.source
171170

172-
let client = try RDKafka.createClient(
171+
let client = try RDKafkaClient.makeClient(
173172
type: .producer,
174173
configDictionary: config.dictionary,
175174
events: [.log, .deliveryReport],
@@ -283,7 +282,7 @@ extension KafkaProducer {
283282
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
284283
/// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
285284
case started(
286-
client: KafkaClient,
285+
client: RDKafkaClient,
287286
messageIDCounter: UInt,
288287
source: Producer.Source?,
289288
topicHandles: RDKafkaTopicHandles
@@ -292,14 +291,14 @@ extension KafkaProducer {
292291
/// All incoming acknowledgements will be dropped.
293292
///
294293
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
295-
case consumptionStopped(client: KafkaClient)
294+
case consumptionStopped(client: RDKafkaClient)
296295
/// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing
297296
/// any messages that wait to be sent and serve any remaining queued callbacks.
298297
///
299298
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
300299
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
301300
case flushing(
302-
client: KafkaClient,
301+
client: RDKafkaClient,
303302
source: Producer.Source?
304303
)
305304
/// The ``KafkaProducer`` has been shut down and cannot be used anymore.
@@ -312,7 +311,7 @@ extension KafkaProducer {
312311
/// Delayed initialization of `StateMachine` as the `source` is not yet available
313312
/// when the normal initialization occurs.
314313
mutating func initialize(
315-
client: KafkaClient,
314+
client: RDKafkaClient,
316315
source: Producer.Source?
317316
) {
318317
guard case .uninitialized = self.state else {
@@ -331,12 +330,12 @@ extension KafkaProducer {
331330
/// Poll client.
332331
///
333332
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
334-
case pollWithoutYield(client: KafkaClient)
333+
case pollWithoutYield(client: RDKafkaClient)
335334
/// Poll client and yield acknowledgments if any received.
336335
///
337336
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
338337
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
339-
case pollAndYield(client: KafkaClient, source: Producer.Source?)
338+
case pollAndYield(client: RDKafkaClient, source: Producer.Source?)
340339
/// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
341340
///
342341
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
@@ -375,7 +374,7 @@ extension KafkaProducer {
375374
///
376375
/// - Important: `newMessageID` is the new message ID assigned to the message to be sent.
377376
case send(
378-
client: KafkaClient,
377+
client: RDKafkaClient,
379378
newMessageID: UInt,
380379
topicHandles: RDKafkaTopicHandles
381380
)

Sources/SwiftKafka/RDKafka/RDKafka.swift

Lines changed: 0 additions & 63 deletions
This file was deleted.

Sources/SwiftKafka/KafkaClient.swift renamed to Sources/SwiftKafka/RDKafka/RDKafkaClient.swift

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@ import Logging
1717

1818
/// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
1919
/// which is used to handle the connection to the Kafka ecosystem.
20-
final class KafkaClient: Sendable {
20+
final class RDKafkaClient: Sendable {
21+
/// Determines if client is a producer or a consumer.
22+
enum ClientType {
23+
case producer
24+
case consumer
25+
}
26+
2127
// Default size for Strings returned from C API
2228
static let stringSize = 1024
2329

@@ -29,7 +35,8 @@ final class KafkaClient: Sendable {
2935
/// `librdkafka`'s main `rd_kafka_queue_t`.
3036
private let mainQueue: OpaquePointer
3137

32-
init(
38+
// Use factory method to initialize
39+
private init(
3340
kafkaHandle: OpaquePointer,
3441
logger: Logger
3542
) {
@@ -45,6 +52,44 @@ final class KafkaClient: Sendable {
4552
rd_kafka_destroy(kafkaHandle)
4653
}
4754

55+
/// Factory method creating a new instance of a ``RDKafkaClient``.
56+
static func makeClient(
57+
type: ClientType,
58+
configDictionary: [String: String],
59+
events: [RDKafkaEvent],
60+
logger: Logger
61+
) throws -> RDKafkaClient {
62+
let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
63+
64+
let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary)
65+
// Manually override some of the configuration options
66+
// Handle logs in event queue
67+
try RDKafkaConfig.set(configPointer: rdConfig, key: "log.queue", value: "true")
68+
// KafkaConsumer is manually storing read offsets
69+
if type == .consumer {
70+
try RDKafkaConfig.set(configPointer: rdConfig, key: "enable.auto.offset.store", value: "false")
71+
}
72+
RDKafkaConfig.setEvents(configPointer: rdConfig, events: events)
73+
74+
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
75+
defer { errorChars.deallocate() }
76+
77+
guard let handle = rd_kafka_new(
78+
clientType,
79+
rdConfig,
80+
errorChars,
81+
RDKafkaClient.stringSize
82+
) else {
83+
// rd_kafka_new only frees the rd_kafka_conf_t upon success
84+
rd_kafka_conf_destroy(rdConfig)
85+
86+
let errorString = String(cString: errorChars)
87+
throw KafkaError.client(reason: errorString)
88+
}
89+
90+
return RDKafkaClient(kafkaHandle: handle, logger: logger)
91+
}
92+
4893
/// Produce a message to the Kafka cluster.
4994
///
5095
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
@@ -134,7 +179,7 @@ final class KafkaClient: Sendable {
134179
/// Handle event of type `RDKafkaEvent.deliveryReport`.
135180
///
136181
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
137-
/// - Returns: `KafkaEvent` to be returned as part of ``KafkaClient.eventPoll()`.
182+
/// - Returns: `KafkaEvent` to be returned as part of ``RDKafkaClient.eventPoll()`.
138183
private func handleDeliveryReportEvent(_ event: OpaquePointer?) -> KafkaEvent {
139184
let deliveryReportCount = rd_kafka_event_message_count(event)
140185
var deliveryReportResults = [Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>]()
@@ -230,13 +275,13 @@ final class KafkaClient: Sendable {
230275
actualCallback(.success(()))
231276
}
232277

233-
/// Redirect the main ``KafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s
234-
/// queue (``KafkaClient/consumerPoll``).
278+
/// Redirect the main ``RDKafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s
279+
/// queue (``RDKafkaClient/consumerPoll``).
235280
///
236-
/// Events that would be triggered by ``KafkaClient/poll(timeout:)``
237-
/// are now triggered by ``KafkaClient/consumerPoll``.
281+
/// Events that would be triggered by ``RDKafkaClient/poll(timeout:)``
282+
/// are now triggered by ``RDKafkaClient/consumerPoll``.
238283
///
239-
/// - Warning: It is not allowed to call ``KafkaClient/poll(timeout:)`` after ``KafkaClient/pollSetConsumer``.
284+
/// - Warning: It is not allowed to call ``RDKafkaClient/poll(timeout:)`` after ``RDKafkaClient/pollSetConsumer``.
240285
func pollSetConsumer() throws {
241286
let result = rd_kafka_poll_set_consumer(self.kafkaHandle)
242287
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
@@ -347,7 +392,7 @@ final class KafkaClient: Sendable {
347392
/// Close the consumer asynchronously. This means revoking its assignment, committing offsets to broker and
348393
/// leaving the consumer group (if applicable).
349394
///
350-
/// Make sure to run poll loop until ``KafkaClient/consumerIsClosed`` returns `true`.
395+
/// Make sure to run poll loop until ``RDKafkaClient/consumerIsClosed`` returns `true`.
351396
func consumerClose() throws {
352397
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
353398
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)

0 commit comments

Comments
 (0)