Skip to content

Commit cda953f

Browse files
authored
Fix subscription authorization PREFIX mode (#1756)
1 parent a491ad7 commit cda953f

File tree

3 files changed

+75
-3
lines changed

3 files changed

+75
-3
lines changed

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,14 +393,22 @@ public void processSubscribe(MqttAdapterMessage adapter) {
393393
for (MqttTopicSubscription topic: msg.payload().topicSubscriptions()) {
394394
String finalUserRole = userRole;
395395
authorizationFutures.add(this.authorizationService.canConsumeAsync(
396-
getEncodedPulsarTopicName(topic.topicName()), userRole, authData, userRole)
396+
getEncodedPulsarTopicName(topic.topicName()), userRole, authData, clientId)
397397
.thenAccept((authorized) -> {
398398
if (!authorized) {
399399
authorizedFlag.set(false);
400400
log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}",
401401
topic.topicName(), finalUserRole, clientId);
402402
}
403-
}));
403+
}).exceptionally(ex -> {
404+
if (ex != null) {
405+
ex = FutureUtil.unwrapCompletionException(ex);
406+
authorizedFlag.set(false);
407+
log.warn("[Subscribe] failed to authorize sub topic={}, userRole={}, CId= {}, err= {}",
408+
topic.topicName(), finalUserRole, clientId, ex.getMessage());
409+
}
410+
return null;
411+
}));
404412
}
405413
FutureUtil.waitForAll(authorizationFutures).thenAccept(__ -> {
406414
if (!authorizedFlag.get()) {

pom.xml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@
5353
<awaitility.version>4.0.2</awaitility.version>
5454
<pulsar.version>4.1.0-SNAPSHOT</pulsar.version>
5555
<sn.bom.version>4.1.0-SNAPSHOT</sn.bom.version>
56-
<mqtt.codec.version>4.1.115.Final</mqtt.codec.version>
5756
<log4j2.version>2.18.0</log4j2.version>
5857
<fusesource.client.version>1.16</fusesource.client.version>
5958
<hivemq.mqtt.client.version>1.2.2</hivemq.mqtt.client.version>
6059
<apache.commons.bean-utils.version>1.9.4</apache.commons.bean-utils.version>
6160
<grpc.version>1.45.1</grpc.version>
6261
<jackson.version>2.14.2</jackson.version>
62+
<netty.version>4.1.122.Final</netty.version>
6363
<!-- plugins -->
6464
<javac.source>17</javac.source>
6565
<javac.target>17</javac.target>
@@ -77,11 +77,22 @@
7777
<dependency>
7878
<groupId>io.streamnative</groupId>
7979
<artifactId>pulsar-broker</artifactId>
80+
<exclusions>
81+
<exclusion>
82+
<artifactId>netty-codec-http2</artifactId>
83+
<groupId>io.netty</groupId>
84+
</exclusion>
85+
</exclusions>
8086
</dependency>
8187
<dependency>
8288
<groupId>org.projectlombok</groupId>
8389
<artifactId>lombok</artifactId>
8490
</dependency>
91+
<dependency>
92+
<artifactId>netty-codec-http2</artifactId>
93+
<groupId>io.netty</groupId>
94+
<version>${netty.version}</version>
95+
</dependency>
8596
<dependency>
8697
<groupId>io.netty</groupId>
8798
<artifactId>netty-codec-mqtt</artifactId>

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
2323
import org.apache.pulsar.common.policies.data.AuthAction;
24+
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
2425
import org.awaitility.Awaitility;
2526
import org.fusesource.mqtt.client.BlockingConnection;
2627
import org.fusesource.mqtt.client.MQTT;
@@ -113,6 +114,58 @@ public void testAuthorizedOnTopic() throws Exception {
113114
consumer.disconnect();
114115
}
115116

117+
@Test(timeOut = TIMEOUT)
118+
public void testSubscriptionAuthMode() throws Exception {
119+
Set<AuthAction> user1Actions = new HashSet<>();
120+
user1Actions.add(AuthAction.produce);
121+
admin.namespaces().grantPermissionOnNamespace("public/default", "user1", user1Actions);
122+
123+
Set<AuthAction> user2Actions = new HashSet<>();
124+
user2Actions.add(AuthAction.consume);
125+
admin.namespaces().grantPermissionOnNamespace("public/default", "user2", user2Actions);
126+
127+
admin.namespaces().setSubscriptionAuthMode("public/default", SubscriptionAuthMode.Prefix);
128+
129+
String topicName = "persistent://public/default/testSubscriptionAuthMode";
130+
MQTT mqttConsumer = createMQTTClient();
131+
mqttConsumer.setClientId("user2-11");
132+
mqttConsumer.setUserName("user2");
133+
mqttConsumer.setPassword("pass2");
134+
BlockingConnection consumer = mqttConsumer.blockingConnection();
135+
consumer.connect();
136+
Topic[] topics = {new Topic(topicName, QoS.AT_LEAST_ONCE)};
137+
consumer.subscribe(topics);
138+
139+
MQTT mqttProducer = createMQTTClient();
140+
mqttProducer.setUserName("user1");
141+
mqttProducer.setPassword("pass1");
142+
BlockingConnection producer = mqttProducer.blockingConnection();
143+
producer.connect();
144+
String message = "Hello MQTT";
145+
producer.publish(topicName, message.getBytes(), QoS.AT_MOST_ONCE, false);
146+
147+
Message receive = consumer.receive();
148+
Assert.assertEquals(new String(receive.getPayload()), message);
149+
producer.disconnect();
150+
consumer.disconnect();
151+
152+
MQTT mqttConsumer3 = createMQTTClient();
153+
mqttConsumer3.setClientId("user3-11");
154+
mqttConsumer3.setUserName("user2");
155+
mqttConsumer3.setPassword("pass2");
156+
mqttConsumer3.setConnectAttemptsMax(0);
157+
mqttConsumer3.setReconnectAttemptsMax(0);
158+
BlockingConnection connection3 = mqttConsumer3.blockingConnection();
159+
connection3.connect();
160+
Awaitility.await().untilAsserted(() -> {
161+
Assert.assertTrue(connection3.isConnected());
162+
});
163+
connection3.subscribe(topics);
164+
Awaitility.await().untilAsserted(() -> {
165+
Assert.assertFalse(connection3.isConnected());
166+
});
167+
}
168+
116169
@Test
117170
public void testNotAuthorized() throws Exception {
118171
Set<AuthAction> user3Actions = new HashSet<>();

0 commit comments

Comments
 (0)