Skip to content

Commit 092fb58

Browse files
committed
Fix subscription authorization mode
1 parent a491ad7 commit 092fb58

File tree

2 files changed

+64
-2
lines changed

2 files changed

+64
-2
lines changed

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,21 +393,30 @@ public void processSubscribe(MqttAdapterMessage adapter) {
393393
for (MqttTopicSubscription topic: msg.payload().topicSubscriptions()) {
394394
String finalUserRole = userRole;
395395
authorizationFutures.add(this.authorizationService.canConsumeAsync(
396-
getEncodedPulsarTopicName(topic.topicName()), userRole, authData, userRole)
396+
getEncodedPulsarTopicName(topic.topicName()), userRole, authData, clientId)
397397
.thenAccept((authorized) -> {
398398
if (!authorized) {
399399
authorizedFlag.set(false);
400400
log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}",
401401
topic.topicName(), finalUserRole, clientId);
402402
}
403-
}));
403+
}).exceptionally(ex -> {
404+
if (ex != null) {
405+
ex = FutureUtil.unwrapCompletionException(ex);
406+
authorizedFlag.set(false);
407+
log.warn("[Subscribe] failed to authorize sub topic={}, userRole={}, CId= {}, err= {}",
408+
topic.topicName(), finalUserRole, clientId, ex.getMessage());
409+
}
410+
return null;
411+
}));
404412
}
405413
FutureUtil.waitForAll(authorizationFutures).thenAccept(__ -> {
406414
if (!authorizedFlag.get()) {
407415
MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion())
408416
.packetId(packetId)
409417
.errorReason(MqttSubAck.ErrorReason.AUTHORIZATION_FAIL)
410418
.build();
419+
log.warn("send failed sub ack to clientId={}, subAck={}", clientId, subAck);
411420
connection.sendAckThenClose(subAck);
412421
} else {
413422
doSubscribe(msg);

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/AuthorizationTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
2323
import org.apache.pulsar.common.policies.data.AuthAction;
24+
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
2425
import org.awaitility.Awaitility;
2526
import org.fusesource.mqtt.client.BlockingConnection;
2627
import org.fusesource.mqtt.client.MQTT;
@@ -113,6 +114,58 @@ public void testAuthorizedOnTopic() throws Exception {
113114
consumer.disconnect();
114115
}
115116

117+
@Test(timeOut = TIMEOUT)
118+
public void testSubscriptionAuthMode() throws Exception {
119+
Set<AuthAction> user1Actions = new HashSet<>();
120+
user1Actions.add(AuthAction.produce);
121+
admin.namespaces().grantPermissionOnNamespace("public/default", "user1", user1Actions);
122+
123+
Set<AuthAction> user2Actions = new HashSet<>();
124+
user2Actions.add(AuthAction.consume);
125+
admin.namespaces().grantPermissionOnNamespace("public/default", "user2", user2Actions);
126+
127+
admin.namespaces().setSubscriptionAuthMode("public/default", SubscriptionAuthMode.Prefix);
128+
129+
String topicName = "persistent://public/default/testSubscriptionAuthMode";
130+
MQTT mqttConsumer = createMQTTClient();
131+
mqttConsumer.setClientId("user2-11");
132+
mqttConsumer.setUserName("user2");
133+
mqttConsumer.setPassword("pass2");
134+
BlockingConnection consumer = mqttConsumer.blockingConnection();
135+
consumer.connect();
136+
Topic[] topics = {new Topic(topicName, QoS.AT_LEAST_ONCE)};
137+
consumer.subscribe(topics);
138+
139+
MQTT mqttProducer = createMQTTClient();
140+
mqttProducer.setUserName("user1");
141+
mqttProducer.setPassword("pass1");
142+
BlockingConnection producer = mqttProducer.blockingConnection();
143+
producer.connect();
144+
String message = "Hello MQTT";
145+
producer.publish(topicName, message.getBytes(), QoS.AT_MOST_ONCE, false);
146+
147+
Message receive = consumer.receive();
148+
Assert.assertEquals(new String(receive.getPayload()), message);
149+
producer.disconnect();
150+
consumer.disconnect();
151+
152+
MQTT mqttConsumer3 = createMQTTClient();
153+
mqttConsumer3.setClientId("user3-11");
154+
mqttConsumer3.setUserName("user2");
155+
mqttConsumer3.setPassword("pass2");
156+
mqttConsumer3.setConnectAttemptsMax(0);
157+
mqttConsumer3.setReconnectAttemptsMax(0);
158+
BlockingConnection connection3 = mqttConsumer3.blockingConnection();
159+
connection3.connect();
160+
Awaitility.await().untilAsserted(() -> {
161+
Assert.assertTrue(connection3.isConnected());
162+
});
163+
connection3.subscribe(topics);
164+
Awaitility.await().untilAsserted(() -> {
165+
Assert.assertFalse(connection3.isConnected());
166+
});
167+
}
168+
116169
@Test
117170
public void testNotAuthorized() throws Exception {
118171
Set<AuthAction> user3Actions = new HashSet<>();

0 commit comments

Comments
 (0)