Skip to content

Commit c440699

Browse files
authored
Fix proxy conn multi broker (#1680)
1 parent c43451a commit c440699

File tree

5 files changed

+144
-32
lines changed

5 files changed

+144
-32
lines changed

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static com.google.common.base.Preconditions.checkArgument;
1717
import io.netty.channel.Channel;
1818
import io.netty.handler.codec.mqtt.MqttConnectMessage;
19+
import io.netty.util.concurrent.FutureListener;
1920
import io.streamnative.pulsar.handlers.mqtt.common.Connection;
2021
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
2122
import io.streamnative.pulsar.handlers.mqtt.common.utils.FutureUtils;
@@ -31,10 +32,11 @@ public class AdapterChannel {
3132
private final MQTTProxyAdapter adapter;
3233
@Getter
3334
private final InetSocketAddress broker;
35+
@Getter
3436
private CompletableFuture<Channel> channelFuture;
3537

36-
public AdapterChannel(MQTTProxyAdapter adapter,
37-
InetSocketAddress broker, CompletableFuture<Channel> channelFuture) {
38+
public AdapterChannel(MQTTProxyAdapter adapter, InetSocketAddress broker,
39+
CompletableFuture<Channel> channelFuture) {
3840
this.adapter = adapter;
3941
this.broker = broker;
4042
this.channelFuture = channelFuture;
@@ -68,16 +70,9 @@ private CompletableFuture<Void> writeConnectMessage(final Connection connection)
6870
return writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage));
6971
}
7072

71-
/**
72-
* When client subscribes, the adapter channel maybe close in exception, so register listener to close the
73-
* related client channel and trigger reconnection.
74-
* @param connection
75-
*/
76-
public void registerAdapterChannelInactiveListener(Connection connection) {
73+
public void registerClosureListener(FutureListener<Void> closeListener) {
7774
channelFuture.thenAccept(channel -> {
78-
MQTTProxyAdapter.AdapterHandler channelHandler = (MQTTProxyAdapter.AdapterHandler)
79-
channel.pipeline().get(MQTTProxyAdapter.AdapterHandler.NAME);
80-
channelHandler.registerAdapterChannelInactiveListener(connection);
75+
channel.closeFuture().addListener(closeListener);
8176
});
8277
}
8378
}

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,8 @@
4545
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
4646
import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyProtocolMethodProcessor;
4747
import java.net.InetSocketAddress;
48-
import java.util.Collections;
4948
import java.util.List;
5049
import java.util.Map;
51-
import java.util.Set;
5250
import java.util.concurrent.CompletableFuture;
5351
import java.util.concurrent.ConcurrentHashMap;
5452
import java.util.concurrent.ConcurrentMap;
@@ -157,17 +155,6 @@ public class AdapterHandler extends ChannelInboundHandlerAdapter {
157155

158156
public static final String NAME = "adapter-handler";
159157

160-
private final Set<Connection> callbackConnections = Collections.newSetFromMap(new ConcurrentHashMap<>());
161-
162-
public void registerAdapterChannelInactiveListener(Connection connection) {
163-
callbackConnections.add(connection);
164-
}
165-
166-
@Override
167-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
168-
callbackConnections.forEach(connection -> connection.getChannel().close());
169-
}
170-
171158
@Override
172159
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
173160
checkArgument(message instanceof MqttAdapterMessage);

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMeth
8383
private Connection connection;
8484
private final LookupHandler lookupHandler;
8585
private final MQTTProxyConfiguration proxyConfig;
86+
@Getter
8687
private final Map<String, CompletableFuture<AdapterChannel>> topicBrokers;
8788
private final Map<InetSocketAddress, AdapterChannel> adapterChannels;
8889
@Getter
@@ -396,9 +397,7 @@ private CompletableFuture<Void> doSubscribe(final MqttAdapterMessage adapter, fi
396397
.build();
397398
MqttAdapterMessage mqttAdapterMessage =
398399
new MqttAdapterMessage(connection.getClientId(), subscribeMessage);
399-
return writeToBroker(encodedPulsarTopicName, mqttAdapterMessage)
400-
.thenAccept(__ ->
401-
registerAdapterChannelInactiveListener(encodedPulsarTopicName));
400+
return writeToBroker(encodedPulsarTopicName, mqttAdapterMessage);
402401
}).collect(Collectors.toList());
403402
return FutureUtil.waitForAll(writeFutures);
404403
})
@@ -420,11 +419,6 @@ private String getMqttTopicName(MqttTopicSubscription subscription, String encod
420419
}
421420
}
422421

