Skip to content

Commit 493ca8b

Browse files
authored
End publish listener on connection close if clean session is set to true (#171)
1 parent 270c973 commit 493ca8b

File tree

7 files changed

+283
-156
lines changed

7 files changed

+283
-156
lines changed

.github/workflows/ci.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@ on:
1010
release:
1111
types: [published]
1212
workflow_dispatch:
13+
concurrency:
14+
group: ${{ github.workflow }}-${{ github.ref }}-ci
15+
cancel-in-progress: true
1316

1417
jobs:
1518
macos:
1619
runs-on: macOS-latest
20+
timeout-minutes: 15
1721
steps:
1822
- name: Checkout
1923
uses: actions/checkout@v4
@@ -37,6 +41,7 @@ jobs:
3741

3842
ios:
3943
runs-on: macOS-latest
44+
timeout-minutes: 15
4045
steps:
4146
- name: Checkout
4247
uses: actions/checkout@v4
@@ -46,13 +51,13 @@ jobs:
4651
4752
linux:
4853
runs-on: ubuntu-latest
54+
timeout-minutes: 15
4955
strategy:
5056
matrix:
5157
tag:
52-
- swift:5.9
5358
- swift:5.10
5459
- swift:6.0
55-
- swiftlang/swift:nightly-6.1-jammy
60+
- swift:6.1
5661
container:
5762
image: ${{ matrix.tag }}
5863
services:

Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,18 +173,25 @@ public class MQTTPublishListener: AsyncSequence {
173173
let name = UUID().uuidString
174174
self.client = client
175175
self.name = name
176+
let cleanSession = client.connection?.cleanSession ?? true
176177
self.stream = AsyncStream { cont in
177178
client.addPublishListener(named: name) { result in
178179
cont.yield(result)
179180
}
180181
client.addShutdownListener(named: name) { _ in
181182
cont.finish()
182183
}
184+
client.addCloseListener(named: name) { connectResult in
185+
if cleanSession {
186+
cont.finish()
187+
}
188+
}
183189
}
184190
}
185191

186192
deinit {
187193
self.client.removePublishListener(named: self.name)
194+
self.client.removeCloseListener(named: self.name)
188195
self.client.removeShutdownListener(named: self.name)
189196
}
190197

Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,18 +166,25 @@ public class MQTTPublishIdListener: AsyncSequence {
166166
let name = UUID().uuidString
167167
self.client = client.client
168168
self.name = name
169+
let cleanSession = client.client.connection?.cleanSession ?? true
169170
self.stream = AsyncStream { cont in
170171
client.addPublishListener(named: name, subscriptionId: subscriptionId) { result in
171172
cont.yield(result)
172173
}
173174
client.client.addShutdownListener(named: name) { _ in
174175
cont.finish()
175176
}
177+
client.client.addCloseListener(named: name) { connectResult in
178+
if cleanSession {
179+
cont.finish()
180+
}
181+
}
176182
}
177183
}
178184

179185
deinit {
180186
self.client.removePublishListener(named: self.name)
187+
self.client.removeCloseListener(named: self.name)
181188
self.client.removeShutdownListener(named: self.name)
182189
}
183190

Sources/MQTTNIO/MQTTClient.swift

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,9 +463,9 @@ public final class MQTTClient {
463463
}
464464

465465
var connectionParameters = ConnectionParameters()
466-
let publishListeners = MQTTListeners<MQTTPublishInfo>()
467-
let closeListeners = MQTTListeners<Void>()
468-
let shutdownListeners = MQTTListeners<Void>()
466+
let publishListeners = MQTTListeners<Result<MQTTPublishInfo, Error>>()
467+
let closeListeners = MQTTListeners<Result<Void, Error>>()
468+
let shutdownListeners = MQTTListeners<Result<Void, Error>>()
469469
private var _connection: MQTTConnection?
470470
private var lock = NIOLock()
471471
}
@@ -477,8 +477,17 @@ extension MQTTClient {
477477
authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture<MQTTAuthV5>)? = nil
478478
) -> EventLoopFuture<MQTTConnAckPacket> {
479479
let pingInterval = self.configuration.pingInterval ?? TimeAmount.seconds(max(Int64(packet.keepAliveSeconds - 5), 5))
480-
481-
let connectFuture = MQTTConnection.create(client: self, pingInterval: pingInterval)
480+
var cleanSession = packet.cleanSession
481+
// if connection has non zero session expiry then assume it doesnt clean session on close
482+
for p in packet.properties {
483+
// check topic alias
484+
if case .sessionExpiryInterval(let interval) = p {
485+
if interval > 0 {
486+
cleanSession = false
487+
}
488+
}
489+
}
490+
let connectFuture = MQTTConnection.create(client: self, cleanSession: cleanSession, pingInterval: pingInterval)
482491
let eventLoop = connectFuture.eventLoop
483492
return
484493
connectFuture

Sources/MQTTNIO/MQTTConnection.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,21 @@ import NIOSSL
3030

3131
final class MQTTConnection {
3232
let channel: Channel
33+
let cleanSession: Bool
3334
let timeout: TimeAmount?
3435
let taskHandler: MQTTTaskHandler
3536

36-
private init(channel: Channel, timeout: TimeAmount?, taskHandler: MQTTTaskHandler) {
37+
private init(channel: Channel, cleanSession: Bool, timeout: TimeAmount?, taskHandler: MQTTTaskHandler) {
3738
self.channel = channel
39+
self.cleanSession = cleanSession
3840
self.timeout = timeout
3941
self.taskHandler = taskHandler
4042
}
4143

42-
static func create(client: MQTTClient, pingInterval: TimeAmount) -> EventLoopFuture<MQTTConnection> {
44+
static func create(client: MQTTClient, cleanSession: Bool, pingInterval: TimeAmount) -> EventLoopFuture<MQTTConnection> {
4345
let taskHandler = MQTTTaskHandler(client: client)
4446
return self.createBootstrap(client: client, pingInterval: pingInterval, taskHandler: taskHandler)
45-
.map { MQTTConnection(channel: $0, timeout: client.configuration.timeout, taskHandler: taskHandler) }
47+
.map { MQTTConnection(channel: $0, cleanSession: cleanSession, timeout: client.configuration.timeout, taskHandler: taskHandler) }
4648
}
4749

4850
static func createBootstrap(client: MQTTClient, pingInterval: TimeAmount, taskHandler: MQTTTaskHandler) -> EventLoopFuture<Channel> {

Sources/MQTTNIO/MQTTListeners.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import NIO
1515
import NIOConcurrencyHelpers
1616

1717
final class MQTTListeners<ReturnType> {
18-
typealias Listener = (Result<ReturnType, Error>) -> Void
18+
typealias Listener = (ReturnType) -> Void
1919

20-
func notify(_ result: Result<ReturnType, Error>) {
20+
func notify(_ result: ReturnType) {
2121
let listeners = self.lock.withLock {
2222
return self.listeners
2323
}

0 commit comments

Comments
 (0)