Skip to content

Commit 4601dec

Browse files
authored
Fix inflight publish msgs (#89)
* Added test for inflight * Fixed inflight PUBLISH messages Only delete them if you get a malformed packet error * Update MQTTClient.swift
1 parent b606c3b commit 4601dec

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

Sources/MQTTNIO/MQTTClient.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,9 @@ internal extension MQTTClient {
629629
}
630630
.flatMapErrorThrowing { error in
631631
// if publish caused server to close the connection then remove from inflight array
632-
if case MQTTError.serverClosedConnection = error {
632+
if case MQTTError.serverDisconnection(let ack) = error,
633+
ack.reason == .malformedPacket
634+
{
633635
self.inflight.remove(id: packet.packetId)
634636
}
635637
throw error

Tests/MQTTNIOTests/MQTTNIOTests.swift

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,39 @@ final class MQTTNIOTests: XCTestCase {
435435
try client2.disconnect().wait()
436436
}
437437

438+
func testInflight() throws {
439+
let expectation = XCTestExpectation(description: "testPersistentSession")
440+
expectation.expectedFulfillmentCount = 1
441+
442+
let client = self.createClient(identifier: "testInflight")
443+
defer { XCTAssertNoThrow(try client.syncShutdownGracefully()) }
444+
let client2 = self.createClient(identifier: "testPersistentSession_subscriber")
445+
defer { XCTAssertNoThrow(try client2.syncShutdownGracefully()) }
446+
_ = try client2.connect(cleanSession: true).wait()
447+
client2.addPublishListener(named: "test") { result in
448+
switch result {
449+
case .success:
450+
expectation.fulfill()
451+
452+
case .failure(let error):
453+
XCTFail("\(error)")
454+
}
455+
}
456+
457+
_ = try client.connect(cleanSession: true).wait()
458+
_ = try client.connect(cleanSession: false).wait()
459+
_ = client.publish(to: "test/Inflight", payload: ByteBuffer(string: "Flying"), qos: .exactlyOnce)
460+
try client.disconnect().wait()
461+
462+
_ = try client2.subscribe(to: [.init(topicFilter: "test/Inflight", qos: .atLeastOnce)]).wait()
463+
_ = try client.connect(cleanSession: false).wait()
464+
465+
wait(for: [expectation], timeout: 5.0)
466+
467+
try client.disconnect().wait()
468+
try client2.disconnect().wait()
469+
}
470+
438471
func testSubscribeAll() throws {
439472
if ProcessInfo.processInfo.environment["CI"] != nil {
440473
return

0 commit comments

Comments
 (0)