Skip to content

Commit 7a31582

Browse files
dometecvietj
authored andcommitted
Pause/Resume Reading from Socket
Motivation: The ability to pause reading from the socket to avoid being overwhelmed by QoS 0 messages during traffic spikes. When reading is paused, incoming data accumulates in the TCP receive buffer. As the buffer fills up, the TCP stack advertises a reduced window size (eventually a zero window) to the broker, applying backpressure at the TCP level. At that point, outgoing data may accumulate in the broker’s socket send buffer until its limits are reached. Depending on the broker’s implementation and configuration, QoS 0 messages that cannot be written may be buffered or dropped. Changes: Implement new pause/resume methods that change the setAutoRead(bool) connection's option.
1 parent afdc250 commit 7a31582

File tree

3 files changed

+152
-0
lines changed

3 files changed

+152
-0
lines changed

src/main/java/io/vertx/mqtt/MqttClient.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,29 @@ static MqttClient create(Vertx vertx) {
321321
@Fluent
322322
MqttClient ping();
323323

324+
/**
325+
* Pause the reading channel, so no new byte are read from the server.
326+
* Available onliy after connection is established.
327+
* <p>
328+
* This simply delegates to {@link io.vertx.core.net.NetSocket#pause()}.
329+
*/
330+
void pause();
331+
332+
/**
333+
* Returns true if the reading channel is paused.
334+
* Available onliy after connection is established.
335+
* @return true if the client publishing is paused.
336+
*/
337+
boolean isPaused();
338+
339+
/**
340+
* Resume the reading channel. see {@link #pause()}
341+
* Available onliy after connection is established.
342+
* <p>
343+
* This simply delegates to {@link io.vertx.core.net.NetSocket#resume()}.
344+
*/
345+
void resume();
346+
324347
/**
325348
* @return the client identifier
326349
*/

src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.Unpooled;
21+
import io.netty.channel.ChannelConfig;
2122
import io.netty.channel.ChannelHandlerContext;
2223
import io.netty.channel.ChannelPipeline;
2324
import io.netty.handler.codec.DecoderResult;
@@ -76,6 +77,7 @@ private enum Status { CLOSED, CONNECTING, CONNECTED, CLOSING }
7677
private final VertxInternal vertx;
7778
private final MqttClientOptions options;
7879
private NetSocketInternal connection;
80+
private ChannelConfig connOption;
7981
private ContextInternal ctx;
8082

8183
// handler to call when a publish is complete
@@ -245,13 +247,15 @@ private Future<MqttConnAckMessage> doConnect(int port, String host, String serve
245247
initChannel(soi);
246248
synchronized (MqttClientImpl.this) {
247249
this.connection = soi;
250+
this.connOption = soi.channelHandlerContext().channel().config();
248251
}
249252

250253
soi.messageHandler(msg -> this.handleMessage(soi.channelHandlerContext(), msg));
251254
soi.closeHandler(v2 -> {
252255
client.close();
253256
synchronized (MqttClientImpl.this) {
254257
this.connection = null;
258+
this.connOption = null;
255259
this.status = Status.CLOSED;
256260
this.connectPromise = null;
257261
this.disconnectPromise = null;
@@ -721,6 +725,20 @@ public MqttClient ping() {
721725
return this;
722726
}
723727

728+
@Override
729+
public synchronized void pause() {
730+
connOption.setAutoRead(false);
731+
}
732+
733+
public synchronized boolean isPaused() {
734+
return connOption.isAutoRead();
735+
}
736+
737+
@Override
738+
public synchronized void resume() {
739+
connOption.setAutoRead(true);
740+
}
741+
724742
@Override
725743
public synchronized String clientId() {
726744
return this.options.getClientId();
@@ -881,6 +899,7 @@ private void handleClosed() {
881899
this.disconnectPromise = null;
882900
this.status = Status.CLOSED;
883901
this.connection = null;
902+
this.connOption = null;
884903
this.ctx = null;
885904
this.client = null;
886905
this.pings = new ArrayDeque<>();
@@ -1274,6 +1293,7 @@ private void handleConnack(MqttConnAckMessage msg) {
12741293
this.disconnectPromise = null;
12751294
this.status = Status.CLOSED;
12761295
this.connection = null;
1296+
this.connOption = null;
12771297
this.client = null;
12781298
}
12791299
connection.closeHandler(null);
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2016 Red Hat Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.vertx.mqtt.test.client;
18+
19+
import io.netty.handler.codec.mqtt.MqttQoS;
20+
import io.vertx.core.Vertx;
21+
import io.vertx.core.buffer.Buffer;
22+
import io.vertx.ext.unit.Async;
23+
import io.vertx.ext.unit.TestContext;
24+
import io.vertx.ext.unit.junit.VertxUnitRunner;
25+
import io.vertx.mqtt.MqttClient;
26+
import io.vertx.mqtt.MqttClientOptions;
27+
import io.vertx.mqtt.MqttServer;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
/**
36+
* MQTT client pause tests.
37+
*/
38+
@RunWith(VertxUnitRunner.class)
39+
public class MqttClientPauseTest {
40+
41+
private Vertx vertx;
42+
private MqttServer server;
43+
44+
@Before
45+
public void before(TestContext ctx) {
46+
vertx = Vertx.vertx();
47+
server = MqttServer.create(vertx);
48+
server.endpointHandler(endpoint -> {
49+
endpoint.accept(false);
50+
endpoint.publishHandler(msg -> {
51+
// Echo back
52+
endpoint.publish(msg.topicName(), msg.payload(), MqttQoS.AT_MOST_ONCE, false, false);
53+
});
54+
});
55+
Async async = ctx.async();
56+
server.listen(ctx.asyncAssertSuccess(server -> async.complete()));
57+
async.awaitSuccess(10000);
58+
}
59+
60+
@After
61+
public void after(TestContext ctx) {
62+
server.close(ctx.asyncAssertSuccess(v -> {
63+
vertx.close(ctx.asyncAssertSuccess());
64+
}));
65+
}
66+
67+
@Test
68+
public void testPauseResume(TestContext ctx) {
69+
Async async = ctx.async();
70+
AtomicInteger received = new AtomicInteger();
71+
MqttClient client = MqttClient.create(vertx);
72+
73+
client.connect(MqttClientOptions.DEFAULT_PORT, "localhost", ctx.asyncAssertSuccess(ack -> {
74+
75+
client.publishHandler(msg -> {
76+
received.incrementAndGet();
77+
});
78+
79+
// Pause the client immediately
80+
client.pause();
81+
82+
// Publish a message from client to trigger server echo
83+
// Since client is paused for READING, it should still be able to WRITE.
84+
client.publish("test/topic", Buffer.buffer("hello"), MqttQoS.AT_MOST_ONCE, false, false);
85+
86+
// Wait a bit to ensure nothing is received, check that nothing is receives, and resume reading
87+
vertx.setTimer(1_000, id -> {
88+
89+
ctx.assertEquals(0, received.get(), "Should not receive message while paused");
90+
91+
// Wait for message
92+
long timerId = vertx.setTimer(2_000, id2 -> {
93+
ctx.fail("Did not receive message after resume");
94+
});
95+
96+
client.publishHandler(msg -> {
97+
vertx.cancelTimer(timerId);
98+
received.incrementAndGet();
99+
async.complete();
100+
});
101+
102+
client.resume();
103+
});
104+
}));
105+
106+
async.await();
107+
client.disconnect();
108+
}
109+
}

0 commit comments

Comments
 (0)