Skip to content

Commit 8fe6e21

Browse files
committed
Add timeout error for tasks that are expected to return a message
1 parent cf05ca7 commit 8fe6e21

File tree

3 files changed

+38
-20
lines changed

3 files changed

+38
-20
lines changed

Sources/MQTTNIO/MQTTClient.swift

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class MQTTClient {
1919
case unexpectedMessage
2020
case decodeError
2121
case websocketUpgradeFailed
22+
case timeout
2223
}
2324
/// EventLoopGroup used by MQTTCllent
2425
let eventLoopGroup: EventLoopGroup
@@ -28,10 +29,10 @@ public class MQTTClient {
2829
let host: String
2930
/// Port to connect to
3031
let port: Int
31-
/// Called whenever a publish event occurs
32-
let publishCallback: (Result<MQTTPublishInfo, Swift.Error>) -> ()
3332
/// Client configuration
3433
let configuration: Configuration
34+
/// Called whenever a publish event occurs
35+
let publishCallback: (Result<MQTTPublishInfo, Swift.Error>) -> ()
3536

3637
/// Channel client is running on
3738
var channel: Channel?
@@ -44,22 +45,35 @@ public class MQTTClient {
4445
public struct Configuration {
4546
public init(
4647
disablePingreq: Bool = false,
48+
pingreqInterval: TimeAmount? = nil,
49+
timeout: TimeAmount? = nil,
4750
useSSL: Bool = false,
4851
useWebSockets: Bool = false,
4952
tlsConfiguration: TLSConfiguration? = nil,
50-
webSocketURLPath: String? = nil)
51-
{
53+
webSocketURLPath: String? = nil
54+
) {
5255
self.disablePingreq = disablePingreq
56+
self.pingreqInterval = pingreqInterval
57+
self.timeout = timeout
5358
self.useSSL = useSSL
5459
self.useWebSockets = useWebSockets
5560
self.tlsConfiguration = tlsConfiguration
5661
self.webSocketURLPath = webSocketURLPath
5762
}
5863

64+
/// disable the sending of pingreq messages
5965
let disablePingreq: Bool
66+
/// override internal between each pingreq message
67+
let pingreqInterval: TimeAmount?
68+
/// timeout for server response
69+
let timeout: TimeAmount?
70+
/// use encrypted connection to server
6071
let useSSL: Bool
72+
/// use a websocket connection to server
6173
let useWebSockets: Bool
74+
/// TLS configuration
6275
let tlsConfiguration: TLSConfiguration?
76+
/// URL Path for web socket. Defaults to "/"
6377
let webSocketURLPath: String?
6478
}
6579

@@ -130,8 +144,10 @@ public class MQTTClient {
130144
/// - Returns: Future waiting for connect to fiinsh
131145
public func connect(info: MQTTConnectInfo, will: MQTTPublishInfo? = nil) -> EventLoopFuture<Void> {
132146
guard self.channel == nil else { return eventLoopGroup.next().makeFailedFuture(Error.alreadyConnected) }
133-
let timeout = TimeAmount.seconds(max(Int64(info.keepAliveSeconds - 5), 5))
134-
return createBootstrap(pingreqTimeout: timeout)
147+
// work out pingreq interval
148+
let pingreqInterval = configuration.pingreqInterval ?? TimeAmount.seconds(max(Int64(info.keepAliveSeconds - 5), 5))
149+
150+
return createBootstrap(pingreqInterval: pingreqInterval)
135151
.flatMap { _ -> EventLoopFuture<MQTTInboundMessage> in
136152
self.clientIdentifier = info.clientIdentifier
137153
return self.sendMessage(MQTTConnectMessage(connect: info, will: nil)) { message in
@@ -267,7 +283,7 @@ extension MQTTClient {
267283
return bootstrap
268284
}
269285

270-
func createBootstrap(pingreqTimeout: TimeAmount) -> EventLoopFuture<Void> {
286+
func createBootstrap(pingreqInterval: TimeAmount) -> EventLoopFuture<Void> {
271287
let promise = self.eventLoopGroup.next().makePromise(of: Void.self)
272288
do {
273289
// Work out what handlers to add
@@ -276,7 +292,7 @@ extension MQTTClient {
276292
ByteToMessageHandler(ByteToMQTTMessageDecoder(client: self))
277293
]
278294
if !configuration.disablePingreq {
279-
handlers = [PingreqHandler(client: self, timeout: pingreqTimeout)] + handlers
295+
handlers = [PingreqHandler(client: self, timeout: pingreqInterval)] + handlers
280296
}
281297
// get bootstrap based off what eventloop we are running on
282298
let bootstrap = try getBootstrap(self.eventLoopGroup)
@@ -351,7 +367,7 @@ extension MQTTClient {
351367

352368
func sendMessage(_ message: MQTTOutboundMessage, checkInbound: @escaping (MQTTInboundMessage) throws -> Bool) -> EventLoopFuture<MQTTInboundMessage> {
353369
guard let channel = self.channel else { return eventLoopGroup.next().makeFailedFuture(Error.noConnection) }
354-
let task = MQTTTask(on: eventLoopGroup.next(), checkInbound: checkInbound)
370+
let task = MQTTTask(on: eventLoopGroup.next(), timeout: configuration.timeout, checkInbound: checkInbound)
355371
let taskHandler = MQTTTaskHandler(task: task, channel: channel)
356372

357373
channel.pipeline.addHandler(taskHandler)

Sources/MQTTNIO/MQTTTask.swift

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,30 @@
22
import NIO
33

44
class MQTTTask {
5-
let eventLoop: EventLoop
65
let promise: EventLoopPromise<MQTTInboundMessage>
76
let checkInbound: (MQTTInboundMessage) throws -> Bool
7+
let timeoutTask: Scheduled<Void>?
88

9-
init(on eventLoop: EventLoop, checkInbound: @escaping (MQTTInboundMessage) throws -> Bool) {
10-
self.eventLoop = eventLoop
9+
init(on eventLoop: EventLoop, timeout: TimeAmount?, checkInbound: @escaping (MQTTInboundMessage) throws -> Bool) {
1110
self.checkInbound = checkInbound
12-
self.promise = self.eventLoop.makePromise(of: MQTTInboundMessage.self)
11+
let promise = eventLoop.makePromise(of: MQTTInboundMessage.self)
12+
self.promise = promise
13+
if let timeout = timeout {
14+
self.timeoutTask = eventLoop.scheduleTask(in: timeout) {
15+
promise.fail(MQTTClient.Error.timeout)
16+
}
17+
} else {
18+
self.timeoutTask = nil
19+
}
1320
}
1421

1522
func succeed(_ response: MQTTInboundMessage) {
23+
timeoutTask?.cancel()
1624
promise.succeed(response)
1725
}
1826

1927
func fail(_ error: Error) {
28+
timeoutTask?.cancel()
2029
promise.fail(error)
2130
}
2231
}

Tests/MQTTNIOTests/MQTTNIOTests.swift

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,6 @@ final class MQTTNIOTests: XCTestCase {
7171
try client.connect(info: connect).wait()
7272
}
7373

74-
func testBootstrap() throws {
75-
let client = self.createClient()
76-
_ = try client.createBootstrap(pingreqTimeout: .seconds(10)).wait()
77-
Thread.sleep(forTimeInterval: 10)
78-
try client.syncShutdownGracefully()
79-
}
80-
8174
func testWebsocketConnect() throws {
8275
let client = createWebSocketClient()
8376
try connect(to: client, identifier: "connect")

0 commit comments

Comments
 (0)