Skip to content

Commit aa3222c

Browse files
authored
Async/Await (#34)
* Async versions of functions * Fixes to a/a build after rebase * Add V5 async/await functions
1 parent 976a4b6 commit aa3222c

File tree

4 files changed

+279
-0
lines changed

4 files changed

+279
-0
lines changed

Package.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ let package = Package(
2323
.product(name: "NIOWebSocket", package: "swift-nio"),
2424
.product(name: "NIOSSL", package: "swift-nio-ssl", condition: .when(platforms: [.linux, .macOS])),
2525
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
26+
.product(name: "_NIOConcurrency", package: "swift-nio"),
2627
]),
2728
.testTarget(name: "MQTTNIOTests", dependencies: ["MQTTNIO"]),
2829
]
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#if compiler(>=5.5)
2+
3+
import _NIOConcurrency
4+
import NIO
5+
6+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
7+
extension MQTTClient {
8+
/// Connect to MQTT server
9+
///
10+
/// Completes when CONNACK is received
11+
///
12+
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
13+
/// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session
14+
/// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one
15+
///
16+
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
17+
///
18+
/// - Parameters:
19+
/// - cleanSession: should we start with a new session
20+
/// - will: Publish message to be posted as soon as connection is made
21+
/// - Returns: Whether server holds a session for this client
22+
@discardableResult public func connect(
23+
cleanSession: Bool = true,
24+
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool)? = nil
25+
) async throws -> Bool {
26+
return try await self.connect(cleanSession: cleanSession, will: will).get()
27+
}
28+
29+
/// Publish message to topic
30+
///
31+
/// Depending on QoS completes when message is sent, when PUBACK is received or when PUBREC
32+
/// and following PUBCOMP are received
33+
///
34+
/// Waits for publish to complete. Depending on QoS setting the future will complete
35+
/// when message is sent, when PUBACK is received or when PUBREC and following PUBCOMP are
36+
/// received
37+
/// - Parameters:
38+
/// - topicName: Topic name on which the message is published
39+
/// - payload: Message payload
40+
/// - qos: Quality of Service for message.
41+
/// - retain: Whether this is a retained message.
42+
public func publish(to topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool = false) async throws {
43+
return try await self.publish(to: topicName, payload: payload, qos: qos).get()
44+
}
45+
46+
/// Subscribe to topic
47+
///
48+
/// Completes when SUBACK is received
49+
/// - Parameter subscriptions: Subscription infos
50+
public func subscribe(to subscriptions: [MQTTSubscribeInfo]) async throws -> MQTTSuback {
51+
return try await self.subscribe(to: subscriptions).get()
52+
}
53+
54+
/// Unsubscribe from topic
55+
///
56+
/// Completes when UNSUBACK is received
57+
/// - Parameter subscriptions: List of subscriptions to unsubscribe from
58+
public func unsubscribe(from subscriptions: [String]) async throws {
59+
return try await unsubscribe(from: subscriptions).get()
60+
}
61+
62+
/// Ping the server to test if it is still alive and to tell it you are alive.
63+
///
64+
/// Completes when PINGRESP is received
65+
///
66+
/// You shouldn't need to call this as the `MQTTClient` automatically sends PINGREQ messages to the server to ensure
67+
/// the connection is still live. If you initialize the client with the configuration `disablePingReq: true` then these
68+
/// are disabled and it is up to you to send the PINGREQ messages yourself
69+
public func ping() async throws {
70+
return try await ping().get()
71+
}
72+
73+
/// Disconnect from server
74+
public func disconnect() async throws {
75+
return try await disconnect().get()
76+
}
77+
78+
}
79+
80+
#endif // compiler(>=5.5)
81+
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#if compiler(>=5.5)
2+
3+
import _NIOConcurrency
4+
import NIO
5+
6+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
7+
extension MQTTClient.V5 {
8+
/// Connect to MQTT server
9+
///
10+
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
11+
/// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session
12+
/// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one
13+
///
14+
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
15+
///
16+
/// - Parameters:
17+
/// - cleanSession: should we start with a new session
18+
/// - properties: properties to attach to connect message
19+
/// - will: Publish message to be posted as soon as connection is made
20+
/// - Returns: EventLoopFuture to be updated with connack
21+
public func connect(
22+
cleanStart: Bool = true,
23+
properties: MQTTProperties = .init(),
24+
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool, properties: MQTTProperties)? = nil,
25+
authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture<MQTTAuthV5>)? = nil
26+
) async throws -> MQTTConnackV5 {
27+
return try await connect(cleanStart: cleanStart, properties: properties, will: will, authWorkflow: authWorkflow).get()
28+
}
29+
30+
/// Publish message to topic
31+
/// - Parameters:
32+
/// - topicName: Topic name on which the message is published
33+
/// - payload: Message payload
34+
/// - qos: Quality of Service for message.
35+
/// - retain: Whether this is a retained message.
36+
/// - properties: properties to attach to publish message
37+
/// - Returns: Future waiting for publish to complete. Depending on QoS setting the future will complete
38+
/// when message is sent, when PUBACK is received or when PUBREC and following PUBCOMP are
39+
/// received. QoS1 and above return an `MQTTAckV5` which contains a `reason` and `properties`
40+
public func publish(
41+
to topicName: String,
42+
payload: ByteBuffer,
43+
qos: MQTTQoS,
44+
retain: Bool = false,
45+
properties: MQTTProperties = .init()
46+
) async throws -> MQTTAckV5? {
47+
return try await publish(to: topicName, payload: payload, qos: qos, retain: retain, properties: properties).get()
48+
}
49+
50+
/// Subscribe to topic
51+
/// - Parameters:
52+
/// - subscriptions: Subscription infos
53+
/// - properties: properties to attach to subscribe message
54+
/// - Returns: Future waiting for subscribe to complete. Will wait for SUBACK message from server and
55+
/// return its contents
56+
public func subscribe(
57+
to subscriptions: [MQTTSubscribeInfoV5],
58+
properties: MQTTProperties = .init()
59+
) async throws -> MQTTSubackV5 {
60+
return try await subscribe(to: subscriptions, properties: properties).get()
61+
}
62+
63+
/// Unsubscribe from topic
64+
/// - Parameters:
65+
/// - subscriptions: List of subscriptions to unsubscribe from
66+
/// - properties: properties to attach to unsubscribe message
67+
/// - Returns: Future waiting for unsubscribe to complete. Will wait for UNSUBACK message from server and
68+
/// return its contents
69+
public func unsubscribe(
70+
from subscriptions: [String],
71+
properties: MQTTProperties = .init()
72+
) async throws -> MQTTSubackV5 {
73+
return try await unsubscribe(from: subscriptions, properties: properties).get()
74+
}
75+
76+
/// Disconnect from server
77+
/// - Parameter properties: properties to attach to disconnect packet
78+
/// - Returns: Future waiting on disconnect message to be sent
79+
public func disconnect(properties: MQTTProperties = .init()) async throws {
80+
return try await disconnect(properties: properties).get()
81+
}
82+
83+
/// Re-authenticate with server
84+
///
85+
/// - Parameters:
86+
/// - properties: properties to attach to auth packet. Must include `authenticationMethod`
87+
/// - authWorkflow: Respond to auth packets from server
88+
/// - Returns: final auth packet returned from server
89+
public func auth(
90+
properties: MQTTProperties,
91+
authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture<MQTTAuthV5>)? = nil
92+
) async throws -> MQTTAuthV5 {
93+
return try await auth(properties: properties, authWorkflow: authWorkflow).get()
94+
}
95+
}
96+
97+
#endif // compiler(>=5.5)
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#if compiler(>=5.5)
2+
3+
import XCTest
4+
import Foundation
5+
import Logging
6+
import NIO
7+
import NIOConcurrencyHelpers
8+
import NIOFoundationCompat
9+
import NIOHTTP1
10+
#if canImport(NIOSSL)
11+
import NIOSSL
12+
#endif
13+
@testable import MQTTNIO
14+
15+
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
16+
final class AsyncMQTTNIOTests: XCTestCase {
17+
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
18+
static let logger: Logger = {
19+
var logger = Logger(label: "MQTTTests")
20+
logger.logLevel = .trace
21+
return logger
22+
}()
23+
24+
func XCTRunAsyncAndBlock(_ closure: @escaping () async throws -> Void) {
25+
let dg = DispatchGroup()
26+
dg.enter()
27+
Task {
28+
do {
29+
try await closure()
30+
} catch {
31+
XCTFail("\(error)")
32+
}
33+
dg.leave()
34+
}
35+
dg.wait()
36+
}
37+
38+
func createClient(identifier: String, timeout: TimeAmount? = .seconds(10)) -> MQTTClient {
39+
MQTTClient(
40+
host: Self.hostname,
41+
port: 1883,
42+
identifier: identifier,
43+
eventLoopGroupProvider: .createNew,
44+
logger: Self.logger,
45+
configuration: .init(timeout: timeout)
46+
)
47+
}
48+
49+
func testConnect() {
50+
let client = createClient(identifier: "testConnect+async")
51+
XCTRunAsyncAndBlock {
52+
try await client.connect()
53+
try await client.disconnect()
54+
}
55+
}
56+
57+
func testPublishSubscribe() {
58+
let client = createClient(identifier: "testPublish+async")
59+
let client2 = createClient(identifier: "testPublish+async2")
60+
let payloadString = "Hello"
61+
XCTRunAsyncAndBlock {
62+
try await client.connect()
63+
try await client2.connect()
64+
_ = try await client2.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)])
65+
client2.addPublishListener(named: "test") { result in
66+
switch result {
67+
case .success(let publish):
68+
var buffer = publish.payload
69+
let string = buffer.readString(length: buffer.readableBytes)
70+
XCTAssertEqual(string, payloadString)
71+
case .failure(let error):
72+
XCTFail("\(error)")
73+
}
74+
}
75+
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
76+
try await client.disconnect()
77+
Thread.sleep(forTimeInterval: 2)
78+
try await client2.disconnect()
79+
}
80+
}
81+
82+
func testPing() {
83+
let client = MQTTClient(
84+
host: Self.hostname,
85+
port: 1883,
86+
identifier: "TestPing",
87+
eventLoopGroupProvider: .createNew,
88+
logger: Self.logger,
89+
configuration: .init(disablePing: true)
90+
)
91+
92+
XCTRunAsyncAndBlock {
93+
try await client.connect()
94+
try await client.ping()
95+
try await client.disconnect()
96+
}
97+
}
98+
}
99+
100+
#endif // compiler(>=5.5)

0 commit comments

Comments
 (0)