Skip to content

Commit 588af60

Browse files
Fix: KafkaClient.closeConsumer should not block (#73)
* Fix: make `KafkaConsumer.commitSync` non-blocking Motivation: Currently our invocation to `rd_kafka_commit` inside of `KafkaCosumer.commitSync` is blocking a cooperative thread. This PR aims to make `KafkaCosumer.commitSync` non-blocking by using the callback-based commit API. Modifications: * move `commitSync` logic to `KafkaClient` * replace the blocking invocation to [rd_kafka_commit](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#ab96539928328f14c3c9177ea0c896c87) with a callback-based invocation to [rd_kafka_commit_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#af76a6a73baa9c2621536e3f6882a3c1a) which is then wrapped inside a `withAsyncThrowingContinuation` statement * `KafkaClient.consumerClose`: make non-blocking Motivation: [rd_kakfa_consumer_close](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a37b54d329e12d745889defe96e7d043d) was blocking. This PR proposes using the [rd_kakfa_consumer_close_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a9dd5c18bdfed81c8847b259f0a8d498d) API which is non-blocking and served through the normal poll loop. We now Modifications: * `KafkaClient.consumerClose`: use `rd_kakfa_consumer_close_queue` in favour of `rd_kakfa_consumer_close` * create a new variable `KafkaClient.isConsumerClosed` that indicates if the poll loop needs to continue polling or if it can stop running * updated state management in `KafkaConsumer` to accomodate for polling when the `KafkaConsumer` is in the process of closing Result: Calling `KafkaClient.consumerClose` is not blocking anymore. * Review Franz Modifications: * introduce new `KafkaConsumer.StateMachine.State` `.finishing` to avoid retaining `client` in state `.finished` * rename `KafkaConsumer.shutdownGracefully` to `KafkaConsumer.triggerGracefulShutdown` * add note that `KafkaConsumer.commitSync` does not support `Task` cancellation
1 parent 77d0b0e commit 588af60

File tree

3 files changed

+137
-58
lines changed

3 files changed

+137
-58
lines changed

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,102 @@ final class KafkaClient {
118118
}
119119
}
120120

121-
/// Close the consumer.
121+
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
122+
/// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
123+
final class CapturedCommitCallback {
124+
typealias Closure = (Result<Void, KafkaError>) -> Void
125+
let closure: Closure
126+
127+
init(_ closure: @escaping Closure) {
128+
self.closure = closure
129+
}
130+
}
131+
132+
/// Non-blocking commit of a the `message`'s offset to Kafka.
133+
///
134+
/// - Parameter message: Last received message that shall be marked as read.
135+
func commitSync(_ message: KafkaConsumerMessage) async throws {
136+
// Declare captured closure outside of withCheckedContinuation.
137+
// We do that because do an unretained pass of the captured closure to
138+
// librdkafka which means we have to keep a reference to the closure
139+
// ourselves to make sure it does not get deallocated before
140+
// commitSync returns.
141+
var capturedClosure: CapturedCommitCallback!
142+
try await withCheckedThrowingContinuation { continuation in
143+
capturedClosure = CapturedCommitCallback { result in
144+
continuation.resume(with: result)
145+
}
146+
147+
// The offset committed is always the offset of the next requested message.
148+
// Thus, we increase the offset of the current message by one before committing it.
149+
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
150+
let changesList = RDKafkaTopicPartitionList()
151+
changesList.setOffset(
152+
topic: message.topic,
153+
partition: message.partition,
154+
offset: Int64(message.offset + 1)
155+
)
156+
157+
// Unretained pass because the reference that librdkafka holds to capturedClosure
158+
// should not be counted in ARC as this can lead to memory leaks.
159+
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
160+
161+
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
162+
163+
// Create a C closure that calls the captured closure
164+
let callbackWrapper: (
165+
@convention(c) (
166+
OpaquePointer?,
167+
rd_kafka_resp_err_t,
168+
UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
169+
UnsafeMutableRawPointer?
170+
) -> Void
171+
) = { _, error, _, opaquePointer in
172+
173+
guard let opaquePointer = opaquePointer else {
174+
fatalError("Could not resolve reference to catpured Swift callback instance")
175+
}
176+
let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()
177+
178+
let actualCallback = opaque.closure
179+
180+
if error == RD_KAFKA_RESP_ERR_NO_ERROR {
181+
actualCallback(.success(()))
182+
} else {
183+
let kafkaError = KafkaError.rdKafkaError(wrapping: error)
184+
actualCallback(.failure(kafkaError))
185+
}
186+
}
187+
188+
changesList.withListPointer { listPointer in
189+
rd_kafka_commit_queue(
190+
self.kafkaHandle,
191+
listPointer,
192+
consumerQueue,
193+
callbackWrapper,
194+
opaquePointer
195+
)
196+
}
197+
}
198+
}
199+
200+
/// Close the consumer asynchronously. This means revoking its assignemnt, committing offsets to broker and
201+
/// leaving the consumer group (if applicable).
202+
///
203+
/// Make sure to run poll loop until ``KafkaClient/consumerIsClosed`` returns `true`.
122204
func consumerClose() throws {
123-
let result = rd_kafka_consumer_close(self.kafkaHandle)
124-
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
125-
throw KafkaError.rdKafkaError(wrapping: result)
205+
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
206+
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)
207+
let kafkaError = rd_kafka_error_code(result)
208+
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
209+
throw KafkaError.rdKafkaError(wrapping: kafkaError)
126210
}
127211
}
128212

