Skip to content

Commit 13db85b

Browse files
authored
Fix mTLS authorization bug (#1455)
1 parent d2c8afc commit 13db85b

File tree

4 files changed

+45
-72
lines changed

4 files changed

+45
-72
lines changed

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,16 @@ private Map<String, AuthenticationProvider> getAuthenticationProviders(List<Stri
8787

8888
public AuthenticationResult authenticate(boolean fromProxy,
8989
SSLSession session, MqttConnectMessage connectMessage) {
90+
if (fromProxy) {
91+
return new AuthenticationResult(true, null, null);
92+
}
9093
String authMethod = MqttMessageUtils.getAuthMethod(connectMessage);
9194
if (authMethod != null) {
9295
byte[] authData = MqttMessageUtils.getAuthData(connectMessage);
9396
if (authData == null) {
9497
return AuthenticationResult.FAILED;
9598
}
96-
if (fromProxy && AUTH_MTLS.equalsIgnoreCase(authMethod)) {
97-
return new AuthenticationResult(true, new String(authData),
98-
new AuthenticationDataCommand(new String(authData), null, session));
99-
}
99+
100100
return authenticate(connectMessage.payload().clientIdentifier(), authMethod,
101101
new AuthenticationDataCommand(new String(authData), null, session));
102102
}

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.mqtt.proxy;
1515

16-
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS;
1716
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttConnectMessage;
1817
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttPublishMessage;
1918
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttSubscribeMessage;
@@ -141,11 +140,9 @@ public void doProcessConnect(MqttAdapterMessage adapter, String userRole,
141140
.processor(this)
142141
.build();
143142
connection.sendConnAck();
144-
if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
145-
MqttConnectMessage connectMessage = createMqttConnectMessage(msg, AUTH_MTLS, userRole);
146-
msg = connectMessage;
147-
connection.setConnectMessage(msg);
148-
}
143+
MqttConnectMessage connectMessage = createMqttConnectMessage(msg, userRole);
144+
msg = connectMessage;
145+
connection.setConnectMessage(msg);
149146

150147
ConnectEvent connectEvent = ConnectEvent.builder()
151148
.clientId(connection.getClientId())
@@ -166,10 +163,8 @@ public void processPublish(MqttAdapterMessage adapter) {
166163
proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(),
167164
TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain()));
168165
adapter.setClientId(connection.getClientId());
169-
if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
170-
MqttPublishMessage mqttMessage = createMqttPublishMessage(msg, AUTH_MTLS, connection.getUserRole());
171-
adapter.setMqttMessage(mqttMessage);
172-
}
166+
MqttPublishMessage mqttMessage = createMqttPublishMessage(msg, connection.getUserRole());
167+
adapter.setMqttMessage(mqttMessage);
173168
startPublish()
174169
.thenCompose(__ -> writeToBroker(pulsarTopicName, adapter))
175170
.whenComplete((unused, ex) -> {
@@ -300,10 +295,8 @@ public void processSubscribe(final MqttAdapterMessage adapter) {
300295
log.debug("[Proxy Subscribe] [{}] msg: {}", clientId, msg);
301296
}
302297
registerTopicListener(adapter);
303-
if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
304-
MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, AUTH_MTLS, connection.getUserRole());
305-
adapter.setMqttMessage(mqttMessage);
306-
}
298+
MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, connection.getUserRole());
299+
adapter.setMqttMessage(mqttMessage);
307300
doSubscribe(adapter, false)
308301
.exceptionally(ex -> {
309302
Throwable realCause = FutureUtil.unwrapCompletionException(ex);

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
package io.streamnative.pulsar.handlers.mqtt.support;
1515

1616
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createWillMessage;
17-
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getMtlsAuthMethodAndData;
17+
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getAuthenticationRole;
1818
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.pingResp;
1919
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.topicSubscriptions;
2020
import io.netty.channel.ChannelHandlerContext;
@@ -75,9 +75,7 @@
7575
import org.apache.bookkeeper.mledger.Position;
7676
import org.apache.bookkeeper.mledger.PositionFactory;
7777
import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
78-
import org.apache.commons.lang3.tuple.Pair;
7978
import org.apache.pulsar.broker.PulsarService;
80-
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
8179
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
8280
import org.apache.pulsar.broker.authorization.AuthorizationService;
8381
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -199,10 +197,9 @@ public void processPublish(MqttAdapterMessage adapter) {
199197
String userRole = connection.getUserRole();
200198
AuthenticationDataSource authData = connection.getAuthData();
201199
if (adapter.fromProxy()) {
202-
final Optional<Pair<String, byte[]>> mtlsAuthMethodAndData = getMtlsAuthMethodAndData(msg);
203-
if (mtlsAuthMethodAndData.isPresent()) {
204-
userRole = mtlsAuthMethodAndData.get().getKey();
205-
authData = new AuthenticationDataCommand(new String(mtlsAuthMethodAndData.get().getValue()));
200+
final Optional<String> authenticationRole = getAuthenticationRole(msg);
201+
if (authenticationRole.isPresent()) {
202+
userRole = authenticationRole.get();
206203
}
207204
}
208205
result = this.authorizationService.canProduceAsync(TopicName.get(msg.variableHeader().topicName()),
@@ -224,9 +221,11 @@ private CompletableFuture<Void> doUnauthorized(MqttAdapterMessage adapter) {
224221
log.error("[Publish] not authorized to topic={}, userRole={}, CId= {}",
225222
msg.variableHeader().topicName(), connection.getUserRole(),
226223
connection.getClientId());
224+
int packetId = msg.variableHeader().packetId();
225+
packetId = packetId == -1 ? 1 : packetId;
227226
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck
228227
.errorBuilder(connection.getProtocolVersion())
229-
.packetId(msg.variableHeader().packetId())
228+
.packetId(packetId)
230229
.reasonCode(Mqtt5PubReasonCode.NOT_AUTHORIZED);
231230
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
232231
pubAckBuilder.reasonString("Not Authorized!");
@@ -367,10 +366,9 @@ public void processSubscribe(MqttAdapterMessage adapter) {
367366
} else {
368367
AuthenticationDataSource authData = connection.getAuthData();
369368
if (adapter.fromProxy()) {
370-
final Optional<Pair<String, byte[]>> mtlsAuthMethodAndData = getMtlsAuthMethodAndData(msg);
371-
if (mtlsAuthMethodAndData.isPresent()) {
372-
userRole = mtlsAuthMethodAndData.get().getKey();
373-
authData = new AuthenticationDataCommand(new String(mtlsAuthMethodAndData.get().getValue()));
369+
final Optional<String> authenticationRole = getAuthenticationRole(msg);
370+
if (authenticationRole.isPresent()) {
371+
userRole = authenticationRole.get();
374372
}
375373
}
376374
List<CompletableFuture<Void>> authorizationFutures = new ArrayList<>();

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java

Lines changed: 24 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import static com.google.common.base.Preconditions.checkArgument;
1717
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
18-
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS;
1918
import io.netty.buffer.Unpooled;
2019
import io.netty.channel.Channel;
2120
import io.netty.handler.codec.mqtt.MqttConnectMessage;
@@ -41,7 +40,6 @@
4140
import java.util.UUID;
4241
import java.util.stream.Collectors;
4342
import org.apache.commons.codec.binary.Hex;
44-
import org.apache.commons.lang3.tuple.Pair;
4543

4644
/**
4745
* Mqtt message utils.
@@ -50,6 +48,8 @@ public class MqttMessageUtils {
5048

5149
public static final int CLIENT_IDENTIFIER_MAX_LENGTH = 23;
5250

51+
public static final String AUTHENTICATE_ROLE_KEY = "__mop_auth_role";
52+
5353
public static void checkState(MqttMessage msg) {
5454
if (!msg.decoderResult().isSuccess()) {
5555
throw new IllegalStateException(msg.decoderResult().cause().getMessage());
@@ -190,14 +190,10 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage)
190190
}
191191

192192
public static MqttConnectMessage createMqttConnectMessage(MqttConnectMessage connectMessage,
193-
String authMethod,
194193
String authData) {
195194
final MqttConnectVariableHeader header = connectMessage.variableHeader();
196195
MqttProperties properties = new MqttProperties();
197-
properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()
198-
, authMethod));
199-
properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()
200-
, authData.getBytes()));
196+
properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData));
201197
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
202198
MqttVersion.MQTT_5.protocolName(), MqttVersion.MQTT_5.protocolLevel(), header.hasUserName(),
203199
header.hasPassword(), header.isWillRetain(), header.willQos(), header.isWillFlag(),
@@ -209,72 +205,58 @@ public static MqttConnectMessage createMqttConnectMessage(MqttConnectMessage con
209205
}
210206

211207
public static MqttPublishMessage createMqttPublishMessage(MqttPublishMessage publishMessage,
212-
String authMethod,
213208
String authData) {
214209
final MqttPublishVariableHeader header = publishMessage.variableHeader();
215210
MqttProperties properties = new MqttProperties();
216-
properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()
217-
, authMethod));
218-
properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()
219-
, authData.getBytes()));
211+
properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData));
220212
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(
221213
header.topicName(), header.packetId(), properties);
222214
MqttPublishMessage newPublishMessage = new MqttPublishMessage(publishMessage.fixedHeader(), variableHeader,
223215
publishMessage.payload());
224216
return newPublishMessage;
225217
}
226218

227-
public static Optional<Pair<String, byte[]>> getMtlsAuthMethodAndData(MqttConnectMessage connectMessage) {
219+
public static Optional<String> getAuthenticationRole(MqttConnectMessage connectMessage) {
228220
final MqttConnectVariableHeader header = connectMessage.variableHeader();
229221
MqttProperties properties = header.properties();
230-
final MqttProperties.MqttProperty property = properties.getProperty(
231-
MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value());
232-
if (property != null && property.value() instanceof String
233-
&& ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) {
234-
final MqttProperties.MqttProperty data = properties.getProperty(
235-
MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value());
236-
return Optional.of(Pair.of((String) property.value(), (byte[]) data.value()));
222+
final MqttProperties.UserProperties data = (MqttProperties.UserProperties) properties.getProperty(
223+
MqttProperties.MqttPropertyType.USER_PROPERTY.value());
224+
if (data != null && data.value() instanceof List<MqttProperties.StringPair>) {
225+
return data.value().stream().filter(d -> d.key.equalsIgnoreCase(AUTHENTICATE_ROLE_KEY))
226+
.map(e -> e.value).findFirst();
237227
}
238228
return Optional.empty();
239229
}
240230

241-
public static Optional<Pair<String, byte[]>> getMtlsAuthMethodAndData(MqttPublishMessage publishMessage) {
231+
public static Optional<String> getAuthenticationRole(MqttPublishMessage publishMessage) {
242232
final MqttPublishVariableHeader header = publishMessage.variableHeader();
243233
MqttProperties properties = header.properties();
244-
final MqttProperties.MqttProperty property = properties.getProperty(
245-
MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value());
246-
if (property != null && property.value() instanceof String
247-
&& ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) {
248-
final MqttProperties.MqttProperty data = properties.getProperty(
249-
MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value());
250-
return Optional.of(Pair.of((String) property.value(), (byte[]) data.value()));
251-
}
234+
final MqttProperties.UserProperties data = (MqttProperties.UserProperties) properties.getProperty(
235+
MqttProperties.MqttPropertyType.USER_PROPERTY.value());
236+
if (data != null && data.value() instanceof List<MqttProperties.StringPair>) {
237+
return data.value().stream().filter(d -> d.key.equalsIgnoreCase(AUTHENTICATE_ROLE_KEY))
238+
.map(e -> e.value).findFirst();
239+
}
252240
return Optional.empty();
253241
}
254242

255-
public static Optional<Pair<String, byte[]>> getMtlsAuthMethodAndData(MqttSubscribeMessage subscribeMessage) {
243+
public static Optional<String> getAuthenticationRole(MqttSubscribeMessage subscribeMessage) {
256244
final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader();
257245
MqttProperties properties = header.properties();
258-
final MqttProperties.MqttProperty property = properties.getProperty(
259-
MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value());
260-
if (property != null && property.value() instanceof String
261-
&& ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) {
262-
final MqttProperties.MqttProperty data = properties.getProperty(
263-
MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value());
264-
return Optional.of(Pair.of((String) property.value(), (byte[]) data.value()));
246+
final MqttProperties.UserProperties data = (MqttProperties.UserProperties) properties.getProperty(
247+
MqttProperties.MqttPropertyType.USER_PROPERTY.value());
248+
if (data != null && data.value() instanceof List<MqttProperties.StringPair>) {
249+
return data.value().stream().filter(d -> d.key.equalsIgnoreCase(AUTHENTICATE_ROLE_KEY))
250+
.map(e -> e.value).findFirst();
265251
}
266252
return Optional.empty();
267253
}
268254

269255
public static MqttSubscribeMessage createMqttSubscribeMessage(MqttSubscribeMessage subscribeMessage,
270-
String authMethod,
271256
String authData) {
272257
final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader();
273258
MqttProperties properties = new MqttProperties();
274-
properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()
275-
, authMethod));
276-
properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()
277-
, authData.getBytes()));
259+
properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData));
278260
MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(
279261
header.messageId(), properties);
280262
MqttSubscribeMessage newSubscribeMessage = new MqttSubscribeMessage(subscribeMessage.fixedHeader(),

0 commit comments

Comments
 (0)