Poll for messages using TaskExecutor #178

Merged
Changes from 4 commits
23 changes: 19 additions & 4 deletions Sources/Kafka/KafkaConsumer.swift
@@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Dispatch
Contributor

Don't think we need this import here

Member Author

@mimischi, Nov 15, 2024

Whoops. Left over from a refactor. Is there a linter that can help point out unused imports?

import Logging
import NIOConcurrencyHelpers
import NIOCore
@@ -72,6 +73,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
public struct AsyncIterator: AsyncIteratorProtocol {
private let stateMachineHolder: MachineHolder
let pollInterval: Duration
#if swift(>=6.0)
let queue: NaiveQueueExecutor
#endif

private final class MachineHolder: Sendable { // only for deinit
let stateMachine: LockedMachine
@@ -88,21 +92,32 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
init(stateMachine: LockedMachine, pollInterval: Duration) {
self.stateMachineHolder = .init(stateMachine: stateMachine)
self.pollInterval = pollInterval
#if swift(>=6.0)
self.queue = NaiveQueueExecutor(DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer"))
#endif
}

public func next() async throws -> Element? {
// swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165
// Currently use Task.sleep() if no new messages, should use task executor preference when implemented:
// https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md
while !Task.isCancelled {
let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() }

switch action {
case .poll(let client):
- if let message = try client.consumerPoll() { // non-blocking call
+ // Attempt to fetch a message synchronously. Bail
+ // immediately if no message is waiting for us.
+ if let message = try client.consumerPoll() {
return message
}

#if swift(>=6.0)
// Wait on a separate thread for the next message.
return try await withTaskExecutorPreference(queue) {
try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds))
Contributor

What happens after the time out?

Member Author

We attempt to retrieve a message for self.pollInterval. If there's still no message, we return nil—the same behavior as in the above if let. I'd expect we get caught up in the while-loop on line 100 until we do receive a message eventually.
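
To make the timeout question concrete, here is a minimal sketch that is not taken from the PR. It contrasts two ways a `next()`-style function can treat a timed poll that finds nothing. `ScriptedPoller`, `nextReturningDirectly`, and `nextRetrying` are hypothetical names, and the poller only stands in for `client.consumerPoll(for:)`. The relevant point is that any `nil` returned from an `AsyncIteratorProtocol`'s `next()` is read by the caller as the end of the sequence, so which variant matches the intended behavior depends on whether the poll result is returned directly or only when it is non-`nil`.

```swift
// Hypothetical stand-in for `client.consumerPoll(for:)`: each call hands
// back the next scripted result, or nil once the script is exhausted.
struct ScriptedPoller {
    var results: [String?]

    mutating func poll() -> String? {
        results.isEmpty ? nil : results.removeFirst()
    }
}

// Variant A: return the timed poll's result directly. If the poll times
// out, the nil is handed straight back to the caller.
func nextReturningDirectly(_ poller: inout ScriptedPoller) -> String? {
    poller.poll()
}

// Variant B: return only when a message arrived; otherwise poll again.
// `maxAttempts` is an assumption that keeps this sketch finite.
func nextRetrying(_ poller: inout ScriptedPoller, maxAttempts: Int) -> String? {
    for _ in 0..<maxAttempts {
        if let message = poller.poll() {
            return message
        }
    }
    return nil
}

var direct = ScriptedPoller(results: [nil, nil, "m1"])
let directResult = nextReturningDirectly(&direct)

var retrying = ScriptedPoller(results: [nil, nil, "m1"])
let retryingResult = nextRetrying(&retrying, maxAttempts: 5)

print(directResult as Any, retryingResult as Any)
```

With the same scripted results, variant A yields `nil` on the first timeout while variant B keeps polling until the message shows up.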

}
#else
// No messages. Sleep a little.
try await Task.sleep(for: self.pollInterval)
#endif
case .suspendPollLoop:
try await Task.sleep(for: self.pollInterval) // not started yet
case .terminatePollLoop:
4 changes: 2 additions & 2 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -446,8 +446,8 @@ public final class RDKafkaClient: Sendable {
///
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
- func consumerPoll() throws -> KafkaConsumerMessage? {
- guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else {
+ func consumerPoll(for pollTimeoutMs: Int32 = 0) throws -> KafkaConsumerMessage? {
+ guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, pollTimeoutMs) else {
// No error, there might be no more messages
return nil
}
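
The new `consumerPoll(for:)` parameter is an `Int32` number of milliseconds, and the call site in `KafkaConsumer.swift` builds it with `Int32(self.pollInterval.inMilliseconds)`. The definition of `inMilliseconds` is not part of this diff, so the following is only a sketch of one way such a conversion could be made overflow-safe. `clampedMilliseconds` is a hypothetical name, not the project's API, and the clamping policy is an assumption.

```swift
extension Duration {
    /// Hypothetical helper (not the project's `inMilliseconds`): the
    /// duration in whole milliseconds, clamped to the `Int32` range so
    /// it can be passed to an API that takes an `Int32` timeout.
    var clampedMilliseconds: Int32 {
        let (seconds, attoseconds) = self.components
        // 1 ms == 1_000_000_000_000_000 attoseconds.
        let fractionalMs = attoseconds / 1_000_000_000_000_000
        let (wholeMs, overflowed) = seconds.multipliedReportingOverflow(by: 1000)
        if overflowed {
            return seconds < 0 ? Int32.min : Int32.max
        }
        let (totalMs, sumOverflowed) = wholeMs.addingReportingOverflow(fractionalMs)
        if sumOverflowed {
            return wholeMs < 0 ? Int32.min : Int32.max
        }
        return Int32(clamping: totalMs)
    }
}

let shortInterval = Duration.milliseconds(100).clampedMilliseconds
let hugeInterval = Duration.seconds(10_000_000).clampedMilliseconds
print(shortInterval, hugeInterval)
```

A plain `Int32(...)` initializer traps when the value does not fit, so clamping is one option if very large poll intervals are possible.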
39 changes: 39 additions & 0 deletions Sources/Kafka/Utilities/NaiveQueueExecutor.swift
@@ -0,0 +1,39 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if swift(>=6.0)
import Dispatch

final class NaiveQueueExecutor: TaskExecutor {
let queue: DispatchQueue

init(_ queue: DispatchQueue) {
self.queue = queue
}

public func enqueue(_ _job: consuming ExecutorJob) {
let job = UnownedJob(_job)
queue.async {
job.runSynchronously(
Contributor

@ktoso Should we call this runSynchronously or the one that also takes isolatedTo?

on: self.asUnownedTaskExecutor()
)
}
}

@inlinable
public func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}
}
#endif
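
For readers unfamiliar with task executor preference, here is a self-contained sketch of how a queue-backed executor like the one above might be used. It assumes a Swift 6 toolchain (and a sufficiently recent OS on Apple platforms). `ExampleQueueExecutor`, `blockingAnswer`, `runPreferringQueue`, and the queue label are hypothetical names for illustration. The sketch relies on the default `asUnownedTaskExecutor()` that the standard library provides for `TaskExecutor` conformers.

```swift
import Dispatch

// Hypothetical executor mirroring the shape of the one in this PR:
// every job is forwarded to a serial DispatchQueue.
final class ExampleQueueExecutor: TaskExecutor {
    let queue: DispatchQueue

    init(_ queue: DispatchQueue) {
        self.queue = queue
    }

    func enqueue(_ job: consuming ExecutorJob) {
        // Convert to an unowned job so it can be captured by the closure.
        let unownedJob = UnownedJob(job)
        let executor = self.asUnownedTaskExecutor()
        queue.async {
            unownedJob.runSynchronously(on: executor)
        }
    }
}

// Hypothetical stand-in for a blocking call such as a timed poll.
func blockingAnswer() -> Int {
    42
}

// A nonisolated async function: inside the closure, the task prefers
// the given executor, so blocking work is kept off the default pool.
func runPreferringQueue(_ executor: ExampleQueueExecutor) async -> Int {
    await withTaskExecutorPreference(executor) {
        blockingAnswer()
    }
}

let answer = await runPreferringQueue(
    ExampleQueueExecutor(DispatchQueue(label: "example.blocking-poll"))
)
print(answer)
```

The preference applies to nonisolated async code; code isolated to an actor with its own executor is not moved by it, which may be relevant to the `isolatedTo` question raised above.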