Skip to content

Commit e82f366

Browse files
committed
Move PingreqHandler into its own file
1 parent 6eac900 commit e82f366

File tree

2 files changed

+80
-76
lines changed

2 files changed

+80
-76
lines changed
Lines changed: 2 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import NIO
22

3+
/// Handler encoding MQTT Messages into ByteBuffers
34
final class MQTTEncodeHandler: ChannelOutboundHandler {
45
public typealias OutboundIn = MQTTOutboundMessage
56
public typealias OutboundOut = ByteBuffer
@@ -19,6 +20,7 @@ final class MQTTEncodeHandler: ChannelOutboundHandler {
1920
}
2021
}
2122

23+
/// Decode ByteBuffers into MQTT Messages
2224
struct ByteToMQTTMessageDecoder: ByteToMessageDecoder {
2325
typealias InboundOut = MQTTInboundMessage
2426

@@ -90,79 +92,3 @@ struct ByteToMQTTMessageDecoder: ByteToMessageDecoder {
9092
}
9193
}
9294

93-
/// Channel handler for sending PINGREQ messages to keep connect alive
94-
final class PingreqHandler: ChannelDuplexHandler {
95-
typealias OutboundIn = MQTTOutboundMessage
96-
typealias OutboundOut = MQTTOutboundMessage
97-
typealias InboundIn = MQTTInboundMessage
98-
typealias InboundOut = MQTTInboundMessage
99-
100-
let client: MQTTClient
101-
let timeout: TimeAmount
102-
var lastEventTime: NIODeadline
103-
var task: Scheduled<Void>?
104-
105-
init(client: MQTTClient, timeout: TimeAmount) {
106-
self.client = client
107-
self.timeout = timeout
108-
self.lastEventTime = .now()
109-
self.task = nil
110-
}
111-
112-
public func handlerAdded(context: ChannelHandlerContext) {
113-
if context.channel.isActive {
114-
scheduleTask(context)
115-
}
116-
}
117-
118-
public func handlerRemoved(context: ChannelHandlerContext) {
119-
cancelTask()
120-
}
121-
122-
public func channelActive(context: ChannelHandlerContext) {
123-
if self.task == nil {
124-
scheduleTask(context)
125-
}
126-
context.fireChannelActive()
127-
}
128-
129-
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
130-
self.lastEventTime = .now()
131-
context.fireChannelRead(data)
132-
}
133-
134-
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
135-
self.lastEventTime = .now()
136-
context.write(data, promise: promise)
137-
}
138-
139-
func scheduleTask(_ context: ChannelHandlerContext) {
140-
guard context.channel.isActive else { return }
141-
142-
self.task = context.eventLoop.scheduleTask(deadline: lastEventTime + timeout) {
143-
// if lastEventTime plus the timeout is less than now send PINGREQ
144-
// otherwise reschedule task
145-
if self.lastEventTime + self.timeout <= .now() {
146-
guard context.channel.isActive else { return }
147-
148-
self.client.pingreq().whenComplete { result in
149-
switch result {
150-
case .failure(let error):
151-
context.fireErrorCaught(error)
152-
case .success:
153-
break
154-
}
155-
self.lastEventTime = .now()
156-
self.scheduleTask(context)
157-
}
158-
} else {
159-
self.scheduleTask(context)
160-
}
161-
}
162-
}
163-
164-
func cancelTask() {
165-
self.task?.cancel()
166-
self.task = nil
167-
}
168-
}

Sources/MQTTNIO/PingreqHandler.swift

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import NIO
2+
3+
/// Channel handler for sending PINGREQ messages to keep connect alive
4+
final class PingreqHandler: ChannelDuplexHandler {
5+
typealias OutboundIn = MQTTOutboundMessage
6+
typealias OutboundOut = MQTTOutboundMessage
7+
typealias InboundIn = MQTTInboundMessage
8+
typealias InboundOut = MQTTInboundMessage
9+
10+
let client: MQTTClient
11+
let timeout: TimeAmount
12+
var lastEventTime: NIODeadline
13+
var task: Scheduled<Void>?
14+
15+
init(client: MQTTClient, timeout: TimeAmount) {
16+
self.client = client
17+
self.timeout = timeout
18+
self.lastEventTime = .now()
19+
self.task = nil
20+
}
21+
22+
public func handlerAdded(context: ChannelHandlerContext) {
23+
if context.channel.isActive {
24+
scheduleTask(context)
25+
}
26+
}
27+
28+
public func handlerRemoved(context: ChannelHandlerContext) {
29+
cancelTask()
30+
}
31+
32+
public func channelActive(context: ChannelHandlerContext) {
33+
if self.task == nil {
34+
scheduleTask(context)
35+
}
36+
context.fireChannelActive()
37+
}
38+
39+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
40+
self.lastEventTime = .now()
41+
context.fireChannelRead(data)
42+
}
43+
44+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
45+
self.lastEventTime = .now()
46+
context.write(data, promise: promise)
47+
}
48+
49+
func scheduleTask(_ context: ChannelHandlerContext) {
50+
guard context.channel.isActive else { return }
51+
52+
self.task = context.eventLoop.scheduleTask(deadline: lastEventTime + timeout) {
53+
// if lastEventTime plus the timeout is less than now send PINGREQ
54+
// otherwise reschedule task
55+
if self.lastEventTime + self.timeout <= .now() {
56+
guard context.channel.isActive else { return }
57+
58+
self.client.pingreq().whenComplete { result in
59+
switch result {
60+
case .failure(let error):
61+
context.fireErrorCaught(error)
62+
case .success:
63+
break
64+
}
65+
self.lastEventTime = .now()
66+
self.scheduleTask(context)
67+
}
68+
} else {
69+
self.scheduleTask(context)
70+
}
71+
}
72+
}
73+
74+
func cancelTask() {
75+
self.task?.cancel()
76+
self.task = nil
77+
}
78+
}

0 commit comments

Comments
 (0)