Skip to content

Commit fe3c9fc

Browse files
KafkaConsumer: replace callbacks with eventPoll (#86)
Motivation: By using the polling for events we can get rid of most of our callbacks and have the ability to conveniently listen to more `librdkafka` events like errors, statistics, etc. in the future. Modifications: * handle `RD_KAFKA_EVENT_FETCH` in `KafkaClient.eventPoll` * handle `RD_KAFKA_EVENT_OFFSET_COMMIT` in `KafkaClient.eventPoll` -> we cannot have an event handler and callbacks served at the same time, that is why the event handler also invokes the consumer commit callback * delete `KafkaClient.consumerPoll()` * use `KafkaClient.eventPoll` instead of `KafkaClient.consumerPoll` to receive `KafkaConsumerMessage`s * create `KafkaConsumerTests` * add `KafkaConsumerTests.testConsumerLog` * move `MockLogHandler` to `Utilities.swift`
1 parent 38ece7b commit fe3c9fc

File tree

6 files changed

+229
-142
lines changed

6 files changed

+229
-142
lines changed

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ final class KafkaClient: Sendable {
8989
/// Swift wrapper for events from `librdkafka`'s event queue.
9090
enum KafkaEvent {
9191
case deliveryReport(results: [Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>])
92+
case consumerMessages(result: Result<KafkaConsumerMessage, Error>)
9293
}
9394

9495
/// Poll the event `rd_kafka_queue_t` for new events.
@@ -110,9 +111,15 @@ final class KafkaClient: Sendable {
110111
switch eventType {
111112
case .deliveryReport:
112113
let forwardEvent = self.handleDeliveryReportEvent(event)
113-
events.append(forwardEvent) // Return KafkaEvent.deliveryReport as part of this method
114+
events.append(forwardEvent)
115+
case .fetch:
116+
if let forwardEvent = self.handleFetchEvent(event) {
117+
events.append(forwardEvent)
118+
}
114119
case .log:
115120
self.handleLogEvent(event)
121+
case .offsetCommit:
122+
self.handleOffsetCommitEvent(event)
116123
case .none:
117124
// Finished reading events, return early
118125
return events
@@ -140,9 +147,30 @@ final class KafkaClient: Sendable {
140147
deliveryReportResults.append(message)
141148
}
142149

150+
// The returned message(s) MUST NOT be freed with rd_kafka_message_destroy().
143151
return .deliveryReport(results: deliveryReportResults)
144152
}
145153

154+
/// Handle event of type `RDKafkaEvent.fetch`.
155+
///
156+
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
157+
/// - Returns: `KafkaEvent` to be returned as part of ``KafkaClient/eventPoll()``.
158+
private func handleFetchEvent(_ event: OpaquePointer?) -> KafkaEvent? {
159+
do {
160+
// RD_KAFKA_EVENT_FETCH only returns a single message:
161+
// https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a3a855eb7bdf17f5797d4911362a5fc7c
162+
if let messagePointer = rd_kafka_event_message_next(event) {
163+
let message = try KafkaConsumerMessage(messagePointer: messagePointer)
164+
return .consumerMessages(result: .success(message))
165+
} else {
166+
return nil
167+
}
168+
} catch {
169+
return .consumerMessages(result: .failure(error))
170+
}
171+
// The returned message(s) MUST NOT be freed with rd_kafka_message_destroy().
172+
}
173+
146174
/// Handle event of type `RDKafkaEvent.log`.
147175
///
148176
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
@@ -183,6 +211,25 @@ final class KafkaClient: Sendable {
183211
}
184212
}
185213

214+
/// Handle event of type `RDKafkaEvent.offsetCommit`.
215+
///
216+
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
217+
private func handleOffsetCommitEvent(_ event: OpaquePointer?) {
218+
guard let opaquePointer = rd_kafka_event_opaque(event) else {
219+
fatalError("Could not resolve reference to captured Swift callback instance")
220+
}
221+
let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()
222+
let actualCallback = opaque.closure
223+
224+
let error = rd_kafka_event_error(event)
225+
guard error == RD_KAFKA_RESP_ERR_NO_ERROR else {
226+
let kafkaError = KafkaError.rdKafkaError(wrapping: error)
227+
actualCallback(.failure(kafkaError))
228+
return
229+
}
230+
actualCallback(.success(()))
231+
}
232+
186233
/// Redirect the main ``KafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s
187234
/// queue (``KafkaClient/consumerPoll``).
188235
///
@@ -197,32 +244,6 @@ final class KafkaClient: Sendable {
197244
}
198245
}
199246

200-
/// Request a new message from the Kafka cluster.
201-
///
202-
/// - Important: This method should only be invoked from ``KafkaConsumer``.
203-
///
204-
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
205-
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
206-
func consumerPoll() throws -> KafkaConsumerMessage? {
207-
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else {
208-
// No error, there might be no more messages
209-
return nil
210-
}
211-
212-
defer {
213-
// Destroy message otherwise poll() will block forever
214-
rd_kafka_message_destroy(messagePointer)
215-
}
216-
217-
// Reached the end of the topic+partition queue on the broker
218-
if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
219-
return nil
220-
}
221-
222-
let message = try KafkaConsumerMessage(messagePointer: messagePointer)
223-
return message
224-
}
225-
226247
/// Subscribe to topic set using balanced consumer groups.
227248
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
228249
func subscribe(topicPartitionList: RDKafkaTopicPartitionList) throws {
@@ -285,39 +306,12 @@ final class KafkaClient: Sendable {
285306
// should not be counted in ARC as this can lead to memory leaks.
286307
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
287308

288-
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
289-
290-
// Create a C closure that calls the captured closure
291-
let callbackWrapper: (
292-
@convention(c) (
293-
OpaquePointer?,
294-
rd_kafka_resp_err_t,
295-
UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
296-
UnsafeMutableRawPointer?
297-
) -> Void
298-
) = { _, error, _, opaquePointer in
299-
300-
guard let opaquePointer = opaquePointer else {
301-
fatalError("Could not resolve reference to captured Swift callback instance")
302-
}
303-
let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()
304-
305-
let actualCallback = opaque.closure
306-
307-
if error == RD_KAFKA_RESP_ERR_NO_ERROR {
308-
actualCallback(.success(()))
309-
} else {
310-
let kafkaError = KafkaError.rdKafkaError(wrapping: error)
311-
actualCallback(.failure(kafkaError))
312-
}
313-
}
314-
315309
changesList.withListPointer { listPointer in
316310
rd_kafka_commit_queue(
317311
self.kafkaHandle,
318312
listPointer,
319-
consumerQueue,
320-
callbackWrapper,
313+
self.mainQueue,
314+
nil,
321315
opaquePointer
322316
)
323317
}

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public final class KafkaConsumer: Sendable, Service {
8484
let client = try RDKafka.createClient(
8585
type: .consumer,
8686
configDictionary: config.dictionary,
87-
events: [.log],
87+
events: [.log, .fetch, .offsetCommit],
8888
logger: logger
8989
)
9090

@@ -173,20 +173,27 @@ public final class KafkaConsumer: Sendable, Service {
173173
let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
174174
switch nextAction {
175175
case .pollForAndYieldMessage(let client, let source):
176-
do {
177-
if let message = try client.consumerPoll() {
178-
// We do not support back pressure, we can ignore the yield result
179-
_ = source.yield(message)
176+
let events = client.eventPoll()
177+
for event in events {
178+
switch event {
179+
case .consumerMessages(let result):
180+
switch result {
181+
case .success(let message):
182+
// We do not support back pressure, we can ignore the yield result
183+
_ = source.yield(message)
184+
case .failure(let error):
185+
source.finish()
186+
throw error
187+
}
188+
default:
189+
break // Ignore
180190
}
181-
} catch {
182-
source.finish()
183-
throw error
184191
}
185192
try await Task.sleep(for: self.config.pollInterval)
186193
case .pollUntilClosed(let client):
187194
// Ignore poll result, we are closing down and just polling to commit
188195
// outstanding consumer state
189-
_ = try client.consumerPoll()
196+
let _ = client.eventPoll()
190197
try await Task.sleep(for: self.config.pollInterval)
191198
case .terminatePollLoop:
192199
return

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ public final class KafkaProducer: Service, Sendable {
218218
case .deliveryReport(let results):
219219
// Ignore YieldResult as we don't support back pressure in KafkaProducer
220220
results.forEach { _ = source?.yield($0) }
221+
default:
222+
break // Ignore
221223
}
222224
}
223225
try await Task.sleep(for: self.config.pollInterval)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import ServiceLifecycle
17+
@testable import SwiftKafka
18+
import XCTest
19+
20+
// For testing locally on Mac, do the following:
21+
//
22+
// 1. Install Kafka and Zookeeper using homebrew
23+
//
24+
// https://medium.com/@Ankitthakur/apache-kafka-installation-on-mac-using-homebrew-a367cdefd273
25+
//
26+
// 2. Start Zookeeper & Kafka Server
27+
//
28+
// (Homebrew - Apple Silicon)
29+
// zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties & kafka-server-start /opt/homebrew/etc/kafka/server.properties
30+
//
31+
// (Homebrew - Intel Mac)
32+
// zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
33+
34+
final class KafkaConsumerTests: XCTestCase {
35+
func testConsumerLog() async throws {
36+
let recorder = LogEventRecorder()
37+
let mockLogger = Logger(label: "kafka.test.consumer.log") {
38+
_ in MockLogHandler(recorder: recorder)
39+
}
40+
41+
// Set no bootstrap servers to trigger librdkafka configuration warning
42+
let config = KafkaConsumerConfiguration(
43+
consumptionStrategy: .partition(topic: "some topic", partition: .unassigned),
44+
bootstrapServers: [],
45+
saslMechanism: .gssapi // This should trigger a configuration error
46+
)
47+
48+
let consumer = try KafkaConsumer(config: config, logger: mockLogger)
49+
50+
let serviceGroup = ServiceGroup(
51+
services: [consumer],
52+
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
53+
logger: .kafkaTest
54+
)
55+
56+
await withThrowingTaskGroup(of: Void.self) { group in
57+
// Run Task
58+
group.addTask {
59+
try await serviceGroup.run()
60+
}
61+
62+
// Sleep for 1s to let poll loop receive log message
63+
try! await Task.sleep(for: .seconds(1))
64+
65+
// Shutdown the serviceGroup
66+
await serviceGroup.triggerGracefulShutdown()
67+
}
68+
69+
let recordedEvents = recorder.recordedEvents
70+
XCTAssertEqual(1, recordedEvents.count)
71+
72+
let expectedMessage = """
73+
[thrd:app]: Configuration property `sasl.mechanism` set to `GSSAPI` but `security.protocol` \
74+
is not configured for SASL: recommend setting `security.protocol` to SASL_SSL or SASL_PLAINTEXT
75+
"""
76+
let expectedLevel = Logger.Level.warning
77+
let expectedSource = "CONFWARN"
78+
79+
let receivedEvent = try XCTUnwrap(recordedEvents.first, "Expected log event, but found none")
80+
XCTAssertEqual(expectedMessage, receivedEvent.message.description)
81+
XCTAssertEqual(expectedLevel, receivedEvent.level)
82+
XCTAssertEqual(expectedSource, receivedEvent.source)
83+
}
84+
}

Tests/SwiftKafkaTests/KafkaProducerTests.swift

Lines changed: 0 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Logging
16-
import NIOConcurrencyHelpers
1716
import NIOCore
1817
import ServiceLifecycle
1918
@testable import SwiftKafka
@@ -315,80 +314,4 @@ final class KafkaProducerTests: XCTestCase {
315314

316315
XCTAssertNil(producerCopy)
317316
}
318-
319-
// MARK: - Mocks
320-
321-
internal struct LogEvent {
322-
let level: Logger.Level
323-
let message: Logger.Message
324-
let source: String
325-
}
326-
327-
internal struct LogEventRecorder {
328-
let _recordedEvents = NIOLockedValueBox<[LogEvent]>([])
329-
330-
var recordedEvents: [LogEvent] {
331-
self._recordedEvents.withLockedValue { $0 }
332-
}
333-
334-
func record(_ event: LogEvent) {
335-
self._recordedEvents.withLockedValue { $0.append(event) }
336-
}
337-
}
338-
339-
internal struct MockLogHandler: LogHandler {
340-
let recorder: LogEventRecorder
341-
342-
init(recorder: LogEventRecorder) {
343-
self.recorder = recorder
344-
}
345-
346-
func log(
347-
level: Logger.Level,
348-
message: Logger.Message,
349-
metadata: Logger.Metadata?,
350-
source: String,
351-
file: String,
352-
function: String,
353-
line: UInt
354-
) {
355-
self.recorder.record(LogEvent(level: level, message: message, source: source))
356-
}
357-
358-
private var _logLevel: Logger.Level?
359-
var logLevel: Logger.Level {
360-
get {
361-
// get from config unless set
362-
return self._logLevel ?? .debug
363-
}
364-
set {
365-
self._logLevel = newValue
366-
}
367-
}
368-
369-
private var _metadataSet = false
370-
private var _metadata = Logger.Metadata() {
371-
didSet {
372-
self._metadataSet = true
373-
}
374-
}
375-
376-
public var metadata: Logger.Metadata {
377-
get {
378-
return self._metadata
379-
}
380-
set {
381-
self._metadata = newValue
382-
}
383-
}
384-
385-
subscript(metadataKey metadataKey: Logger.Metadata.Key) -> Logger.Metadata.Value? {
386-
get {
387-
return self._metadata[metadataKey]
388-
}
389-
set {
390-
self._metadata[metadataKey] = newValue
391-
}
392-
}
393-
}
394317
}

0 commit comments

Comments
 (0)