423-
private void registerAdapterChannelInactiveListener(final String topic) {
424-
CompletableFuture<AdapterChannel> adapterChannel = topicBrokers.get(topic);
425-
adapterChannel.thenAccept(channel -> channel.registerAdapterChannelInactiveListener(connection));
426-
}
427-
428422
@Override
429423
public void processUnSubscribe(final MqttAdapterMessage adapter) {
430424
final MqttUnsubscribeMessage msg = (MqttUnsubscribeMessage) adapter.getMqttMessage();
@@ -479,6 +473,19 @@ private CompletableFuture<AdapterChannel> connectToBroker(final String topic) {
479473
final MqttConnectMessage connectMessage = connection.getConnectMessage();
480474
adapterChannel.writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(),
481475
connectMessage));
476+
adapterChannel.registerClosureListener(future -> {
477+
topicBrokers.values().remove(adapterChannel);
478+
if (topicBrokers.values().size() <= 1) {
479+
if (log.isDebugEnabled()) {
480+
log.debug("Adapter channel inactive, close related connection {}", connection);
481+
}
482+
connection.getChannel().close();
483+
} else {
484+
if (log.isDebugEnabled()) {
485+
log.debug("connection {} has more than one AdapterChannel", connection);
486+
}
487+
}
488+
});
482489
return adapterChannel;
483490
})
484491
)

tests/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434
<version>${project.version}</version>
3535
<scope>test</scope>
3636
</dependency>
37+
<dependency>
38+
<groupId>org.eclipse.paho</groupId>
39+
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
40+
<version>1.2.5</version>
41+
<scope>test</scope>
42+
</dependency>
3743
<dependency>
3844
<groupId>io.streamnative.pulsar.handlers</groupId>
3945
<artifactId>pulsar-protocol-handler-mqtt-common</artifactId>
@@ -106,4 +112,10 @@
106112
</plugin>
107113
</plugins>
108114
</build>
115+
<repositories>
116+
<repository>
117+
<id>Eclipse Paho Repo</id>
118+
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
119+
</repository>
120+
</repositories>
109121
</project>
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package io.streamnative.pulsar.handlers.mqtt.mqtt3.paho.proxy;
16+
17+
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
18+
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration;
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
25+
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
26+
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
27+
import org.eclipse.paho.client.mqttv3.MqttCallback;
28+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
29+
import org.eclipse.paho.client.mqttv3.MqttMessage;
30+
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
31+
import org.testng.Assert;
32+
import org.testng.annotations.Test;
33+
34+
@Test
35+
@Slf4j
36+
public class TestProxyConnectMultiBroker extends MQTTTestBase {
37+
38+
@Override
39+
protected MQTTCommonConfiguration initConfig() throws Exception {
40+
MQTTCommonConfiguration mqtt = super.initConfig();
41+
mqtt.setDefaultNumberOfNamespaceBundles(4);
42+
mqtt.setMqttProxyEnabled(true);
43+
return mqtt;
44+
}
45+
46+
public static class Callback implements MqttCallback {
47+
48+
@Override
49+
public void connectionLost(Throwable throwable) {
50+
log.info("Connection lost");
51+
}
52+
53+
@Override
54+
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
55+
log.info("Message arrived");
56+
}
57+
58+
@Override
59+
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
60+
}
61+
}
62+
63+
@Test(timeOut = 1000 * 60 * 5)
64+
public void testProxyConnectMultiBroker() throws Exception {
65+
int port = getMqttProxyPortList().get(0);
66+
MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:" + port, "test", new MemoryPersistence());
67+
MqttConnectOptions connectOptions = new MqttConnectOptions();
68+
connectOptions.setCleanSession(true);
69+
connectOptions.setKeepAliveInterval(5);
70+
log.info("connecting...");
71+
client.connect(connectOptions).waitForCompletion();
72+
log.info("connected");
73+
74+
client.subscribe("testsub1", 1).waitForCompletion();
75+
log.info("subscribed testsub1");
76+
// sleep the keep alive period to show that PING will happen in abscence of other messages.
77+
Thread.sleep(6000);
78+
79+
// make more subscriptions to connect to multiple brokers.
80+
client.subscribe("testsub2", 1).waitForCompletion();
81+
log.info("subscribed testsub2");
82+
client.subscribe("testsub3", 1).waitForCompletion();
83+
log.info("subscribed testsub3");
84+
Map<String, List<String>> msgs = new HashMap<>();
85+
String topic = "test1";
86+
client.subscribe(topic, 1, new IMqttMessageListener() {
87+
88+
@Override
89+
public void messageArrived(String topic, MqttMessage message) throws Exception {
90+
msgs.compute(topic, (k, v) -> {
91+
if (v == null) {
92+
v = new ArrayList<>();
93+
}
94+
v.add(new String(message.getPayload()));
95+
return v;
96+
});
97+
}
98+
}).waitForCompletion();
99+
100+
// publish QoS 1 message to prevent the need for PINGREQ. Keep alive only sends ping in abscence of other
101+
// messages. Refer to section 3.1.2.10 of the MQTT 3.1.1 specification.
102+
for (int i = 0; i < 130; i++) {
103+
log.info("Publishing message..." + System.currentTimeMillis());
104+
client.publish(topic, "test".getBytes(), 1, false).waitForCompletion();
105+
Thread.sleep(1000);
106+
}
107+
Assert.assertNotNull(msgs.get(topic) != null);
108+
Assert.assertEquals(msgs.get(topic).size(), 130);
109+
client.disconnect().waitForCompletion();
110+
}
111+
}

0 commit comments

Comments
 (0)