Commit 052396b
KafkaProducer: replace callbacks with eventPoll (#82)
* `KafkaProducer`: replace callbacks with `eventPoll`

  Motivation:

  Currently we serve `librdkafka` logs and delivery reports through callbacks. In the future we also want to be notified about `librdkafka`-triggered errors, statistics, and rebalances. Because the callbacks add a lot of overhead, e.g. by requiring `Unmanaged` references and wrapping Swift closures in C callbacks, we would rather have an `eventPoll` method that regularly polls for all kinds of events and handles them inline.

  Modifications:

  * `KafkaClient`:
    * remove the `poll` method
    * add a new method `eventPoll`
    * move the mapping from `librdkafka` logs to `swift-log` into `KafkaClient.eventPoll`
  * `KafkaProducer`:
    * receive acknowledgements through `KafkaClient.eventPoll` instead of using `KafkaClient.poll` to trigger event callbacks
    * replace the `deliveryReport` callback and handle the delivery-report event directly in `KafkaProducer`
  * `RDKafkaConfig`:
    * remove the `logging` logic
  * `KafkaProducerTests`:
    * add `testProducerLog`, asserting that `librdkafka` logs get forwarded to `swift-log`
  * move `convertMessageToAcknowledgementResult` from `RDKafkaConfig` to `KafkaClient`

* Fix: don't listen to `RD_KAFKA_EVENT_DR` in the non-acknowledgement producer

* Review Franz

  Modifications:

  * `KafkaClient.eventPoll`:
    * reserve capacity for arrays to avoid reallocations
    * move `rd_kafka_event_destroy` into a `defer` statement
    * get rid of the named loop (just return from the loop)
    * wrap the handling of the different event types in `private` methods
  * wrap a subset of the `RD_KAFKA_*` event types in the Swift type `RDKafkaEvent` (a sketch follows below)
1 parent a46f57b commit 052396b
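
The `RDKafkaEvent` wrapper is one of the eight changed files, but its diff is not included in this excerpt. Purely as orientation, here is a minimal sketch of what such a wrapper could look like, assuming `librdkafka`'s public `RD_KAFKA_EVENT_*` flag values; the actual case set and raw values are whatever the commit defines.

```swift
// Hypothetical sketch (not the committed file): wrap a subset of librdkafka's
// RD_KAFKA_EVENT_* flags so that rd_kafka_event_type() results can be matched safely.
enum RDKafkaEvent: Int32 {
    case none = 0x0           // RD_KAFKA_EVENT_NONE
    case deliveryReport = 0x1 // RD_KAFKA_EVENT_DR
    case fetch = 0x2          // RD_KAFKA_EVENT_FETCH
    case log = 0x4            // RD_KAFKA_EVENT_LOG
    case error = 0x8          // RD_KAFKA_EVENT_ERROR
    case rebalance = 0x10     // RD_KAFKA_EVENT_REBALANCE
    case offsetCommit = 0x20  // RD_KAFKA_EVENT_OFFSET_COMMIT
    case statistics = 0x40    // RD_KAFKA_EVENT_STATS
}
```

With a case set broader than what is handled today, `KafkaClient.eventPoll` can switch over the wrapper and fall through to `default` for flags it does not serve yet.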

File tree

8 files changed, +322 -205 lines


Sources/SwiftKafka/KafkaClient.swift

Lines changed: 126 additions & 14 deletions
```diff
@@ -23,19 +23,22 @@ final class KafkaClient: Sendable {
 
     /// Handle for the C library's Kafka instance.
     private let kafkaHandle: OpaquePointer
-    /// References the opaque object passed to the config to ensure ARC retains it as long as the client exists.
-    private let opaque: RDKafkaConfig.CapturedClosures
     /// A logger.
     private let logger: Logger
 
+    /// `librdkafka`'s main `rd_kafka_queue_t`.
+    private let mainQueue: OpaquePointer
+
     init(
         kafkaHandle: OpaquePointer,
-        opaque: RDKafkaConfig.CapturedClosures,
         logger: Logger
     ) {
         self.kafkaHandle = kafkaHandle
-        self.opaque = opaque
         self.logger = logger
+
+        self.mainQueue = rd_kafka_queue_get_main(self.kafkaHandle)
+
+        rd_kafka_set_log_queue(self.kafkaHandle, self.mainQueue)
     }
 
     deinit {
@@ -83,18 +86,101 @@ final class KafkaClient: Sendable {
         }
     }
 
-    /// Polls the Kafka client for events.
+    /// Swift wrapper for events from `librdkafka`'s event queue.
+    enum KafkaEvent {
+        case deliveryReport(results: [Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>])
+    }
+
+    /// Poll the event `rd_kafka_queue_t` for new events.
     ///
-    /// Events will cause application-provided callbacks to be called.
+    /// - Parameter maxEvents: Maximum number of events to serve in one invocation.
+    func eventPoll(maxEvents: Int = 100) -> [KafkaEvent] {
+        var events = [KafkaEvent]()
+        events.reserveCapacity(maxEvents)
+
+        for _ in 0..<maxEvents {
+            let event = rd_kafka_queue_poll(self.mainQueue, 0)
+            defer { rd_kafka_event_destroy(event) }
+
+            let rdEventType = rd_kafka_event_type(event)
+            guard let eventType = RDKafkaEvent(rawValue: rdEventType) else {
+                fatalError("Unsupported event type: \(rdEventType)")
+            }
+
+            switch eventType {
+            case .deliveryReport:
+                let forwardEvent = self.handleDeliveryReportEvent(event)
+                events.append(forwardEvent) // Return KafkaEvent.deliveryReport as part of this method
+            case .log:
+                self.handleLogEvent(event)
+            case .none:
+                // Finished reading events, return early
+                return events
+            default:
+                break // Ignored Event
+            }
+        }
+
+        return events
+    }
+
+    /// Handle event of type `RDKafkaEvent.deliveryReport`.
     ///
-    /// - Parameter timeout: Specifies the maximum amount of time
-    ///     (in milliseconds) that the call will block waiting for events.
-    ///     For non-blocking calls, provide 0 as `timeout`.
-    ///     To wait indefinitely for an event, provide -1.
-    /// - Returns: The number of events served.
-    @discardableResult
-    func poll(timeout: Int32) -> Int32 {
-        return rd_kafka_poll(self.kafkaHandle, timeout)
+    /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
+    /// - Returns: `KafkaEvent` to be returned as part of ``KafkaClient.eventPoll()``.
+    private func handleDeliveryReportEvent(_ event: OpaquePointer?) -> KafkaEvent {
+        let deliveryReportCount = rd_kafka_event_message_count(event)
+        var deliveryReportResults = [Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>]()
+        deliveryReportResults.reserveCapacity(deliveryReportCount)
+
+        while let messagePointer = rd_kafka_event_message_next(event) {
+            guard let message = Self.convertMessageToAcknowledgementResult(messagePointer: messagePointer) else {
+                continue
+            }
+            deliveryReportResults.append(message)
+        }
+
+        return .deliveryReport(results: deliveryReportResults)
+    }
+
+    /// Handle event of type `RDKafkaEvent.log`.
+    ///
+    /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
+    private func handleLogEvent(_ event: OpaquePointer?) {
+        var faculty: UnsafePointer<CChar>?
+        var buffer: UnsafePointer<CChar>?
+        var level: Int32 = 0
+        if rd_kafka_event_log(event, &faculty, &buffer, &level) == 0 {
+            if let faculty, let buffer {
+                // Mapping according to https://en.wikipedia.org/wiki/Syslog
+                switch level {
+                case 0...2: /* Emergency, Alert, Critical */
+                    self.logger.critical(
+                        Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty)
+                    )
+                case 3: /* Error */
+                    self.logger.error(
+                        Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty)
+                    )
+                case 4: /* Warning */
+                    self.logger.warning(
+                        Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty)
+                    )
+                case 5: /* Notice */
+                    self.logger.notice(
+                        Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty)
+                    )
+                case 6: /* Informational */
+                    self.logger.info(
+                        Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty)
+                    )
+                default: /* Debug */
+                    self.logger.debug(
+                        Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty)
+                    )
+                }
+            }
+        }
     }
 
     /// Redirect the main ``KafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s
@@ -271,4 +357,30 @@ final class KafkaClient: Sendable {
     func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
         return try body(self.kafkaHandle)
     }
+
+    /// Convert an unsafe `rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.
+    /// - Parameter messagePointer: An `UnsafePointer` pointing to the `rd_kafka_message_t` object in memory.
+    /// - Returns: A ``KafkaAcknowledgementResult``.
+    private static func convertMessageToAcknowledgementResult(
+        messagePointer: UnsafePointer<rd_kafka_message_t>?
+    ) -> Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>? {
+        guard let messagePointer else {
+            return nil
+        }
+
+        let messageID = KafkaProducerMessageID(rawValue: UInt(bitPattern: messagePointer.pointee._private))
+
+        let messageResult: Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
+        do {
+            let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer, id: messageID)
+            messageResult = .success(message)
+        } catch {
+            guard let error = error as? KafkaAcknowledgedMessageError else {
+                fatalError("Caught error that is not of type \(KafkaAcknowledgedMessageError.self)")
+            }
+            messageResult = .failure(error)
+        }
+
+        return messageResult
+    }
 }
```
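
For orientation, the call-site pattern that replaces the old callback registration looks roughly as follows: drain the event queue at a fixed interval and react to the typed `KafkaEvent` values. This is a minimal sketch under assumed names (`runPollLoop`, `onDeliveryReport`, and `pollInterval` are placeholders, not part of the commit); the real loop lives in `KafkaProducer.run()`, shown in the diff further below.

```swift
// Sketch only: periodically drain KafkaClient's event queue instead of
// relying on librdkafka callbacks. All names here are placeholders.
func runPollLoop(
    client: KafkaClient,
    pollInterval: Duration,
    onDeliveryReport: ([Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>]) -> Void
) async throws {
    while !Task.isCancelled {
        for event in client.eventPoll() { // serves at most 100 events per call by default
            switch event {
            case .deliveryReport(let results):
                onDeliveryReport(results) // e.g. yield into a NIOAsyncSequenceProducer source
            }
        }
        try await Task.sleep(for: pollInterval) // back off between drains
    }
}
```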

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 6 additions & 1 deletion
```diff
@@ -73,7 +73,12 @@ public final class KafkaConsumer: Sendable, Service {
         self.config = config
         self.logger = logger
 
-        let client = try RDKafka.createClient(type: .consumer, configDictionary: config.dictionary, logger: logger)
+        let client = try RDKafka.createClient(
+            type: .consumer,
+            configDictionary: config.dictionary,
+            events: [.log],
+            logger: logger
+        )
 
         self.stateMachine = NIOLockedValueBox(StateMachine(logger: self.logger))
 
```

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,7 @@ public final class KafkaProducer: Service, Sendable {
9898
let client = try RDKafka.createClient(
9999
type: .producer,
100100
configDictionary: config.dictionary,
101-
// Having no callback will discard any incoming acknowledgement messages
102-
// Ref: rdkafka_broker.c:rd_kafka_dr_msgq
103-
deliveryReportCallback: nil,
101+
events: [.log], // No .deliveryReport here!
104102
logger: logger
105103
)
106104

@@ -149,15 +147,7 @@ public final class KafkaProducer: Service, Sendable {
149147
let client = try RDKafka.createClient(
150148
type: .producer,
151149
configDictionary: config.dictionary,
152-
deliveryReportCallback: { [logger, source] messageResult in
153-
guard let messageResult else {
154-
logger.error("Could not resolve acknowledged message")
155-
return
156-
}
157-
158-
// Ignore YieldResult as we don't support back pressure in KafkaProducer
159-
_ = source.yield(messageResult)
160-
},
150+
events: [.log, .deliveryReport],
161151
logger: logger
162152
)
163153

@@ -193,8 +183,15 @@ public final class KafkaProducer: Service, Sendable {
193183
while !Task.isCancelled {
194184
let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
195185
switch nextAction {
196-
case .poll(let client):
197-
client.poll(timeout: 0)
186+
case .poll(let client, let source):
187+
let events = client.eventPoll()
188+
for event in events {
189+
switch event {
190+
case .deliveryReport(let results):
191+
// Ignore YieldResult as we don't support back pressure in KafkaProducer
192+
results.forEach { _ = source?.yield($0) }
193+
}
194+
}
198195
try await Task.sleep(for: self.config.pollInterval)
199196
case .terminatePollLoopAndFinishSource(let source):
200197
source?.finish()
@@ -297,7 +294,10 @@ extension KafkaProducer {
297294
/// Action to be taken when wanting to poll.
298295
enum PollLoopAction {
299296
/// Poll client for new consumer messages.
300-
case poll(client: KafkaClient)
297+
///
298+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
299+
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
300+
case poll(client: KafkaClient, source: Producer.Source?)
301301
/// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
302302
///
303303
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
@@ -314,11 +314,11 @@ extension KafkaProducer {
314314
switch self.state {
315315
case .uninitialized:
316316
fatalError("\(#function) invoked while still in state \(self.state)")
317-
case .started(let client, _, _, _):
318-
return .poll(client: client)
317+
case .started(let client, _, let source, _):
318+
return .poll(client: client, source: source)
319319
case .flushing(let client, let source):
320320
if client.outgoingQueueSize > 0 {
321-
return .poll(client: client)
321+
return .poll(client: client, source: source)
322322
} else {
323323
self.state = .finished
324324
return .terminatePollLoopAndFinishSource(source: source)

Sources/SwiftKafka/RDKafka/RDKafka.swift

Lines changed: 4 additions & 13 deletions
```diff
@@ -27,23 +27,14 @@ struct RDKafka {
     static func createClient(
         type: ClientType,
         configDictionary: [String: String],
-        deliveryReportCallback: RDKafkaConfig.CapturedClosures.DeliveryReportClosure? = nil,
+        events: [RDKafkaEvent],
         logger: Logger
     ) throws -> KafkaClient {
         let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
 
         let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary)
-
-        // Check that delivery report callback can be only set for producer
-        guard deliveryReportCallback == nil || type == .producer else {
-            fatalError("Delivery report callback can't be defined for consumer client")
-        }
-
-        let opaque = RDKafkaConfig.setCallbackClosures(
-            configPointer: rdConfig,
-            deliveryReportCallback: deliveryReportCallback,
-            logger: logger
-        )
+        try RDKafkaConfig.set(configPointer: rdConfig, key: "log.queue", value: "true")
+        RDKafkaConfig.setEvents(configPointer: rdConfig, events: events)
 
         let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
         defer { errorChars.deallocate() }
@@ -61,6 +52,6 @@ struct RDKafka {
             throw KafkaError.client(reason: errorString)
         }
 
-        return KafkaClient(kafkaHandle: handle, opaque: opaque, logger: logger)
+        return KafkaClient(kafkaHandle: handle, logger: logger)
     }
 }
```
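
`RDKafkaConfig.set` and `RDKafkaConfig.setEvents` are part of the commit, but their diff is not included in this excerpt. A minimal sketch of what `setEvents` could look like, assuming `librdkafka`'s `rd_kafka_conf_set_events` API and a raw-value `RDKafkaEvent` enum like the one sketched near the top of this page:

```swift
// Hypothetical sketch (not the committed implementation): OR the requested
// event flags into a single mask and register it on the librdkafka config handle.
static func setEvents(configPointer: OpaquePointer, events: [RDKafkaEvent]) {
    let eventMask: Int32 = events.reduce(0) { $0 | $1.rawValue }
    rd_kafka_conf_set_events(configPointer, eventMask)
}
```

Setting `log.queue` to `true`, as the hunk above does, tells `librdkafka` to enqueue log messages on a queue instead of invoking its log callback, which is what lets `rd_kafka_set_log_queue` in `KafkaClient.init` route them onto the main event queue.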
