Skip to content

Commit f72a33f

Browse files
committed
connect returns whether a session was restored for the client
1 parent c62f6f4 commit f72a33f

File tree

2 files changed

+42
-33
lines changed

2 files changed

+42
-33
lines changed

Sources/MQTTNIO/MQTTClient.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,17 +200,20 @@ final public class MQTTClient {
200200

201201
/// Connect to MQTT server
202202
///
203-
/// If set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
203+
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
204204
/// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session
205205
/// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one
206+
///
207+
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
208+
///
206209
/// - Parameters:
207210
/// - cleanSession: should we start with a new session
208211
/// - will: Publish message to be posted as soon as connection is made
209-
/// - Returns: Future waiting for connect to fiinsh
212+
/// - Returns: EventLoopFuture to be updated with whether server holds a session for this client
210213
public func connect(
211214
cleanSession: Bool = true,
212215
will: (topicName: String, payload: ByteBuffer, retain: Bool)? = nil
213-
) -> EventLoopFuture<Void> {
216+
) -> EventLoopFuture<Bool> {
214217
//guard self.connection == nil else { return eventLoopGroup.next().makeFailedFuture(Error.alreadyConnected) }
215218

216219
let info = MQTTConnectInfo(
@@ -241,7 +244,10 @@ final public class MQTTClient {
241244
return true
242245
}
243246
}
244-
.map { _ in }
247+
.map { message in
248+
guard let connack = message as? MQTTConnAckMessage else { return false }
249+
return connack.sessionPresent
250+
}
245251
}
246252

247253
/// Publish message to topic

Tests/MQTTNIOTests/MQTTNIOTests.swift

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ final class MQTTNIOTests: XCTestCase {
1212
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
1313

1414
func connect(to client: MQTTClient) throws {
15-
try client.connect().wait()
15+
_ = try client.connect().wait()
1616
}
1717

1818
func testConnectWithWill() throws {
1919
let client = createClient(identifier: "testConnectWithWill")
20-
try client.connect(
20+
_ = try client.connect(
2121
will: (topicName: "MyWillTopic", payload: ByteBufferAllocator().buffer(string: "Test payload"), retain: false)
2222
).wait()
2323
try client.ping().wait()
@@ -27,63 +27,63 @@ final class MQTTNIOTests: XCTestCase {
2727

2828
func testWebsocketConnect() throws {
2929
let client = createWebSocketClient(identifier: "testWebsocketConnect")
30-
try client.connect().wait()
30+
_ = try client.connect().wait()
3131
try client.ping().wait()
3232
try client.disconnect().wait()
3333
try client.syncShutdownGracefully()
3434
}
3535

3636
func testSSLConnect() throws {
3737
let client = try createSSLClient(identifier: "testSSLConnect")
38-
try client.connect().wait()
38+
_ = try client.connect().wait()
3939
try client.ping().wait()
4040
try client.disconnect().wait()
4141
try client.syncShutdownGracefully()
4242
}
4343

4444
func testWebsocketAndSSLConnect() throws {
4545
let client = try createWebSocketAndSSLClient(identifier: "testWebsocketAndSSLConnect")
46-
try client.connect().wait()
46+
_ = try client.connect().wait()
4747
try client.ping().wait()
4848
try client.disconnect().wait()
4949
try client.syncShutdownGracefully()
5050
}
5151

5252
func testMQTTPublishQoS0() throws {
5353
let client = self.createClient(identifier: "testMQTTPublishQoS0")
54-
try client.connect().wait()
54+
_ = try client.connect().wait()
5555
try client.publish(to: "testMQTTPublishQoS", payload: ByteBufferAllocator().buffer(string: "Test payload"), qos: .atMostOnce).wait()
5656
try client.disconnect().wait()
5757
try client.syncShutdownGracefully()
5858
}
5959

6060
func testMQTTPublishQoS1() throws {
6161
let client = try self.createSSLClient(identifier: "testMQTTPublishQoS1")
62-
try client.connect().wait()
62+
_ = try client.connect().wait()
6363
try client.publish(to: "testMQTTPublishQoS", payload: ByteBufferAllocator().buffer(string: "Test payload"), qos: .atLeastOnce).wait()
6464
try client.disconnect().wait()
6565
try client.syncShutdownGracefully()
6666
}
6767

6868
func testMQTTPublishQoS2() throws {
6969
let client = try self.createWebSocketAndSSLClient(identifier: "testMQTTPublishQoS2")
70-
try client.connect().wait()
70+
_ = try client.connect().wait()
7171
try client.publish(to: "testMQTTPublishQoS", payload: ByteBufferAllocator().buffer(string: "Test payload"), qos: .exactlyOnce).wait()
7272
try client.disconnect().wait()
7373
try client.syncShutdownGracefully()
7474
}
7575

7676
func testMQTTPingreq() throws {
7777
let client = self.createClient(identifier: "testMQTTPingreq")
78-
try client.connect().wait()
78+
_ = try client.connect().wait()
7979
try client.ping().wait()
8080
try client.disconnect().wait()
8181
try client.syncShutdownGracefully()
8282
}
8383

8484
func testMQTTSubscribe() throws {
8585
let client = self.createClient(identifier: "testMQTTSubscribe")
86-
try client.connect().wait()
86+
_ = try client.connect().wait()
8787
try client.subscribe(to: [.init(topicFilter: "iphone", qos: .atLeastOnce)]).wait()
8888
Thread.sleep(forTimeInterval: 5)
8989
try client.disconnect().wait()
@@ -103,7 +103,7 @@ final class MQTTNIOTests: XCTestCase {
103103
}
104104

105105
let client = self.createClient(identifier: "testMQTTServerDisconnect")
106-
try client.connect().wait()
106+
_ = try client.connect().wait()
107107
try client.connection?.sendMessageNoWait(MQTTForceDisconnectMessage()).wait()
108108
Thread.sleep(forTimeInterval: 2)
109109
XCTAssertFalse(client.isActive())
@@ -117,7 +117,7 @@ final class MQTTNIOTests: XCTestCase {
117117
let payload = ByteBufferAllocator().buffer(string: payloadString)
118118

119119
let client = self.createWebSocketClient(identifier: "testMQTTPublishToClient_publisher")
120-
try client.connect().wait()
120+
_ = try client.connect().wait()
121121
let client2 = self.createWebSocketClient(identifier: "testMQTTPublishToClient_subscriber")
122122
client2.addPublishListener(named: "test") { result in
123123
switch result {
@@ -132,7 +132,7 @@ final class MQTTNIOTests: XCTestCase {
132132
XCTFail("\(error)")
133133
}
134134
}
135-
try client2.connect().wait()
135+
_ = try client2.connect().wait()
136136
try client2.subscribe(to: [.init(topicFilter: "testMQTTAtLeastOnce", qos: .atLeastOnce)]).wait()
137137
try client2.subscribe(to: [.init(topicFilter: "testMQTTExactlyOnce", qos: .exactlyOnce)]).wait()
138138
try client.publish(to: "testMQTTAtLeastOnce", payload: payload, qos: .atLeastOnce).wait()
@@ -154,7 +154,7 @@ final class MQTTNIOTests: XCTestCase {
154154
let payload = ByteBufferAllocator().buffer(string: payloadString)
155155

156156
let client = self.createClient(identifier: "testUnsubscribe_publisher")
157-
try client.connect().wait()
157+
_ = try client.connect().wait()
158158
let client2 = self.createClient(identifier: "testUnsubscribe_subscriber")
159159
client2.addPublishListener(named: "test") { result in
160160
switch result {
@@ -169,7 +169,7 @@ final class MQTTNIOTests: XCTestCase {
169169
XCTFail("\(error)")
170170
}
171171
}
172-
try client2.connect().wait()
172+
_ = try client2.connect().wait()
173173
try client2.subscribe(to: [.init(topicFilter: "testUnsubscribe", qos: .atLeastOnce)]).wait()
174174
try client.publish(to: "testUnsubscribe", payload: payload, qos: .atLeastOnce).wait()
175175
try client2.unsubscribe(from: ["testUnsubscribe"]).wait()
@@ -192,7 +192,7 @@ final class MQTTNIOTests: XCTestCase {
192192
let payload = ByteBufferAllocator().buffer(data: payloadData)
193193

194194
let client = self.createWebSocketClient(identifier: "testMQTTPublishToClientLargePayload_publisher")
195-
try client.connect().wait()
195+
_ = try client.connect().wait()
196196
let client2 = self.createWebSocketClient(identifier: "testMQTTPublishToClientLargePayload_subscriber")
197197
client2.addPublishListener(named: "test") { result in
198198
switch result {
@@ -207,7 +207,7 @@ final class MQTTNIOTests: XCTestCase {
207207
XCTFail("\(error)")
208208
}
209209
}
210-
try client2.connect().wait()
210+
_ = try client2.connect().wait()
211211
try client2.subscribe(to: [.init(topicFilter: "testMQTTAtLeastOnce", qos: .atLeastOnce)]).wait()
212212
try client.publish(to: "testMQTTAtLeastOnce", payload: payload, qos: .atLeastOnce).wait()
213213
Thread.sleep(forTimeInterval: 2)
@@ -234,9 +234,9 @@ final class MQTTNIOTests: XCTestCase {
234234
}
235235
}
236236

237-
try client.connect().wait()
237+
_ = try client.connect().wait()
238238
// by connecting with same identifier the first client uses the first client is forced to disconnect
239-
try client2.connect().wait()
239+
_ = try client2.connect().wait()
240240

241241
Thread.sleep(forTimeInterval: 5)
242242
XCTAssertTrue(disconnected.load())
@@ -248,8 +248,11 @@ final class MQTTNIOTests: XCTestCase {
248248

249249
func testDoubleConnect() throws {
250250
let client = self.createClient(identifier: "DoubleConnect")
251-
try client.connect(cleanSession: true).wait()
252-
try client.connect(cleanSession: false).wait()
251+
_ = try client.connect(cleanSession: true).wait()
252+
let sessionPresent = try client.connect(cleanSession: false).wait()
253+
let sessionPresent2 = try client.connect(cleanSession: false).wait()
254+
XCTAssertFalse(sessionPresent)
255+
XCTAssertTrue(sessionPresent2)
253256
try client.disconnect().wait()
254257
try client.syncShutdownGracefully()
255258
}
@@ -262,7 +265,7 @@ final class MQTTNIOTests: XCTestCase {
262265
return nil
263266
}
264267
let client = self.createClient(identifier: "testMQTTPublishQoS2WithStall", timeout: .seconds(4))
265-
try client.connect().wait()
268+
_ = try client.connect().wait()
266269
try client.connection?.channel.pipeline.addHandler(stallHandler).wait()
267270
try client.publish(to: "testMQTTPublishQoS2WithStall", payload: ByteBufferAllocator().buffer(string: "Test payload"), qos: .exactlyOnce).wait()
268271
try client.disconnect().wait()
@@ -281,7 +284,7 @@ final class MQTTNIOTests: XCTestCase {
281284
let payload = ByteBufferAllocator().buffer(string: "This is the Test payload")
282285

283286
let client = self.createClient(identifier: "testMQTTPublishToClient_publisher", timeout: .seconds(2))
284-
try client.connect().wait()
287+
_ = try client.connect().wait()
285288
let client2 = self.createClient(identifier: "testMQTTPublishToClient_subscriber", timeout: .seconds(10))
286289
client2.addPublishListener(named: "test") { result in
287290
switch result {
@@ -297,7 +300,7 @@ final class MQTTNIOTests: XCTestCase {
297300
XCTFail("\(error)")
298301
}
299302
}
300-
try client2.connect().wait()
303+
_ = try client2.connect().wait()
301304
try client2.connection?.channel.pipeline.addHandler(stallHandler, position: .first).wait()
302305
try client2.subscribe(to: [.init(topicFilter: "testMQTTSubscribeQoS2WithStall", qos: .exactlyOnce)]).wait()
303306
try client.publish(to: "testMQTTSubscribeQoS2WithStall", payload: payload, qos: .exactlyOnce).wait()
@@ -319,7 +322,7 @@ final class MQTTNIOTests: XCTestCase {
319322
let payload = ByteBufferAllocator().buffer(string: payloadString)
320323

321324
let client = try self.createSSLClient(identifier: "testPersistentSession_publisher")
322-
try client.connect().wait()
325+
_ = try client.connect().wait()
323326
let client2 = try self.createSSLClient(identifier: "testPersistentSession_subscriber")
324327
client2.addPublishListener(named: "test") { result in
325328
switch result {
@@ -334,22 +337,22 @@ final class MQTTNIOTests: XCTestCase {
334337
XCTFail("\(error)")
335338
}
336339
}
337-
try client2.connect(cleanSession: true).wait()
338-
try client2.connect(cleanSession: false).wait()
340+
_ = try client2.connect(cleanSession: true).wait()
341+
_ = try client2.connect(cleanSession: false).wait()
339342
try client2.subscribe(to: [.init(topicFilter: "testMQTTAtLeastOnce", qos: .atLeastOnce)]).wait()
340343
try client.publish(to: "testMQTTAtLeastOnce", payload: payload, qos: .atLeastOnce).wait()
341344
Thread.sleep(forTimeInterval: 1)
342345
try client2.disconnect().wait()
343346
try client.publish(to: "testMQTTAtLeastOnce", payload: payload, qos: .atLeastOnce).wait()
344347
Thread.sleep(forTimeInterval: 1)
345348
// should receive previous publish on new connect as this is not a cleanSession
346-
try client2.connect(cleanSession: false).wait()
349+
_ = try client2.connect(cleanSession: false).wait()
347350
Thread.sleep(forTimeInterval: 1)
348351
try client2.disconnect().wait()
349352
Thread.sleep(forTimeInterval: 1)
350353
try client.publish(to: "testMQTTAtLeastOnce", payload: payload, qos: .atLeastOnce).wait()
351354
// should not receive previous publish on connect as this is a cleanSession
352-
try client2.connect(cleanSession: true).wait()
355+
_ = try client2.connect(cleanSession: true).wait()
353356
Thread.sleep(forTimeInterval: 1)
354357
lock.withLock {
355358
XCTAssertEqual(publishReceived.count, 2)

0 commit comments

Comments
 (0)