Skip to content

Commit 4a74297

Browse files
mimischiktoso
andauthored
Poll for messages using TaskExecutor (swift-server#178)
We currently sleep for `pollInterval` when no new messages have been polled from the cluster. This leads to unnecessary slowness of the client. Instead of doing that, we now break up the polling of messages into two distinct approaches: 1. Attempt to poll synchronously: if there a message is polled, we return it. If there is no message, we immediately go to step 2. 2. We create a `DispatchQueue` and run the `consumerPoll` on it using `withTaskExecutorPreference`. We make the `consumerPoll` call wait for up to `pollInterval` before bailing. This prevents us from sleeping on the running thread, and frees up cycles to do other work if required. Resolves swift-server#165 --------- Co-authored-by: Konrad `ktoso` Malawski <[email protected]>
1 parent ebfa094 commit 4a74297

File tree

3 files changed

+62
-6
lines changed

3 files changed

+62
-6
lines changed

Sources/Kafka/KafkaConsumer.swift

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
7272
public struct AsyncIterator: AsyncIteratorProtocol {
7373
private let stateMachineHolder: MachineHolder
7474
let pollInterval: Duration
75+
#if swift(>=6.0)
76+
private let queue: DispatchQueueTaskExecutor
77+
#endif
7578

7679
private final class MachineHolder: Sendable { // only for deinit
7780
let stateMachine: LockedMachine
@@ -88,21 +91,35 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
8891
init(stateMachine: LockedMachine, pollInterval: Duration) {
8992
self.stateMachineHolder = .init(stateMachine: stateMachine)
9093
self.pollInterval = pollInterval
94+
#if swift(>=6.0)
95+
self.queue = DispatchQueueTaskExecutor(
96+
DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer")
97+
)
98+
#endif
9199
}
92100

93101
public func next() async throws -> Element? {
94-
// swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165
95-
// Currently use Task.sleep() if no new messages, should use task executor preference when implemented:
96-
// https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md
97102
while !Task.isCancelled {
98103
let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() }
99104

100105
switch action {
101106
case .poll(let client):
102-
if let message = try client.consumerPoll() { // non-blocking call
107+
// Attempt to fetch a message synchronously. Bail
108+
// immediately if no message is waiting for us.
109+
if let message = try client.consumerPoll() {
103110
return message
104111
}
112+
113+
#if swift(>=6.0)
114+
// Wait on a separate thread for the next message.
115+
// The call below will block for `pollInterval`.
116+
return try await withTaskExecutorPreference(queue) {
117+
try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds))
118+
}
119+
#else
120+
// No messages. Sleep a little.
105121
try await Task.sleep(for: self.pollInterval)
122+
#endif
106123
case .suspendPollLoop:
107124
try await Task.sleep(for: self.pollInterval) // not started yet
108125
case .terminatePollLoop:

Sources/Kafka/RDKafka/RDKafkaClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,8 @@ public final class RDKafkaClient: Sendable {
447447
///
448448
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
449449
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
450-
func consumerPoll() throws -> KafkaConsumerMessage? {
451-
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else {
450+
func consumerPoll(for pollTimeoutMs: Int32 = 0) throws -> KafkaConsumerMessage? {
451+
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, pollTimeoutMs) else {
452452
// No error, there might be no more messages
453453
return nil
454454
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-client open source project
4+
//
5+
// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if swift(>=6.0)
16+
import Dispatch
17+
18+
final class DispatchQueueTaskExecutor: TaskExecutor {
19+
let queue: DispatchQueue
20+
21+
init(_ queue: DispatchQueue) {
22+
self.queue = queue
23+
}
24+
25+
public func enqueue(_ _job: consuming ExecutorJob) {
26+
let job = UnownedJob(_job)
27+
queue.async {
28+
job.runSynchronously(
29+
on: self.asUnownedTaskExecutor()
30+
)
31+
}
32+
}
33+
34+
@inlinable
35+
public func asUnownedTaskExecutor() -> UnownedTaskExecutor {
36+
UnownedTaskExecutor(ordinary: self)
37+
}
38+
}
39+
#endif

0 commit comments

Comments
 (0)