Skip to content

Commit ccc1e63

Browse files
coderzcTechnoboy-
authored andcommitted
Fix subscription authorization PREFIX mode (#1756)
1 parent 5e11f40 commit ccc1e63

File tree

3 files changed

+79
-18
lines changed

3 files changed

+79
-18
lines changed

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,14 +392,22 @@ public void processSubscribe(MqttAdapterMessage adapter) {
392392
for (MqttTopicSubscription topic: msg.payload().topicSubscriptions()) {
393393
String finalUserRole = userRole;
394394
authorizationFutures.add(this.authorizationService.canConsumeAsync(
395-
getEncodedPulsarTopicName(topic.topicName()), userRole, authData, userRole)
395+
getEncodedPulsarTopicName(topic.topicName()), userRole, authData, clientId)
396396
.thenAccept((authorized) -> {
397397
if (!authorized) {
398398
authorizedFlag.set(false);
399399
log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}",
400400
topic.topicName(), finalUserRole, clientId);
401401
}
402-
}));
402+
}).exceptionally(ex -> {
403+
if (ex != null) {
404+
ex = FutureUtil.unwrapCompletionException(ex);
405+
authorizedFlag.set(false);
406+
log.warn("[Subscribe] failed to authorize sub topic={}, userRole={}, CId= {}, err= {}",
407+
topic.topicName(), finalUserRole, clientId, ex.getMessage());
408+
}
409+
return null;
410+
}));
403411
}
404412
FutureUtil.waitForAll(authorizationFutures).thenAccept(__ -> {
405413
if (!authorizedFlag.get()) {

pom.xml

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -158,16 +158,16 @@
158158

159159
<dependencyManagement>
160160
<dependencies>
161-
<dependency>
162-
<groupId>com.fasterxml.jackson</groupId>
163-
<artifactId>jackson-bom</artifactId>
164-
<version>${jackson.version}</version>
165-
<scope>import</scope>
166-
<type>pom</type>
167-
</dependency>
161+
<dependency>
162+
<groupId>com.fasterxml.jackson</groupId>
163+
<artifactId>jackson-bom</artifactId>
164+
<version>${jackson.version}</version>
165+
<scope>import</scope>
166+
<type>pom</type>
167+
</dependency>
168168
</dependencies>
169169
</dependencyManagement>
170-
170+
171171

172172
<build>
173173
<pluginManagement>
@@ -206,14 +206,14 @@
206206
</plugins>
207207
</pluginManagement>
208208
<plugins>
209-
<!-- <plugin>-->
210-
<!-- <groupId>com.github.spotbugs</groupId>-->
211-
<!-- <artifactId>spotbugs-maven-plugin</artifactId>-->
212-
<!-- <version>${spotbugs-maven-plugin.version}</version>-->
213-
<!-- <configuration>-->
214-
<!-- <excludeFilterFile>resources/findbugsExclude.xml</excludeFilterFile>-->
215-
<!-- </configuration>-->
216-
<!-- </plugin>-->
209+
<!-- <plugin>-->
210+
<!-- <groupId>com.github.spotbugs</groupId>-->
211+
<!-- <artifactId>spotbugs-maven-plugin</artifactId>-->
212+
<!-- <version>${spotbugs-maven-plugin.version}</version>-->
213+
<!-- <configuration>-->
214+
<!-- <excludeFilterFile>resources/findbugsExclude.xml</excludeFilterFile>-->
215+
<!-- </configuration>-->
216+
<!-- </plugin>-->
217217
<plugin>
218218
<artifactId>maven-compiler-plugin</artifactId>
219219
<version>${maven-compiler-plugin.version}</version>

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
2323
import org.apache.pulsar.common.policies.data.AuthAction;
24+
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
2425
import org.awaitility.Awaitility;
2526
import org.fusesource.mqtt.client.BlockingConnection;
2627
import org.fusesource.mqtt.client.MQTT;
@@ -113,6 +114,58 @@ public void testAuthorizedOnTopic() throws Exception {
113114
consumer.disconnect();
114115
}
115116

117+
@Test(timeOut = TIMEOUT)
118+
public void testSubscriptionAuthMode() throws Exception {
119+
Set<AuthAction> user1Actions = new HashSet<>();
120+
user1Actions.add(AuthAction.produce);
121+
admin.namespaces().grantPermissionOnNamespace("public/default", "user1", user1Actions);
122+
123+
Set<AuthAction> user2Actions = new HashSet<>();
124+
user2Actions.add(AuthAction.consume);
125+
admin.namespaces().grantPermissionOnNamespace("public/default", "user2", user2Actions);
126+
127+
admin.namespaces().setSubscriptionAuthMode("public/default", SubscriptionAuthMode.Prefix);
128+
129+
String topicName = "persistent://public/default/testSubscriptionAuthMode";
130+
MQTT mqttConsumer = createMQTTClient();
131+
mqttConsumer.setClientId("user2-11");
132+
mqttConsumer.setUserName("user2");
133+
mqttConsumer.setPassword("pass2");
134+
BlockingConnection consumer = mqttConsumer.blockingConnection();
135+
consumer.connect();
136+
Topic[] topics = {new Topic(topicName, QoS.AT_LEAST_ONCE)};
137+
consumer.subscribe(topics);
138+
139+
MQTT mqttProducer = createMQTTClient();
140+
mqttProducer.setUserName("user1");
141+
mqttProducer.setPassword("pass1");
142+
BlockingConnection producer = mqttProducer.blockingConnection();
143+
producer.connect();
144+
String message = "Hello MQTT";
145+
producer.publish(topicName, message.getBytes(), QoS.AT_MOST_ONCE, false);
146+
147+
Message receive = consumer.receive();
148+
Assert.assertEquals(new String(receive.getPayload()), message);
149+
producer.disconnect();
150+
consumer.disconnect();
151+
152+
MQTT mqttConsumer3 = createMQTTClient();
153+
mqttConsumer3.setClientId("user3-11");
154+
mqttConsumer3.setUserName("user2");
155+
mqttConsumer3.setPassword("pass2");
156+
mqttConsumer3.setConnectAttemptsMax(0);
157+
mqttConsumer3.setReconnectAttemptsMax(0);
158+
BlockingConnection connection3 = mqttConsumer3.blockingConnection();
159+
connection3.connect();
160+
Awaitility.await().untilAsserted(() -> {
161+
Assert.assertTrue(connection3.isConnected());
162+
});
163+
connection3.subscribe(topics);
164+
Awaitility.await().untilAsserted(() -> {
165+
Assert.assertFalse(connection3.isConnected());
166+
});
167+
}
168+
116169
@Test
117170
public void testNotAuthorized() throws Exception {
118171
Set<AuthAction> user3Actions = new HashSet<>();

0 commit comments

Comments
 (0)