Skip to content

Commit 26eec37

Browse files
authored
Fix mTls authorize bug (#1456)
1 parent 13db85b commit 26eec37

File tree

4 files changed

+28
-17
lines changed

4 files changed

+28
-17
lines changed

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private Map<String, AuthenticationProvider> getAuthenticationProviders(List<Stri
8888
public AuthenticationResult authenticate(boolean fromProxy,
8989
SSLSession session, MqttConnectMessage connectMessage) {
9090
if (fromProxy) {
91-
return new AuthenticationResult(true, null, null);
91+
return AuthenticationResult.PASSED;
9292
}
9393
String authMethod = MqttMessageUtils.getAuthMethod(connectMessage);
9494
if (authMethod != null) {
@@ -161,6 +161,7 @@ public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayloa
161161
public static class AuthenticationResult {
162162

163163
public static final AuthenticationResult FAILED = new AuthenticationResult(false, null, null);
164+
public static final AuthenticationResult PASSED = new AuthenticationResult(true, null, null);
164165
private final boolean authenticated;
165166
private final String userRole;
166167
private final AuthenticationDataSource authData;

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.mqtt.proxy;
1515

16-
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttConnectMessage;
16+
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqtt5ConnectMessage;
1717
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttPublishMessage;
1818
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttSubscribeMessage;
1919
import com.google.common.collect.Lists;
@@ -140,9 +140,12 @@ public void doProcessConnect(MqttAdapterMessage adapter, String userRole,
140140
.processor(this)
141141
.build();
142142
connection.sendConnAck();
143-
MqttConnectMessage connectMessage = createMqttConnectMessage(msg, userRole);
144-
msg = connectMessage;
145-
connection.setConnectMessage(msg);
143+
144+
if (proxyConfig.isMqttAuthorizationEnabled()) {
145+
MqttConnectMessage connectMessage = createMqtt5ConnectMessage(msg);
146+
msg = connectMessage;
147+
connection.setConnectMessage(msg);
148+
}
146149

147150
ConnectEvent connectEvent = ConnectEvent.builder()
148151
.clientId(connection.getClientId())
@@ -163,8 +166,10 @@ public void processPublish(MqttAdapterMessage adapter) {
163166
proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(),
164167
TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain()));
165168
adapter.setClientId(connection.getClientId());
166-
MqttPublishMessage mqttMessage = createMqttPublishMessage(msg, connection.getUserRole());
167-
adapter.setMqttMessage(mqttMessage);
169+
if (proxyConfig.isMqttAuthorizationEnabled()) {
170+
MqttPublishMessage mqttMessage = createMqttPublishMessage(msg, connection.getUserRole());
171+
adapter.setMqttMessage(mqttMessage);
172+
}
168173
startPublish()
169174
.thenCompose(__ -> writeToBroker(pulsarTopicName, adapter))
170175
.whenComplete((unused, ex) -> {
@@ -295,8 +300,10 @@ public void processSubscribe(final MqttAdapterMessage adapter) {
295300
log.debug("[Proxy Subscribe] [{}] msg: {}", clientId, msg);
296301
}
297302
registerTopicListener(adapter);
298-
MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, connection.getUserRole());
299-
adapter.setMqttMessage(mqttMessage);
303+
if (proxyConfig.isMqttAuthorizationEnabled()) {
304+
MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, connection.getUserRole());
305+
adapter.setMqttMessage(mqttMessage);
306+
}
300307
doSubscribe(adapter, false)
301308
.exceptionally(ex -> {
302309
Throwable realCause = FutureUtil.unwrapCompletionException(ex);

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createWillMessage;
1717
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getAuthenticationRole;
18+
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getPacketId;
1819
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.pingResp;
1920
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.topicSubscriptions;
2021
import io.netty.channel.ChannelHandlerContext;
@@ -221,8 +222,7 @@ private CompletableFuture<Void> doUnauthorized(MqttAdapterMessage adapter) {
221222
log.error("[Publish] not authorized to topic={}, userRole={}, CId= {}",
222223
msg.variableHeader().topicName(), connection.getUserRole(),
223224
connection.getClientId());
224-
int packetId = msg.variableHeader().packetId();
225-
packetId = packetId == -1 ? 1 : packetId;
225+
int packetId = getPacketId(msg.variableHeader().packetId());
226226
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck
227227
.errorBuilder(connection.getProtocolVersion())
228228
.packetId(packetId)

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,12 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage)
189189
return builder.build();
190190
}
191191

192-
public static MqttConnectMessage createMqttConnectMessage(MqttConnectMessage connectMessage,
193-
String authData) {
192+
public static MqttConnectMessage createMqtt5ConnectMessage(MqttConnectMessage connectMessage) {
194193
final MqttConnectVariableHeader header = connectMessage.variableHeader();
195-
MqttProperties properties = new MqttProperties();
196-
properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData));
197194
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
198195
MqttVersion.MQTT_5.protocolName(), MqttVersion.MQTT_5.protocolLevel(), header.hasUserName(),
199196
header.hasPassword(), header.isWillRetain(), header.willQos(), header.isWillFlag(),
200-
header.isCleanSession(), header.keepAliveTimeSeconds(), properties
201-
);
197+
header.isCleanSession(), header.keepAliveTimeSeconds(), connectMessage.variableHeader().properties());
202198
MqttConnectMessage newConnectMessage = new MqttConnectMessage(connectMessage.fixedHeader(), variableHeader,
203199
connectMessage.payload());
204200
return newConnectMessage;
@@ -287,4 +283,11 @@ public static byte[] getAuthData(MqttConnectMessage connectMessage) {
287283
.getProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value());
288284
return authDataProperty != null ? authDataProperty.value() : null;
289285
}
286+
287+
public static int getPacketId(int packetId) {
288+
if (packetId < 1) {
289+
return 1;
290+
}
291+
return packetId;
292+
}
290293
}

0 commit comments

Comments
 (0)