213+
var isConsumerClosed: Bool {
214+
rd_kafka_consumer_closed(self.kafkaHandle) == 1
215+
}
216+
129217
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
130218
/// - Warning: Do not escape the pointer from the closure for later use.
131219
/// - Parameter body: The closure will use the Kafka handle pointer.

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 42 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
4040
}
4141

4242
func didTerminate() {
43-
// Duplicate of _shutdownGracefully
43+
// Duplicate of _triggerGracefulShutdown
4444
let action = self.stateMachine.withLockedValue { $0.finish() }
4545
switch action {
46-
case .shutdownGracefullyAndFinishSource(let client, let source):
46+
case .triggerGracefulShutdownAndFinishSource(let client, let source):
4747
source.finish()
4848

4949
do {
@@ -151,7 +151,7 @@ public final class KafkaConsumer {
151151
}
152152

153153
deinit {
154-
self.shutdownGracefully()
154+
self.triggerGracefulShutdown()
155155
}
156156

157157
/// Subscribe to the given list of `topics`.
@@ -201,16 +201,20 @@ public final class KafkaConsumer {
201201
switch nextAction {
202202
case .pollForAndYieldMessage(let client, let source):
203203
do {
204-
guard let message = try client.consumerPoll() else {
205-
break
204+
if let message = try client.consumerPoll() {
205+
// We do not support back pressure, we can ignore the yield result
206+
_ = source.yield(message)
206207
}
207-
// We do not support back pressure, we can ignore the yield result
208-
_ = source.yield(message)
209208
} catch {
210209
source.finish()
211210
throw error
212211
}
213212
try await Task.sleep(for: self.config.pollInterval)
213+
case .pollUntilClosed(let client):
214+
// Ignore poll result, we are closing down and just polling to commit
215+
// outstanding consumer state
216+
_ = try client.consumerPoll()
217+
try await Task.sleep(for: self.config.pollInterval)
214218
case .terminatePollLoop:
215219
return
216220
}
@@ -222,18 +226,8 @@ public final class KafkaConsumer {
222226
/// - Parameter message: Last received message that shall be marked as read.
223227
/// - Throws: A ``KafkaError`` if committing failed.
224228
/// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
229+
/// - Important: This method does not support `Task` cancellation.
225230
public func commitSync(_ message: KafkaConsumerMessage) async throws {
226-
try await withCheckedThrowingContinuation { continuation in
227-
do {
228-
try self._commitSync(message) // Blocks until commiting the offset is done
229-
continuation.resume()
230-
} catch {
231-
continuation.resume(throwing: error)
232-
}
233-
}
234-
}
235-
236-
private func _commitSync(_ message: KafkaConsumerMessage) throws {
237231
let action = self.stateMachine.withLockedValue { $0.commitSync() }
238232
switch action {
239233
case .throwClosedError:
@@ -243,41 +237,19 @@ public final class KafkaConsumer {
243237
throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
244238
}
245239

246-
// The offset committed is always the offset of the next requested message.
247-
// Thus, we increase the offset of the current message by one before committing it.
248-
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
249-
let changesList = RDKafkaTopicPartitionList()
250-
changesList.setOffset(
251-
topic: message.topic,
252-
partition: message.partition,
253-
offset: Int64(message.offset + 1)
254-
)
255-
256-
let result = client.withKafkaHandlePointer { handle in
257-
changesList.withListPointer { listPointer in
258-
rd_kafka_commit(
259-
handle,
260-
listPointer,
261-
0
262-
) // Blocks until commiting the offset is done
263-
// -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68
264-
}
265-
}
266-
guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
267-
throw KafkaError.rdKafkaError(wrapping: result)
268-
}
240+
try await client.commitSync(message)
269241
}
270242
}
271243

272244
/// This function is used to gracefully shut down a Kafka consumer client.
273245
///
274246
/// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
275247
/// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended.
276-
private func shutdownGracefully() {
248+
private func triggerGracefulShutdown() {
277249
let action = self.stateMachine.withLockedValue { $0.finish() }
278250
switch action {
279-
case .shutdownGracefullyAndFinishSource(let client, let source):
280-
self._shutdownGracefullyAndFinishSource(
251+
case .triggerGracefulShutdownAndFinishSource(let client, let source):
252+
self._triggerGracefulShutdownAndFinishSource(
281253
client: client,
282254
source: source,
283255
logger: self.logger
@@ -287,7 +259,7 @@ public final class KafkaConsumer {
287259
}
288260
}
289261

290-
private func _shutdownGracefullyAndFinishSource(
262+
private func _triggerGracefulShutdownAndFinishSource(
291263
client: KafkaClient,
292264
source: Producer.Source,
293265
logger: Logger
@@ -336,7 +308,12 @@ extension KafkaConsumer {
336308
client: KafkaClient,
337309
source: Producer.Source
338310
)
339-
/// The ``KafkaConsumer`` has been closed.
311+
/// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
312+
/// We are now in the process of commiting our last state to the broker.
313+
///
314+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
315+
case finishing(client: KafkaClient)
316+
/// The ``KafkaConsumer`` is closed.
340317
case finished
341318
}
342319

@@ -368,6 +345,11 @@ extension KafkaConsumer {
368345
client: KafkaClient,
369346
source: Producer.Source
370347
)
348+
/// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
349+
/// to commit its state to the broker.
350+
///
351+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
352+
case pollUntilClosed(client: KafkaClient)
371353
/// Terminate the poll loop.
372354
case terminatePollLoop
373355
}
@@ -376,14 +358,21 @@ extension KafkaConsumer {
376358
/// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken.
377359
///
378360
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
379-
func nextPollLoopAction() -> PollLoopAction {
361+
mutating func nextPollLoopAction() -> PollLoopAction {
380362
switch self.state {
381363
case .uninitialized:
382364
fatalError("\(#function) invoked while still in state \(self.state)")
383365
case .initializing:
384366
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
385367
case .consuming(let client, let source):
386368
return .pollForAndYieldMessage(client: client, source: source)
369+
case .finishing(let client):
370+
if client.isConsumerClosed {
371+
self.state = .finished
372+
return .terminatePollLoop
373+
} else {
374+
return .pollUntilClosed(client: client)
375+
}
387376
case .finished:
388377
return .terminatePollLoop
389378
}
@@ -409,7 +398,7 @@ extension KafkaConsumer {
409398
source: source
410399
)
411400
return .setUpConnection(client: client)
412-
case .consuming, .finished:
401+
case .consuming, .finishing, .finished:
413402
fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer")
414403
}
415404
}
@@ -438,7 +427,7 @@ extension KafkaConsumer {
438427
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
439428
case .consuming(let client, _):
440429
return .commitSync(client: client)
441-
case .finished:
430+
case .finishing, .finished:
442431
return .throwClosedError
443432
}
444433
}
@@ -449,7 +438,7 @@ extension KafkaConsumer {
449438
///
450439
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
451440
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
452-
case shutdownGracefullyAndFinishSource(
441+
case triggerGracefulShutdownAndFinishSource(
453442
client: KafkaClient,
454443
source: Producer.Source
455444
)
@@ -466,12 +455,12 @@ extension KafkaConsumer {
466455
case .initializing:
467456
fatalError("subscribe() / assign() should have been invoked before \(#function)")
468457
case .consuming(let client, let source):
469-
self.state = .finished
470-
return .shutdownGracefullyAndFinishSource(
458+
self.state = .finishing(client: client)
459+
return .triggerGracefulShutdownAndFinishSource(
471460
client: client,
472461
source: source
473462
)
474-
case .finished:
463+
case .finishing, .finished:
475464
return nil
476465
}
477466
}

Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ struct RDKafkaConfig {
7878
) -> CapturedClosures {
7979
let closures = CapturedClosures()
8080

81-
// Pass the the reference to Opaque as an opaque object
81+
// Pass the captured closure to the C closure as an opaque object.
82+
// Unretained pass because the reference that librdkafka holds to the captured closures
83+
// should not be counted in ARC as this can lead to memory leaks.
8284
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
8385
rd_kafka_conf_set_opaque(
8486
configPointer,

0 commit comments

Comments
 (0)