Skip to content

Commit 1daa690

Browse files
authored
Fix user properties lost when enable authorization (#1731)
Fix user properties lost when enable authorized
1 parent 7dd74fc commit 1daa690

File tree

3 files changed

+63
-1
lines changed

3 files changed

+63
-1
lines changed

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/PulsarMessageConverter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static com.google.common.base.Preconditions.checkArgument;
1717
import static io.streamnative.pulsar.handlers.mqtt.common.Constants.MQTT_PROPERTIES;
1818
import static io.streamnative.pulsar.handlers.mqtt.common.Constants.MQTT_PROPERTIES_PREFIX;
19+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.AUTHENTICATE_ROLE_KEY;
1920
import com.google.common.base.Splitter;
2021
import com.google.common.collect.Lists;
2122
import io.netty.buffer.ByteBuf;
@@ -100,8 +101,10 @@ public static MessageImpl<byte[]> toPulsarMsg(MQTTServerConfiguration configurat
100101
metadata.setPartitionKey(pair.value);
101102
metadata.setPartitionKeyB64Encoded(false);
102103
}
103-
metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()) + pair.key)
104+
if (!pair.key.equalsIgnoreCase(AUTHENTICATE_ROLE_KEY)) {
105+
metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()) + pair.key)
104106
.setValue(pair.value);
107+
}
105108
});
106109
} else if (MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value() == prop.propertyId()) {
107110
MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop;

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/MqttMessageUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ public static MqttPublishMessage createMqttPublishMessage(MqttPublishMessage pub
203203
String authData) {
204204
final MqttPublishVariableHeader header = publishMessage.variableHeader();
205205
MqttProperties properties = new MqttProperties();
206+
header.properties().listAll().forEach(properties::add);
206207
properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData));
207208
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(
208209
header.topicName(), header.packetId(), properties);
@@ -251,6 +252,7 @@ public static MqttSubscribeMessage createMqttSubscribeMessage(MqttSubscribeMessa
251252
String authData) {
252253
final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader();
253254
MqttProperties properties = new MqttProperties();
255+
header.properties().listAll().forEach(properties::add);
254256
properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData));
255257
MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(
256258
header.messageId(), properties);

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
1717
import com.hivemq.client.mqtt.datatypes.MqttQos;
1818
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
19+
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
20+
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
1921
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException;
2022
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
2123
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
@@ -33,12 +35,14 @@
3335
import java.util.HashSet;
3436
import java.util.Random;
3537
import java.util.Set;
38+
import lombok.extern.slf4j.Slf4j;
3639
import org.apache.pulsar.client.admin.PulsarAdminException;
3740
import org.apache.pulsar.common.policies.data.AuthAction;
3841
import org.awaitility.Awaitility;
3942
import org.testng.Assert;
4043
import org.testng.annotations.Test;
4144

45+
@Slf4j
4246
public class MQTT5AuthorizationProxyReasonCodeOnAllAckTest extends AuthorizationConfig {
4347
private final Random random = new Random();
4448

@@ -200,5 +204,58 @@ public void testAuthenticationSuccess() {
200204
.send();
201205
Assert.assertEquals(connAck.getReasonCode(), Mqtt5ConnAckReasonCode.SUCCESS);
202206
}
207+
208+
@Test
209+
public void testPublishWithUserPropertiesAndEnableAuthorization() throws Exception {
210+
Set<AuthAction> userActions = new HashSet<>();
211+
userActions.add(AuthAction.produce);
212+
userActions.add(AuthAction.consume);
213+
admin.namespaces().grantPermissionOnNamespace("public/default", "user1", userActions);
214+
215+
216+
final String topic = "testPublishWithUserProperties";
217+
Mqtt5BlockingClient client1 = MQTT5ClientUtils.createMqtt5ProxyClient(
218+
getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size())));
219+
Mqtt5UserProperties userProperty = Mqtt5UserProperties.builder()
220+
.add("user-1", "value-1")
221+
.add("user-2", "value-2")
222+
.build();
223+
Mqtt5UserProperty userProperty1 = Mqtt5UserProperty.of("user-1", "value-1");
224+
Mqtt5UserProperty userProperty2 = Mqtt5UserProperty.of("user-2", "value-2");
225+
client1.connectWith()
226+
.simpleAuth()
227+
.username("user1")
228+
.password("pass1".getBytes(StandardCharsets.UTF_8))
229+
.applySimpleAuth()
230+
.send();
231+
Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(MqttQos.AT_LEAST_ONCE)
232+
.userProperties(userProperty)
233+
.asWill().build();
234+
235+
Mqtt5BlockingClient client2 = MQTT5ClientUtils.createMqtt5ProxyClient(
236+
getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size())));
237+
client2.connectWith()
238+
.simpleAuth()
239+
.username("user1")
240+
.password("pass1".getBytes(StandardCharsets.UTF_8))
241+
.applySimpleAuth()
242+
.send();
243+
client2.subscribeWith()
244+
.topicFilter(topic)
245+
.qos(MqttQos.AT_LEAST_ONCE)
246+
.send();
247+
Mqtt5BlockingClient.Mqtt5Publishes publishes = client2.publishes(MqttGlobalPublishFilter.ALL);
248+
client1.publish(publishMessage);
249+
Mqtt5Publish message = publishes.receive();
250+
Assert.assertNotNull(message);
251+
log.info("Received message properties: {}", message.getUserProperties());
252+
// Validate the user properties order, must be the same with set order.
253+
Assert.assertEquals(message.getUserProperties().asList().get(0).compareTo(userProperty1), 0);
254+
Assert.assertEquals(message.getUserProperties().asList().get(1).compareTo(userProperty2), 0);
255+
publishes.close();
256+
client2.unsubscribeWith().topicFilter(topic).send();
257+
client1.disconnect();
258+
client2.disconnect();
259+
}
203260
}
204261

0 commit comments

Comments
 (0)