Skip to content

Commit 9254159

Browse files
committed
Poll for messages using TaskExecutor
We currently sleep for `pollInterval` when no new messages have been polled from the cluster. This leads to unnecessary slowness of the client. Instead of doing that, we now break up the polling of messages into two distinct approaches: 1. Attempt to poll synchronously: if there a message is polled, we return it. If there is no message, we immediately go to step 2. 2. We create a `DispatchQueue` and run the `consumerPoll` on it using `withTaskExecutorPreference`. We make the `consumerPoll` call wait for up to `pollInterval` before bailing. This prevents us from sleeping on the running thread, and frees up cycles to do other work if required. Resolves #165
1 parent 9986c5b commit 9254159

File tree

3 files changed

+54
-7
lines changed

3 files changed

+54
-7
lines changed

Sources/Kafka/KafkaConsumer.swift

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Dispatch
1516
import Logging
1617
import NIOConcurrencyHelpers
1718
import NIOCore
@@ -72,6 +73,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
7273
public struct AsyncIterator: AsyncIteratorProtocol {
7374
private let stateMachineHolder: MachineHolder
7475
let pollInterval: Duration
76+
let queue: NaiveQueueExecutor
7577

7678
private final class MachineHolder: Sendable { // only for deinit
7779
let stateMachine: LockedMachine
@@ -88,21 +90,30 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
8890
init(stateMachine: LockedMachine, pollInterval: Duration) {
8991
self.stateMachineHolder = .init(stateMachine: stateMachine)
9092
self.pollInterval = pollInterval
93+
self.queue = NaiveQueueExecutor(DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer"))
9194
}
9295

9396
public func next() async throws -> Element? {
94-
// swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165
95-
// Currently use Task.sleep() if no new messages, should use task executor preference when implemented:
96-
// https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md
9797
while !Task.isCancelled {
9898
let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() }
9999

100100
switch action {
101101
case .poll(let client):
102-
if let message = try client.consumerPoll() { // non-blocking call
102+
// Attempt to fetch a message synchronously. Bail
103+
// immediately if no message is waiting for us.
104+
if let message = try client.consumerPoll() {
103105
return message
104106
}
105-
try await Task.sleep(for: self.pollInterval)
107+
108+
#if swift(>=6.0)
109+
// Wait on a separate thread for the next message.
110+
return try await withTaskExecutorPreference(queue) {
111+
try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds))
112+
}
113+
#else
114+
// No messages. Sleep a little.
115+
return try await Task.sleep(for: self.pollInterval)
116+
#endif
106117
case .suspendPollLoop:
107118
try await Task.sleep(for: self.pollInterval) // not started yet
108119
case .terminatePollLoop:

Sources/Kafka/RDKafka/RDKafkaClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,8 @@ public final class RDKafkaClient: Sendable {
446446
///
447447
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
448448
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
449-
func consumerPoll() throws -> KafkaConsumerMessage? {
450-
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else {
449+
func consumerPoll(for pollTimeoutMs: Int32 = 0) throws -> KafkaConsumerMessage? {
450+
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, pollTimeoutMs) else {
451451
// No error, there might be no more messages
452452
return nil
453453
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-client open source project
4+
//
5+
// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Dispatch
16+
17+
final class NaiveQueueExecutor: TaskExecutor {
18+
let queue: DispatchQueue
19+
20+
init(_ queue: DispatchQueue) {
21+
self.queue = queue
22+
}
23+
24+
public func enqueue(_ _job: consuming ExecutorJob) {
25+
let job = UnownedJob(_job)
26+
queue.async {
27+
job.runSynchronously(
28+
on: self.asUnownedTaskExecutor())
29+
}
30+
}
31+
32+
@inlinable
33+
public func asUnownedTaskExecutor() -> UnownedTaskExecutor {
34+
UnownedTaskExecutor(ordinary: self)
35+
}
36+
}

0 commit comments

Comments
 (0)