Skip to content

Commit bd559b1

Browse files
authored
Don't call EventLoop.submit if already on the event loop (#107)
* Minor optimisation Don't call EventLoop.Submit is on the event loop * Stop using XCTestExpectation * Update MQTTNIOTests+async.swift
1 parent 40e025e commit bd559b1

File tree

2 files changed

+31
-22
lines changed

2 files changed

+31
-22
lines changed

Sources/MQTTNIO/ChannelHandlers/MQTTTaskHandler.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,20 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
2727
}
2828

2929
func addTask(_ task: MQTTTask) -> EventLoopFuture<Void> {
30-
return self.eventLoop.submit {
31-
self.tasks.append(task)
30+
if self.eventLoop.inEventLoop {
31+
self._addTask(task)
32+
return self.eventLoop.makeSucceededVoidFuture()
33+
} else {
34+
return self.eventLoop.submit {
35+
self.tasks.append(task)
36+
}
3237
}
3338
}
3439

40+
private func _addTask(_ task: MQTTTask) {
41+
self.tasks.append(task)
42+
}
43+
3544
private func _removeTask(_ task: MQTTTask) {
3645
self.tasks.removeAll { $0 === task }
3746
}

Tests/MQTTNIOTests/MQTTNIOTests+async.swift

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,8 @@ final class AsyncMQTTNIOTests: XCTestCase {
100100
}
101101

102102
func testAsyncSequencePublishListener() async throws {
103-
let expectation = XCTestExpectation(description: "testAsyncSequencePublishListener")
104-
expectation.expectedFulfillmentCount = 2
105-
let finishExpectation = XCTestExpectation(description: "testAsyncSequencePublishListener.finish")
106-
finishExpectation.expectedFulfillmentCount = 1
103+
let expectation = NIOAtomic.makeAtomic(value: 0)
104+
let finishExpectation = NIOAtomic.makeAtomic(value: 0)
107105

108106
let client = self.createClient(identifier: "testAsyncSequencePublishListener+async", version: .v5_0)
109107
let client2 = self.createClient(identifier: "testAsyncSequencePublishListener+async2", version: .v5_0)
@@ -119,34 +117,33 @@ final class AsyncMQTTNIOTests: XCTestCase {
119117
var buffer = publish.payload
120118
let string = buffer.readString(length: buffer.readableBytes)
121119
print("Received: \(string ?? "nothing")")
122-
expectation.fulfill()
120+
expectation.add(1)
123121

124122
case .failure(let error):
125123
XCTFail("\(error)")
126124
}
127125
}
128-
finishExpectation.fulfill()
126+
finishExpectation.add(1)
129127
}
130128
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Hello"), qos: .atLeastOnce)
131129
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Goodbye"), qos: .atLeastOnce)
132130
try await client.disconnect()
133131

134-
self.wait(for: [expectation], timeout: 5.0)
132+
_ = try await Task.sleep(nanoseconds: 500_000_000)
135133

136134
try await client2.disconnect()
137135
try await client.shutdown()
138136
try await client2.shutdown()
139137

140-
self.wait(for: [finishExpectation], timeout: 5.0)
141-
142138
_ = await task.result
139+
140+
XCTAssertEqual(expectation.load(), 2)
141+
XCTAssertEqual(finishExpectation.load(), 1)
143142
}
144143

145144
func testAsyncSequencePublishSubscriptionIdListener() async throws {
146-
let expectation = XCTestExpectation(description: "publish listener")
147-
let expectation2 = XCTestExpectation(description: "publish listener2")
148-
expectation.expectedFulfillmentCount = 3
149-
expectation2.expectedFulfillmentCount = 2
145+
let expectation = NIOAtomic.makeAtomic(value: 0)
146+
let expectation2 = NIOAtomic.makeAtomic(value: 0)
150147

151148
let client = self.createClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async", version: .v5_0)
152149
let client2 = self.createClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async2", version: .v5_0)
@@ -159,31 +156,34 @@ final class AsyncMQTTNIOTests: XCTestCase {
159156
let task = Task {
160157
let publishListener = client2.v5.createPublishListener(subscriptionId: 1)
161158
for await _ in publishListener {
162-
expectation.fulfill()
159+
expectation.add(1)
163160
}
164-
expectation.fulfill()
161+
expectation.add(1)
165162
}
166163
let task2 = Task {
167164
let publishListener = client2.v5.createPublishListener(subscriptionId: 2)
168165
for await _ in publishListener {
169-
expectation2.fulfill()
166+
expectation2.add(1)
170167
}
171-
expectation2.fulfill()
168+
expectation2.add(1)
172169
}
173170
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
174171
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
175172
try await client.publish(to: "TestSubject2", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
173+
176174
try await client.disconnect()
177-
Thread.sleep(forTimeInterval: 0.5)
175+
176+
_ = try await Task.sleep(nanoseconds: 500_000_000)
177+
178178
try await client2.disconnect()
179-
Thread.sleep(forTimeInterval: 0.5)
180179
try client.syncShutdownGracefully()
181180
try client2.syncShutdownGracefully()
182181

183182
_ = await task.result
184183
_ = await task2.result
185184

186-
wait(for: [expectation, expectation2], timeout: 5.0)
185+
XCTAssertEqual(expectation.load(), 3)
186+
XCTAssertEqual(expectation2.load(), 2)
187187
}
188188
}
189189

0 commit comments

Comments
 (0)