Skip to content

Commit 9268027

Browse files
authored
Add an option to redirect librdkafka logging to Logger (#61)
* Add an option to redirect librdkafka logging to Logger * address PR feedback * polishes
1 parent b0bc014 commit 9268027

File tree

5 files changed

+114
-37
lines changed

5 files changed

+114
-37
lines changed

.gitignore

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
/*.xcodeproj
55
xcuserdata/
66
DerivedData/
7-
.swiftpm/config/registries.json
8-
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
97
.netrc
108
Package.resolved
119
.*.sw?
10+
.swiftpm

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ final class KafkaClient {
2424
/// Handle for the C library's Kafka instance.
2525
private let kafkaHandle: OpaquePointer
2626
/// References the opaque object passed to the config to ensure ARC retains it as long as the client exists.
27-
private let opaque: RDKafkaConfig.CapturedClosure?
27+
private let opaque: RDKafkaConfig.CapturedClosures
2828
/// A logger.
2929
private let logger: Logger
3030

3131
init(
3232
kafkaHandle: OpaquePointer,
33-
opaque: RDKafkaConfig.CapturedClosure?,
33+
opaque: RDKafkaConfig.CapturedClosures,
3434
logger: Logger
3535
) {
3636
self.kafkaHandle = kafkaHandle

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public actor KafkaProducer {
110110
configDictionary: config.dictionary,
111111
// Having no callback will discard any incoming acknowledgement messages
112112
// Ref: rdkafka_broker.c:rd_kafka_dr_msgq
113-
callback: nil,
113+
deliveryReportCallback: nil,
114114
logger: logger
115115
)
116116

@@ -149,7 +149,7 @@ public actor KafkaProducer {
149149
let client = try RDKafka.createClient(
150150
type: .producer,
151151
configDictionary: config.dictionary,
152-
callback: { [logger, streamContinuation] messageResult in
152+
deliveryReportCallback: { [logger, streamContinuation] messageResult in
153153
guard let messageResult else {
154154
logger.error("Could not resolve acknowledged message")
155155
return

Sources/SwiftKafka/RDKafka/RDKafka.swift

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,24 @@ struct RDKafka {
2727
static func createClient(
2828
type: ClientType,
2929
configDictionary: [String: String],
30-
callback: ((RDKafkaConfig.KafkaAcknowledgementResult?) -> Void)? = nil,
30+
deliveryReportCallback: RDKafkaConfig.CapturedClosures.DeliveryReportClosure? = nil,
3131
logger: Logger
3232
) throws -> KafkaClient {
3333
let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
3434

3535
let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary)
3636

37-
let closurePointer: RDKafkaConfig.CapturedClosure?
38-
if let callback {
39-
// CapturedClosure must be retained by KafkaClient as long as message acknowledgements are received
40-
closurePointer = RDKafkaConfig.setDeliveryReportCallback(configPointer: rdConfig, callback)
41-
} else {
42-
closurePointer = nil
37+
// Check that delivery report callback can be only set for producer
38+
guard deliveryReportCallback == nil || type == .producer else {
39+
fatalError("Delivery report callback can't be defined for consumer client")
4340
}
4441

42+
let opaque = RDKafkaConfig.setCallbackClosures(
43+
configPointer: rdConfig,
44+
deliveryReportCallback: deliveryReportCallback,
45+
logger: logger
46+
)
47+
4548
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
4649
defer { errorChars.deallocate() }
4750

@@ -58,6 +61,6 @@ struct RDKafka {
5861
throw KafkaError.client(reason: errorString)
5962
}
6063

61-
return KafkaClient(kafkaHandle: handle, opaque: closurePointer, logger: logger)
64+
return KafkaClient(kafkaHandle: handle, opaque: opaque, logger: logger)
6265
}
6366
}

Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift

Lines changed: 98 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Crdkafka
16+
import Logging
1617

1718
/// A collection of helper functions wrapping common `rd_kafka_conf_*` functions in Swift.
1819
struct RDKafkaConfig {
1920
typealias KafkaAcknowledgementResult = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
2021
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
21-
final class CapturedClosure {
22-
typealias Closure = (KafkaAcknowledgementResult?) -> Void
23-
let closure: Closure
22+
final class CapturedClosures {
23+
typealias DeliveryReportClosure = (KafkaAcknowledgementResult?) -> Void
24+
var deliveryReportClosure: DeliveryReportClosure?
2425

25-
init(_ closure: @escaping Closure) {
26-
self.closure = closure
27-
}
26+
typealias LoggingClosure = (Int32, UnsafePointer<CChar>, UnsafePointer<CChar>) -> Void
27+
var loggingClosure: LoggingClosure?
28+
29+
init() { }
2830
}
2931

3032
/// Create a new `rd_kafka_conf_t` object in memory and initialize it with the given configuration properties.
@@ -63,46 +65,119 @@ struct RDKafkaConfig {
6365
}
6466
}
6567

66-
/// A Swift wrapper for `rd_kafka_conf_set_dr_msg_cb`.
67-
/// Defines a function that is called upon every message acknowledgement.
68+
/// Registers passed closures as callbacks and sets the application's opaque pointer that will be passed to callbacks
69+
/// - Parameter type: Kafka client type: `Consumer` or `Producer`
6870
/// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory.
69-
/// - Parameter callback: A closure that is invoked upon message acknowledgement.
70-
/// - Returns: A ``CapturedClosure`` object that must me retained by the caller as long as acknowledgements are received.
71-
static func setDeliveryReportCallback(
71+
/// - Parameter deliveryReportCallback: A closure that is invoked upon message acknowledgement.
72+
/// - Parameter logger: Logger instance
73+
/// - Returns: A ``CapturedClosures`` object that must me retained by the caller as long as it exists.
74+
static func setCallbackClosures(
7275
configPointer: OpaquePointer,
73-
_ callback: @escaping ((KafkaAcknowledgementResult?) -> Void)
74-
) -> CapturedClosure {
75-
let capturedClosure = CapturedClosure(callback)
76-
// Pass the captured closure to the C closure as an opaque object
77-
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
76+
deliveryReportCallback: CapturedClosures.DeliveryReportClosure? = nil,
77+
logger: Logger
78+
) -> CapturedClosures {
79+
let closures = CapturedClosures()
80+
81+
// Pass the the reference to Opaque as an opaque object
82+
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
7883
rd_kafka_conf_set_opaque(
7984
configPointer,
8085
opaquePointer
8186
)
8287

88+
// Set delivery report callback
89+
if let deliveryReportCallback {
90+
Self.setDeliveryReportCallback(configPointer: configPointer, capturedClosures: closures, deliveryReportCallback)
91+
}
92+
// Set logging callback
93+
Self.setLoggingCallback(configPointer: configPointer, capturedClosures: closures, logger: logger)
94+
95+
return closures
96+
}
97+
98+
/// A Swift wrapper for `rd_kafka_conf_set_dr_msg_cb`.
99+
/// Defines a function that is called upon every message acknowledgement.
100+
/// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory.
101+
/// - Parameter callback: A closure that is invoked upon message acknowledgement.
102+
private static func setDeliveryReportCallback(
103+
configPointer: OpaquePointer,
104+
capturedClosures: CapturedClosures,
105+
_ deliveryReportCallback: @escaping RDKafkaConfig.CapturedClosures.DeliveryReportClosure
106+
) {
107+
capturedClosures.deliveryReportClosure = deliveryReportCallback
108+
83109
// Create a C closure that calls the captured closure
84110
let callbackWrapper: (
85111
@convention(c) (OpaquePointer?, UnsafePointer<rd_kafka_message_t>?, UnsafeMutableRawPointer?) -> Void
86112
) = { _, messagePointer, opaquePointer in
87113
guard let opaquePointer = opaquePointer else {
88-
fatalError("Could not resolve reference to KafkaProducer instance")
114+
fatalError("Could not resolve reference to CapturedClosures")
89115
}
90-
let opaque = Unmanaged<CapturedClosure>.fromOpaque(opaquePointer).takeUnretainedValue()
116+
let closures = Unmanaged<CapturedClosures>.fromOpaque(opaquePointer).takeUnretainedValue()
91117

92-
let actualCallback = opaque.closure
118+
guard let actualCallback = closures.deliveryReportClosure else {
119+
fatalError("Delivery report callback is set, but user closure is not defined")
120+
}
93121
let messageResult = Self.convertMessageToAcknowledgementResult(messagePointer: messagePointer)
94122
actualCallback(messageResult)
95-
96-
// The messagePointer is automatically destroyed by librdkafka
97-
// For safety reasons, we only use it inside of this callback
98123
}
99124

100125
rd_kafka_conf_set_dr_msg_cb(
101126
configPointer,
102127
callbackWrapper
103128
)
129+
}
130+
131+
/// A Swift wrapper for `rd_kafka_conf_set_log_cb`.
132+
/// Defines a function that is called upon every log and redirects output to ``logger``.
133+
/// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory.
134+
/// - Parameter logger: Logger instance
135+
private static func setLoggingCallback(
136+
configPointer: OpaquePointer,
137+
capturedClosures: CapturedClosures,
138+
logger: Logger
139+
) {
140+
let loggingClosure: RDKafkaConfig.CapturedClosures.LoggingClosure = { level, fac, buf in
141+
// Mapping according to https://en.wikipedia.org/wiki/Syslog
142+
switch level {
143+
case 0 ... 2: /* Emergency, Alert, Critical */
144+
logger.critical(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
145+
case 3: /* Error */
146+
logger.error(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
147+
case 4: /* Warning */
148+
logger.warning(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
149+
case 5: /* Notice */
150+
logger.notice(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
151+
case 6: /* Informational */
152+
logger.info(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
153+
default: /* Debug */
154+
logger.debug(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
155+
}
156+
}
157+
capturedClosures.loggingClosure = loggingClosure
158+
159+
let loggingWrapper: (
160+
@convention(c) (OpaquePointer?, Int32, UnsafePointer<CChar>?, UnsafePointer<CChar>?) -> Void
161+
) = { rkKafkaT, level, fac, buf in
162+
guard let fac, let buf else {
163+
return
164+
}
165+
166+
guard let opaquePointer = rd_kafka_opaque(rkKafkaT) else {
167+
fatalError("Could not resolve reference to CapturedClosures")
168+
}
169+
let opaque = Unmanaged<CapturedClosures>.fromOpaque(opaquePointer).takeUnretainedValue()
170+
171+
guard let closure = opaque.loggingClosure else {
172+
fatalError("Could not resolve logger instance")
173+
}
174+
closure(level, fac, buf)
175+
}
104176

105-
return capturedClosure
177+
rd_kafka_conf_set_log_cb(
178+
configPointer,
179+
loggingWrapper
180+
)
106181
}
107182

108183
/// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.

0 commit comments

Comments
 (0)