diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java
index 42e872e98..37fb96f9b 100644
--- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java
+++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java
@@ -393,14 +393,22 @@ public void processSubscribe(MqttAdapterMessage adapter) {
for (MqttTopicSubscription topic: msg.payload().topicSubscriptions()) {
String finalUserRole = userRole;
authorizationFutures.add(this.authorizationService.canConsumeAsync(
- getEncodedPulsarTopicName(topic.topicName()), userRole, authData, userRole)
+ getEncodedPulsarTopicName(topic.topicName()), userRole, authData, clientId)
.thenAccept((authorized) -> {
if (!authorized) {
authorizedFlag.set(false);
log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}",
topic.topicName(), finalUserRole, clientId);
}
- }));
+ }).exceptionally(ex -> {
+ if (ex != null) {
+ ex = FutureUtil.unwrapCompletionException(ex);
+ authorizedFlag.set(false);
+ log.warn("[Subscribe] failed to authorize sub topic={}, userRole={}, CId= {}, err= {}",
+ topic.topicName(), finalUserRole, clientId, ex.getMessage());
+ }
+ return null;
+ }));
}
FutureUtil.waitForAll(authorizationFutures).thenAccept(__ -> {
if (!authorizedFlag.get()) {
diff --git a/pom.xml b/pom.xml
index d478ffeb0..7a20c1538 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,13 +53,13 @@
4.0.2
4.1.0-SNAPSHOT
4.1.0-SNAPSHOT
- 4.1.115.Final
2.18.0
1.16
1.2.2
1.9.4
1.45.1
2.14.2
+ 4.1.122.Final
17
17
@@ -77,11 +77,22 @@
io.streamnative
pulsar-broker
+
+
+ netty-codec-http2
+ io.netty
+
+
org.projectlombok
lombok
+
+ netty-codec-http2
+ io.netty
+ ${netty.version}
+
io.netty
netty-codec-mqtt
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java
index e6507d28e..501e9eb4b 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java
@@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.awaitility.Awaitility;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
@@ -113,6 +114,58 @@ public void testAuthorizedOnTopic() throws Exception {
consumer.disconnect();
}
+ @Test(timeOut = TIMEOUT)
+ public void testSubscriptionAuthMode() throws Exception {
+ Set user1Actions = new HashSet<>();
+ user1Actions.add(AuthAction.produce);
+ admin.namespaces().grantPermissionOnNamespace("public/default", "user1", user1Actions);
+
+ Set user2Actions = new HashSet<>();
+ user2Actions.add(AuthAction.consume);
+ admin.namespaces().grantPermissionOnNamespace("public/default", "user2", user2Actions);
+
+ admin.namespaces().setSubscriptionAuthMode("public/default", SubscriptionAuthMode.Prefix);
+
+ String topicName = "persistent://public/default/testSubscriptionAuthMode";
+ MQTT mqttConsumer = createMQTTClient();
+ mqttConsumer.setClientId("user2-11");
+ mqttConsumer.setUserName("user2");
+ mqttConsumer.setPassword("pass2");
+ BlockingConnection consumer = mqttConsumer.blockingConnection();
+ consumer.connect();
+ Topic[] topics = {new Topic(topicName, QoS.AT_LEAST_ONCE)};
+ consumer.subscribe(topics);
+
+ MQTT mqttProducer = createMQTTClient();
+ mqttProducer.setUserName("user1");
+ mqttProducer.setPassword("pass1");
+ BlockingConnection producer = mqttProducer.blockingConnection();
+ producer.connect();
+ String message = "Hello MQTT";
+ producer.publish(topicName, message.getBytes(), QoS.AT_MOST_ONCE, false);
+
+ Message receive = consumer.receive();
+ Assert.assertEquals(new String(receive.getPayload()), message);
+ producer.disconnect();
+ consumer.disconnect();
+
+ MQTT mqttConsumer3 = createMQTTClient();
+ mqttConsumer3.setClientId("user3-11");
+ mqttConsumer3.setUserName("user2");
+ mqttConsumer3.setPassword("pass2");
+ mqttConsumer3.setConnectAttemptsMax(0);
+ mqttConsumer3.setReconnectAttemptsMax(0);
+ BlockingConnection connection3 = mqttConsumer3.blockingConnection();
+ connection3.connect();
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(connection3.isConnected());
+ });
+ connection3.subscribe(topics);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(connection3.isConnected());
+ });
+ }
+
@Test
public void testNotAuthorized() throws Exception {
Set user3Actions = new HashSet<>();