Skip to content

Commit 26249a2

Browse files
authored
fix topic authentication issue (#1734)
1 parent 4584ecb commit 26249a2

File tree

3 files changed

+64
-5
lines changed

3 files changed

+64
-5
lines changed

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.pulsar.broker.service.Subscription;
9292
import org.apache.pulsar.common.api.proto.CommandAck;
9393
import org.apache.pulsar.common.api.proto.CommandSubscribe;
94+
import org.apache.pulsar.common.naming.TopicDomain;
9495
import org.apache.pulsar.common.naming.TopicName;
9596
import org.apache.pulsar.common.util.Codec;
9697
import org.apache.pulsar.common.util.FutureUtil;
@@ -211,8 +212,8 @@ public void processPublish(MqttAdapterMessage adapter) {
211212
userRole = authenticationRole.get();
212213
}
213214
}
214-
result = this.authorizationService.canProduceAsync(TopicName.get(msg.variableHeader().topicName()),
215-
userRole, authData)
215+
result = this.authorizationService.canProduceAsync(
216+
getEncodedPulsarTopicName(msg.variableHeader().topicName()), userRole, authData)
216217
.thenCompose(authorized -> authorized ? doPublish(adapter) : doUnauthorized(adapter));
217218
}
218219
result.thenAccept(__ -> msg.release())
@@ -225,6 +226,12 @@ public void processPublish(MqttAdapterMessage adapter) {
225226
});
226227
}
227228

229+
private TopicName getEncodedPulsarTopicName(String mqttTopicName) {
230+
return TopicName.get(PulsarTopicUtils.getEncodedPulsarTopicName(mqttTopicName,
231+
configuration.getDefaultTenant(), configuration.getDefaultNamespace(),
232+
TopicDomain.getEnum(configuration.getDefaultTopicDomain())));
233+
}
234+
228235
private CompletableFuture<Void> doUnauthorized(MqttAdapterMessage adapter) {
229236
final MqttPublishMessage msg = (MqttPublishMessage) adapter.getMqttMessage();
230237
log.error("[Publish] not authorized to topic={}, userRole={}, CId= {}",
@@ -385,8 +392,9 @@ public void processSubscribe(MqttAdapterMessage adapter) {
385392
AtomicBoolean authorizedFlag = new AtomicBoolean(true);
386393
for (MqttTopicSubscription topic: msg.payload().topicSubscriptions()) {
387394
String finalUserRole = userRole;
388-
authorizationFutures.add(this.authorizationService.canConsumeAsync(TopicName.get(topic.topicName()),
389-
userRole, authData, userRole).thenAccept((authorized) -> {
395+
authorizationFutures.add(this.authorizationService.canConsumeAsync(
396+
getEncodedPulsarTopicName(topic.topicName()), userRole, authData, userRole)
397+
.thenAccept((authorized) -> {
390398
if (!authorized) {
391399
authorizedFlag.set(false);
392400
log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}",

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.testng.Assert.assertEquals;
1717
import static org.testng.Assert.assertFalse;
1818
import static org.testng.Assert.assertNotEquals;
19+
import static org.testng.Assert.assertNotNull;
1920
import io.netty.channel.Channel;
2021
import io.netty.channel.ChannelId;
2122
import io.netty.handler.codec.mqtt.MqttConnectMessage;
@@ -84,7 +85,9 @@ public void testAdapterChannelAutoGetConnection() throws InterruptedException {
8485
.connectMessage(fakeConnectMessage).build();
8586
adapterChannel.writeAndFlush(connection, mqttAdapterMessage).join();
8687
CompletableFuture<Channel> channelFutureAfterSend = brokerChannels.get(key);
88+
assertNotNull(channelFutureAfterSend);
8789
Channel channelAfterSend = channelFutureAfterSend.join();
90+
assertNotNull(channelAfterSend);
8891
assertNotEquals(channelAfterSend.id(), previousChannelId);
8992
}
9093
}

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.mqtt.mqtt3.fusesource.base;
1515

16+
import com.google.common.collect.Lists;
1617
import io.streamnative.pulsar.handlers.mqtt.base.AuthorizationConfig;
18+
import java.net.URLEncoder;
1719
import java.util.HashSet;
1820
import java.util.Set;
1921
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
2023
import org.apache.pulsar.common.policies.data.AuthAction;
2124
import org.awaitility.Awaitility;
2225
import org.fusesource.mqtt.client.BlockingConnection;
@@ -33,7 +36,7 @@
3336
public class AuthorizationTest extends AuthorizationConfig {
3437

3538
@Test(timeOut = TIMEOUT)
36-
public void testAuthorized() throws Exception {
39+
public void testAuthorizedOnNamespace() throws Exception {
3740
Set<AuthAction> user1Actions = new HashSet<>();
3841
user1Actions.add(AuthAction.produce);
3942
admin.namespaces().grantPermissionOnNamespace("public/default", "user1", user1Actions);
@@ -65,6 +68,51 @@ public void testAuthorized() throws Exception {
6568
consumer.disconnect();
6669
}
6770

71+
@Test(timeOut = TIMEOUT)
72+
public void testAuthorizedOnTopic() throws Exception {
73+
String topicName = "persistent://public/default/testAuthorizedOnTopic/a";
74+
String encodedTopicName = "persistent://public/default/" + URLEncoder.encode("testAuthorizedOnTopic/a");
75+
admin.topics().createNonPartitionedTopic(encodedTopicName);
76+
Set<AuthAction> user1Actions = new HashSet<>();
77+
user1Actions.add(AuthAction.produce);
78+
final GrantTopicPermissionOptions permission1 = GrantTopicPermissionOptions.builder()
79+
.topic(encodedTopicName)
80+
.role("user1")
81+
.actions(user1Actions)
82+
.build();
83+
admin.namespaces().grantPermissionOnTopics(Lists.newArrayList(permission1));
84+
85+
Set<AuthAction> user2Actions = new HashSet<>();
86+
user2Actions.add(AuthAction.consume);
87+
final GrantTopicPermissionOptions permission2 = GrantTopicPermissionOptions.builder()
88+
.topic(encodedTopicName)
89+
.role("user2")
90+
.actions(user2Actions)
91+
.build();
92+
admin.namespaces().grantPermissionOnTopics(Lists.newArrayList(permission2));
93+
94+
MQTT mqttConsumer = createMQTTClient();
95+
mqttConsumer.setUserName("user2");
96+
mqttConsumer.setPassword("pass2");
97+
BlockingConnection consumer = mqttConsumer.blockingConnection();
98+
consumer.connect();
99+
Topic[] topics = {new Topic(topicName, QoS.AT_LEAST_ONCE)};
100+
consumer.subscribe(topics);
101+
102+
MQTT mqttProducer = createMQTTClient();
103+
mqttProducer.setUserName("user1");
104+
mqttProducer.setPassword("pass1");
105+
BlockingConnection producer = mqttProducer.blockingConnection();
106+
producer.connect();
107+
String message = "Hello MQTT";
108+
producer.publish(topicName, message.getBytes(), QoS.AT_MOST_ONCE, false);
109+
110+
Message receive = consumer.receive();
111+
Assert.assertEquals(new String(receive.getPayload()), message);
112+
producer.disconnect();
113+
consumer.disconnect();
114+
}
115+
68116
@Test
69117
public void testNotAuthorized() throws Exception {
70118
Set<AuthAction> user3Actions = new HashSet<>();

0 commit comments

Comments
 (0)