@@ -34,20 +34,6 @@ final class AsyncMQTTNIOTests: XCTestCase {
34
34
return logger
35
35
} ( )
36
36
37
- func XCTRunAsyncAndBlock( _ closure: @escaping ( ) async throws -> Void ) {
38
- let dg = DispatchGroup ( )
39
- dg. enter ( )
40
- Task {
41
- do {
42
- try await closure ( )
43
- } catch {
44
- XCTFail ( " \( error) " )
45
- }
46
- dg. leave ( )
47
- }
48
- dg. wait ( )
49
- }
50
-
51
37
func createClient( identifier: String , version: MQTTClient . Version = . v3_1_1, timeout: TimeAmount ? = . seconds( 10 ) ) -> MQTTClient {
52
38
MQTTClient (
53
39
host: Self . hostname,
@@ -59,49 +45,45 @@ final class AsyncMQTTNIOTests: XCTestCase {
59
45
)
60
46
}
61
47
62
- func testConnect( ) {
48
+ func testConnect( ) async throws {
63
49
let client = self . createClient ( identifier: " testConnect+async " )
64
- self . XCTRunAsyncAndBlock {
65
- try await client. connect ( )
66
- try await client. disconnect ( )
67
- try await client. shutdown ( )
68
- }
50
+ try await client. connect ( )
51
+ try await client. disconnect ( )
52
+ try await client. shutdown ( )
69
53
}
70
54
71
- func testPublishSubscribe( ) {
55
+ func testPublishSubscribe( ) async throws {
72
56
let expectation = XCTestExpectation ( description: " testPublishSubscribe " )
73
57
expectation. expectedFulfillmentCount = 1
74
58
75
59
let client = self . createClient ( identifier: " testPublish+async " )
76
60
let client2 = self . createClient ( identifier: " testPublish+async2 " )
77
61
let payloadString = " Hello "
78
- self . XCTRunAsyncAndBlock {
79
- try await client. connect ( )
80
- try await client2. connect ( )
81
- _ = try await client2. subscribe ( to: [ . init( topicFilter: " TestSubject " , qos: . atLeastOnce) ] )
82
- client2. addPublishListener ( named: " test " ) { result in
83
- switch result {
84
- case . success( let publish) :
85
- var buffer = publish. payload
86
- let string = buffer. readString ( length: buffer. readableBytes)
87
- XCTAssertEqual ( string, payloadString)
88
- expectation. fulfill ( )
89
- case . failure( let error) :
90
- XCTFail ( " \( error) " )
91
- }
62
+ try await client. connect ( )
63
+ try await client2. connect ( )
64
+ _ = try await client2. subscribe ( to: [ . init( topicFilter: " TestSubject " , qos: . atLeastOnce) ] )
65
+ client2. addPublishListener ( named: " test " ) { result in
66
+ switch result {
67
+ case . success( let publish) :
68
+ var buffer = publish. payload
69
+ let string = buffer. readString ( length: buffer. readableBytes)
70
+ XCTAssertEqual ( string, payloadString)
71
+ expectation. fulfill ( )
72
+ case . failure( let error) :
73
+ XCTFail ( " \( error) " )
92
74
}
93
- try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
75
+ }
76
+ try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
94
77
95
- self . wait ( for: [ expectation] , timeout: 2 )
78
+ self . wait ( for: [ expectation] , timeout: 2 )
96
79
97
- try await client. disconnect ( )
98
- try await client2. disconnect ( )
99
- try await client. shutdown ( )
100
- try await client2. shutdown ( )
101
- }
80
+ try await client. disconnect ( )
81
+ try await client2. disconnect ( )
82
+ try await client. shutdown ( )
83
+ try await client2. shutdown ( )
102
84
}
103
85
104
- func testPing( ) {
86
+ func testPing( ) async throws {
105
87
let client = MQTTClient (
106
88
host: Self . hostname,
107
89
port: 1883 ,
@@ -111,15 +93,13 @@ final class AsyncMQTTNIOTests: XCTestCase {
111
93
configuration: . init( disablePing: true )
112
94
)
113
95
114
- self . XCTRunAsyncAndBlock {
115
- try await client. connect ( )
116
- try await client. ping ( )
117
- try await client. disconnect ( )
118
- try await client. shutdown ( )
119
- }
96
+ try await client. connect ( )
97
+ try await client. ping ( )
98
+ try await client. disconnect ( )
99
+ try await client. shutdown ( )
120
100
}
121
101
122
- func testAsyncSequencePublishListener( ) {
102
+ func testAsyncSequencePublishListener( ) async throws {
123
103
let expectation = XCTestExpectation ( description: " testAsyncSequencePublishListener " )
124
104
expectation. expectedFulfillmentCount = 2
125
105
let finishExpectation = XCTestExpectation ( description: " testAsyncSequencePublishListener.finish " )
@@ -128,43 +108,41 @@ final class AsyncMQTTNIOTests: XCTestCase {
128
108
let client = self . createClient ( identifier: " testAsyncSequencePublishListener+async " , version: . v5_0)
129
109
let client2 = self . createClient ( identifier: " testAsyncSequencePublishListener+async2 " , version: . v5_0)
130
110
131
- self . XCTRunAsyncAndBlock {
132
- try await client. connect ( )
133
- try await client2. connect ( )
134
- _ = try await client2. v5. subscribe ( to: [ . init( topicFilter: " TestSubject " , qos: . atLeastOnce) ] )
135
- let task = Task {
136
- let publishListener = client2. createPublishListener ( )
137
- for await result in publishListener {
138
- switch result {
139
- case . success( let publish) :
140
- var buffer = publish. payload
141
- let string = buffer. readString ( length: buffer. readableBytes)
142
- print ( " Received: \( string ?? " nothing " ) " )
143
- expectation. fulfill ( )
144
-
145
- case . failure( let error) :
146
- XCTFail ( " \( error) " )
147
- }
111
+ try await client. connect ( )
112
+ try await client2. connect ( )
113
+ _ = try await client2. v5. subscribe ( to: [ . init( topicFilter: " TestSubject " , qos: . atLeastOnce) ] )
114
+ let task = Task {
115
+ let publishListener = client2. createPublishListener ( )
116
+ for await result in publishListener {
117
+ switch result {
118
+ case . success( let publish) :
119
+ var buffer = publish. payload
120
+ let string = buffer. readString ( length: buffer. readableBytes)
121
+ print ( " Received: \( string ?? " nothing " ) " )
122
+ expectation. fulfill ( )
123
+
124
+ case . failure( let error) :
125
+ XCTFail ( " \( error) " )
148
126
}
149
- finishExpectation. fulfill ( )
150
127
}
151
- try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: " Hello " ) , qos: . atLeastOnce)
152
- try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: " Goodbye " ) , qos: . atLeastOnce)
153
- try await client. disconnect ( )
128
+ finishExpectation. fulfill ( )
129
+ }
130
+ try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: " Hello " ) , qos: . atLeastOnce)
131
+ try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: " Goodbye " ) , qos: . atLeastOnce)
132
+ try await client. disconnect ( )
154
133
155
- self . wait ( for: [ expectation] , timeout: 5.0 )
134
+ self . wait ( for: [ expectation] , timeout: 5.0 )
156
135
157
- try await client2. disconnect ( )
158
- try await client. shutdown ( )
159
- try await client2. shutdown ( )
136
+ try await client2. disconnect ( )
137
+ try await client. shutdown ( )
138
+ try await client2. shutdown ( )
160
139
161
- self . wait ( for: [ finishExpectation] , timeout: 5.0 )
140
+ self . wait ( for: [ finishExpectation] , timeout: 5.0 )
162
141
163
- _ = await task. result
164
- }
142
+ _ = await task. result
165
143
}
166
144
167
- func testAsyncSequencePublishSubscriptionIdListener( ) {
145
+ func testAsyncSequencePublishSubscriptionIdListener( ) async throws {
168
146
let expectation = XCTestExpectation ( description: " publish listener " )
169
147
let expectation2 = XCTestExpectation ( description: " publish listener2 " )
170
148
expectation. expectedFulfillmentCount = 3
@@ -173,38 +151,38 @@ final class AsyncMQTTNIOTests: XCTestCase {
173
151
let client = self . createClient ( identifier: " testAsyncSequencePublishSubscriptionIdListener+async " , version: . v5_0)
174
152
let client2 = self . createClient ( identifier: " testAsyncSequencePublishSubscriptionIdListener+async2 " , version: . v5_0)
175
153
let payloadString = " Hello "
176
- self . XCTRunAsyncAndBlock {
177
- try await client. connect ( )
178
- try await client2. connect ( )
179
- _ = try await client2. v5. subscribe ( to: [ . init( topicFilter: " TestSubject " , qos: . atLeastOnce) ] , properties: [ . subscriptionIdentifier( 1 ) ] )
180
- _ = try await client2. v5. subscribe ( to: [ . init( topicFilter: " TestSubject2 " , qos: . atLeastOnce) ] , properties: [ . subscriptionIdentifier( 2 ) ] )
181
- let task = Task {
182
- let publishListener = client2. v5. createPublishListener ( subscriptionId: 1 )
183
- for await _ in publishListener {
184
- expectation. fulfill ( )
185
- }
154
+
155
+ try await client. connect ( )
156
+ try await client2. connect ( )
157
+ _ = try await client2. v5. subscribe ( to: [ . init( topicFilter: " TestSubject " , qos: . atLeastOnce) ] , properties: [ . subscriptionIdentifier( 1 ) ] )
158
+ _ = try await client2. v5. subscribe ( to: [ . init( topicFilter: " TestSubject2 " , qos: . atLeastOnce) ] , properties: [ . subscriptionIdentifier( 2 ) ] )
159
+ let task = Task {
160
+ let publishListener = client2. v5. createPublishListener ( subscriptionId: 1 )
161
+ for await _ in publishListener {
186
162
expectation. fulfill ( )
187
163
}
188
- let task2 = Task {
189
- let publishListener = client2 . v5 . createPublishListener ( subscriptionId : 2 )
190
- for await _ in publishListener {
191
- expectation2 . fulfill ( )
192
- }
164
+ expectation . fulfill ( )
165
+ }
166
+ let task2 = Task {
167
+ let publishListener = client2 . v5 . createPublishListener ( subscriptionId : 2 )
168
+ for await _ in publishListener {
193
169
expectation2. fulfill ( )
194
170
}
195
- try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
196
- try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
197
- try await client. publish ( to: " TestSubject2 " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
198
- try await client. disconnect ( )
199
- Thread . sleep ( forTimeInterval: 0.5 )
200
- try await client2. disconnect ( )
201
- Thread . sleep ( forTimeInterval: 0.5 )
202
- try client. syncShutdownGracefully ( )
203
- try client2. syncShutdownGracefully ( )
204
-
205
- _ = await task. result
206
- _ = await task2. result
171
+ expectation2. fulfill ( )
207
172
}
173
+ try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
174
+ try await client. publish ( to: " TestSubject " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
175
+ try await client. publish ( to: " TestSubject2 " , payload: ByteBufferAllocator ( ) . buffer ( string: payloadString) , qos: . atLeastOnce)
176
+ try await client. disconnect ( )
177
+ Thread . sleep ( forTimeInterval: 0.5 )
178
+ try await client2. disconnect ( )
179
+ Thread . sleep ( forTimeInterval: 0.5 )
180
+ try client. syncShutdownGracefully ( )
181
+ try client2. syncShutdownGracefully ( )
182
+
183
+ _ = await task. result
184
+ _ = await task2. result
185
+
208
186
wait ( for: [ expectation, expectation2] , timeout: 5.0 )
209
187
}
210
188
}
0 commit comments