Skip to content

Commit 162e622

Browse files
authored
Re-instate unhandled packet processing (#78)
* Re-instate unhandled packet processing * swift format
1 parent 75f32f5 commit 162e622

File tree

4 files changed

+30
-32
lines changed

4 files changed

+30
-32
lines changed
Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import NIO
22

3+
/// Task handler.
34
final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
45
typealias InboundIn = MQTTPacket
56

67
var eventLoop: EventLoop!
8+
var client: MQTTClient
79

8-
init() {
10+
init(client: MQTTClient) {
11+
self.client = client
912
self.eventLoop = nil
1013
self.tasks = []
1114
}
@@ -16,11 +19,11 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
1619
}
1720
}
1821

19-
func _removeTask(_ task: MQTTTask) {
22+
private func _removeTask(_ task: MQTTTask) {
2023
self.tasks.removeAll { $0 === task }
2124
}
2225

23-
func removeTask(_ task: MQTTTask) {
26+
private func removeTask(_ task: MQTTTask) {
2427
if self.eventLoop.inEventLoop {
2528
self._removeTask(task)
2629
} else {
@@ -38,6 +41,7 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
3841
let response = self.unwrapInboundIn(data)
3942
for task in self.tasks {
4043
do {
44+
// should this task respond to inbound packet
4145
if try task.checkInbound(response) {
4246
self.removeTask(task)
4347
task.succeed(response)
@@ -49,42 +53,37 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
4953
return
5054
}
5155
}
56+
57+
self.processUnhandledPacket(response)
58+
}
59+
60+
/// process packets where no equivalent task was found
61+
func processUnhandledPacket(_ packet: MQTTPacket) {
62+
// we only send response to v5 server
63+
guard self.client.configuration.version == .v5_0 else { return }
64+
guard let connection = client.connection else { return }
65+
66+
switch packet.type {
67+
case .PUBREC:
68+
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBREL, packetId: packet.packetId, reason: .packetIdentifierNotFound))
69+
case .PUBREL:
70+
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: packet.packetId, reason: .packetIdentifierNotFound))
71+
default:
72+
break
73+
}
5274
}
5375

5476
func channelInactive(context: ChannelHandlerContext) {
77+
// channel is inactive so we should fail or tasks in progress
5578
self.tasks.forEach { $0.fail(MQTTError.serverClosedConnection) }
5679
self.tasks.removeAll()
5780
}
5881

5982
func errorCaught(context: ChannelHandlerContext, error: Error) {
83+
// we caught an error so we should fail all active tasks
6084
self.tasks.forEach { $0.fail(error) }
6185
self.tasks.removeAll()
6286
}
6387

6488
var tasks: [MQTTTask]
6589
}
66-
67-
/// If packet reaches this handler then it was never dealt with by a task
68-
final class MQTTUnhandledPacketHandler: ChannelInboundHandler {
69-
typealias InboundIn = MQTTPacket
70-
let client: MQTTClient
71-
72-
init(client: MQTTClient) {
73-
self.client = client
74-
}
75-
76-
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
77-
// we only send response to v5 server
78-
guard self.client.configuration.version == .v5_0 else { return }
79-
guard let connection = client.connection else { return }
80-
let response = self.unwrapInboundIn(data)
81-
switch response.type {
82-
case .PUBREC:
83-
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBREL, packetId: response.packetId, reason: .packetIdentifierNotFound))
84-
case .PUBREL:
85-
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: response.packetId, reason: .packetIdentifierNotFound))
86-
default:
87-
break
88-
}
89-
}
90-
}

Sources/MQTTNIO/MQTTConnection.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ final class MQTTConnection {
2222
}
2323

2424
static func create(client: MQTTClient, pingInterval: TimeAmount) -> EventLoopFuture<MQTTConnection> {
25-
let taskHandler = MQTTTaskHandler()
25+
let taskHandler = MQTTTaskHandler(client: client)
2626
return self.createBootstrap(client: client, pingInterval: pingInterval, taskHandler: taskHandler)
2727
.map { MQTTConnection(channel: $0, timeout: client.configuration.timeout, taskHandler: taskHandler) }
2828
}

Sources/MQTTNIO/MQTTError.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public enum MQTTError: Error {
3232
case noConnection
3333
/// the server disconnected
3434
case serverDisconnection(MQTTAckV5)
35-
/// the server closed the connection
35+
/// the server closed the connection. If this happens during a publish you can resend
36+
/// the publish packet by reconnecting to server with `cleanSession` set to false.
3637
case serverClosedConnection
3738
/// received unexpected message from broker
3839
case unexpectedMessage

Sources/MQTTNIO/MQTTTask.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@ import NIO
55
final class MQTTTask {
66
let promise: EventLoopPromise<MQTTPacket>
77
let checkInbound: (MQTTPacket) throws -> Bool
8-
let timeout: TimeAmount?
98
let timeoutTask: Scheduled<Void>?
109

1110
init(on eventLoop: EventLoop, timeout: TimeAmount?, checkInbound: @escaping (MQTTPacket) throws -> Bool) {
1211
let promise = eventLoop.makePromise(of: MQTTPacket.self)
1312
self.promise = promise
1413
self.checkInbound = checkInbound
15-
self.timeout = timeout
1614
if let timeout = timeout {
1715
self.timeoutTask = eventLoop.scheduleTask(in: timeout) {
1816
promise.fail(MQTTError.timeout)

0 commit comments

Comments
 (0)