diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index f44b2444..bd3f5542 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -72,6 +72,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { public struct AsyncIterator: AsyncIteratorProtocol { private let stateMachineHolder: MachineHolder let pollInterval: Duration + #if swift(>=6.0) + private let queue: DispatchQueueTaskExecutor + #endif private final class MachineHolder: Sendable { // only for deinit let stateMachine: LockedMachine @@ -88,21 +91,35 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { init(stateMachine: LockedMachine, pollInterval: Duration) { self.stateMachineHolder = .init(stateMachine: stateMachine) self.pollInterval = pollInterval + #if swift(>=6.0) + self.queue = DispatchQueueTaskExecutor( + DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer") + ) + #endif } public func next() async throws -> Element? { - // swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165 - // Currently use Task.sleep() if no new messages, should use task executor preference when implemented: - // https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md while !Task.isCancelled { let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } switch action { case .poll(let client): - if let message = try client.consumerPoll() { // non-blocking call + // Attempt to fetch a message synchronously. Bail + // immediately if no message is waiting for us. + if let message = try client.consumerPoll() { return message } + + #if swift(>=6.0) + // Wait on a separate thread for the next message. + // The call below will block for `pollInterval`. + return try await withTaskExecutorPreference(queue) { + try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds)) + } + #else + // No messages. Sleep a little. try await Task.sleep(for: self.pollInterval) + #endif case .suspendPollLoop: try await Task.sleep(for: self.pollInterval) // not started yet case .terminatePollLoop: diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 40f914fe..68c22c02 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -447,8 +447,8 @@ public final class RDKafkaClient: Sendable { /// /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. - func consumerPoll() throws -> KafkaConsumerMessage? { - guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else { + func consumerPoll(for pollTimeoutMs: Int32 = 0) throws -> KafkaConsumerMessage? { + guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, pollTimeoutMs) else { // No error, there might be no more messages return nil } diff --git a/Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift b/Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift new file mode 100644 index 00000000..2a2051b4 --- /dev/null +++ b/Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift @@ -0,0 +1,39 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if swift(>=6.0) +import Dispatch + +final class DispatchQueueTaskExecutor: TaskExecutor { + let queue: DispatchQueue + + init(_ queue: DispatchQueue) { + self.queue = queue + } + + public func enqueue(_ _job: consuming ExecutorJob) { + let job = UnownedJob(_job) + queue.async { + job.runSynchronously( + on: self.asUnownedTaskExecutor() + ) + } + } + + @inlinable + public func asUnownedTaskExecutor() -> UnownedTaskExecutor { + UnownedTaskExecutor(ordinary: self) + } +} +#endif