Skip to content

Commit d2c8afc

Browse files
authored
Support mTLS authentication for MoP (#1414)
1 parent 2b9f038 commit d2c8afc

36 files changed

+764
-41
lines changed

.github/workflows/pr_test.yml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@ jobs:
2323
check:
2424
runs-on: ubuntu-latest
2525
steps:
26-
- uses: actions/checkout@v2
26+
- uses: actions/checkout@v3
2727

2828
- name: Cache Maven packages
29-
uses: actions/cache@v2
29+
uses: actions/cache@v3
3030
with:
3131
path: ~/.m2
3232
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
3333
restore-keys: ${{ runner.os }}-m2
3434

3535
- name: Set up JDK 17
36-
uses: actions/setup-java@v2
36+
uses: actions/setup-java@v3
3737
with:
3838
distribution: 'temurin'
3939
java-version: 17
@@ -55,7 +55,7 @@ jobs:
5555
runs-on: ubuntu-latest
5656
timeout-minutes: 5
5757
steps:
58-
- uses: actions/checkout@v2
58+
- uses: actions/checkout@v3
5959

6060
- name: Get All Tests
6161
id: list-test
@@ -78,17 +78,17 @@ jobs:
7878
runs-on: ubuntu-latest
7979
timeout-minutes: 20
8080
steps:
81-
- uses: actions/checkout@v2
81+
- uses: actions/checkout@v3
8282

8383
- name: Cache Maven packages
84-
uses: actions/cache@v2
84+
uses: actions/cache@v3
8585
with:
8686
path: ~/.m2
8787
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
8888
restore-keys: ${{ runner.os }}-m2
8989

9090
- name: Set up JDK 17
91-
uses: actions/setup-java@v2
91+
uses: actions/setup-java@v3
9292
with:
9393
distribution: 'temurin'
9494
java-version: 17
@@ -125,17 +125,17 @@ jobs:
125125
runs-on: ubuntu-latest
126126
timeout-minutes: 10
127127
steps:
128-
- uses: actions/checkout@v2
128+
- uses: actions/checkout@v3
129129

130130
- name: Cache Maven packages
131-
uses: actions/cache@v2
131+
uses: actions/cache@v3
132132
with:
133133
path: ~/.m2
134134
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
135135
restore-keys: ${{ runner.os }}-m2
136136

137137
- name: Set up JDK 17
138-
uses: actions/setup-java@v2
138+
uses: actions/setup-java@v3
139139
with:
140140
distribution: 'temurin'
141141
java-version: 17
@@ -144,7 +144,7 @@ jobs:
144144
run: mvn clean install -DskipTests
145145

146146
- name: Download jacoco artifact
147-
uses: actions/download-artifact@v2
147+
uses: actions/download-artifact@v3
148148
with:
149149
path: mqtt-impl/target
150150

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
4545
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4646
import lombok.Getter;
47+
import lombok.Setter;
4748
import lombok.extern.slf4j.Slf4j;
4849
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
4950

@@ -69,7 +70,8 @@ public class Connection {
6970
@Getter
7071
private final TopicSubscriptionManager topicSubscriptionManager;
7172
@Getter
72-
private final MqttConnectMessage connectMessage;
73+
@Setter
74+
private MqttConnectMessage connectMessage;
7375
@Getter
7476
private final ClientRestrictions clientRestrictions;
7577
@Getter

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public final class Constants {
2424
public static final String AUTH_BASIC = "basic";
2525
public static final String AUTH_TOKEN = "token";
2626

27+
public static final String AUTH_MTLS = "mTls";
28+
2729
public static final String ATTR_TOPIC_SUBS = "topicSubs";
2830

2931
public static final String MQTT_PROPERTIES = "MQTT_PROPERTIES_%d_";

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,25 @@
1414
package io.streamnative.pulsar.handlers.mqtt;
1515

1616
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_BASIC;
17+
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS;
1718
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_TOKEN;
1819
import io.netty.handler.codec.mqtt.MqttConnectMessage;
1920
import io.netty.handler.codec.mqtt.MqttConnectPayload;
21+
import io.streamnative.pulsar.handlers.mqtt.identitypool.AuthenticationProviderMTls;
2022
import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils;
2123
import java.util.HashMap;
2224
import java.util.List;
2325
import java.util.Map;
2426
import javax.naming.AuthenticationException;
27+
import javax.net.ssl.SSLSession;
2528
import lombok.Getter;
2629
import lombok.RequiredArgsConstructor;
2730
import lombok.extern.slf4j.Slf4j;
2831
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
2932
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
3033
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
3134
import org.apache.pulsar.broker.authentication.AuthenticationService;
35+
import org.apache.pulsar.broker.service.BrokerService;
3236

3337
/**
3438
* MQTT authentication service.
@@ -42,8 +46,15 @@ public class MQTTAuthenticationService {
4246
@Getter
4347
private final Map<String, AuthenticationProvider> authenticationProviders;
4448

45-
public MQTTAuthenticationService(AuthenticationService authenticationService, List<String> authenticationMethods) {
46-
this.authenticationService = authenticationService;
49+
private final BrokerService brokerService;
50+
51+
private final boolean mqttProxyMTlsAuthenticationEnabled;
52+
53+
public MQTTAuthenticationService(BrokerService brokerService, List<String> authenticationMethods, boolean
54+
mqttProxyMTlsAuthenticationEnabled) {
55+
this.brokerService = brokerService;
56+
this.mqttProxyMTlsAuthenticationEnabled = mqttProxyMTlsAuthenticationEnabled;
57+
this.authenticationService = brokerService.getAuthenticationService();
4758
this.authenticationProviders = getAuthenticationProviders(authenticationMethods);
4859
}
4960

@@ -57,34 +68,49 @@ private Map<String, AuthenticationProvider> getAuthenticationProviders(List<Stri
5768
log.error("MQTT authentication method {} is not enabled in Pulsar configuration!", method);
5869
}
5970
}
71+
if (mqttProxyMTlsAuthenticationEnabled) {
72+
AuthenticationProviderMTls providerMTls = new AuthenticationProviderMTls();
73+
try {
74+
providerMTls.initialize(brokerService.pulsar().getLocalMetadataStore());
75+
providers.put(AUTH_MTLS, providerMTls);
76+
} catch (Exception e) {
77+
log.error("Failed to initialize MQTT authentication method {} ", AUTH_MTLS, e);
78+
}
79+
}
6080
if (providers.isEmpty()) {
6181
throw new IllegalArgumentException(
6282
"MQTT authentication is enabled but no providers were successfully configured");
6383
}
84+
6485
return providers;
6586
}
6687

67-
public AuthenticationResult authenticate(MqttConnectMessage connectMessage) {
88+
public AuthenticationResult authenticate(boolean fromProxy,
89+
SSLSession session, MqttConnectMessage connectMessage) {
6890
String authMethod = MqttMessageUtils.getAuthMethod(connectMessage);
6991
if (authMethod != null) {
7092
byte[] authData = MqttMessageUtils.getAuthData(connectMessage);
7193
if (authData == null) {
7294
return AuthenticationResult.FAILED;
7395
}
96+
if (fromProxy && AUTH_MTLS.equalsIgnoreCase(authMethod)) {
97+
return new AuthenticationResult(true, new String(authData),
98+
new AuthenticationDataCommand(new String(authData), null, session));
99+
}
74100
return authenticate(connectMessage.payload().clientIdentifier(), authMethod,
75-
new AuthenticationDataCommand(new String(authData)));
101+
new AuthenticationDataCommand(new String(authData), null, session));
76102
}
77-
return authenticate(connectMessage.payload());
103+
return authenticate(connectMessage.payload(), session);
78104
}
79105

80-
public AuthenticationResult authenticate(MqttConnectPayload payload) {
106+
public AuthenticationResult authenticate(MqttConnectPayload payload, SSLSession session) {
81107
String userRole = null;
82108
boolean authenticated = false;
83109
AuthenticationDataSource authenticationDataSource = null;
84110
for (Map.Entry<String, AuthenticationProvider> entry : authenticationProviders.entrySet()) {
85111
String authMethod = entry.getKey();
86112
try {
87-
AuthenticationDataSource authData = getAuthData(authMethod, payload);
113+
AuthenticationDataSource authData = getAuthData(authMethod, payload, session);
88114
userRole = entry.getValue().authenticate(authData);
89115
authenticated = true;
90116
authenticationDataSource = authData;
@@ -116,12 +142,14 @@ public AuthenticationResult authenticate(String clientIdentifier,
116142
return new AuthenticationResult(authenticated, userRole, command);
117143
}
118144

119-
public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload) {
145+
public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload, SSLSession session) {
120146
switch (authMethod) {
121147
case AUTH_BASIC:
122148
return new AuthenticationDataCommand(payload.userName() + ":" + payload.password());
123149
case AUTH_TOKEN:
124150
return new AuthenticationDataCommand(payload.password());
151+
case AUTH_MTLS:
152+
return new AuthenticationDataCommand(null, null, session);
125153
default:
126154
throw new IllegalArgumentException(
127155
String.format("Unsupported authentication method : %s!", authMethod));

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
145145
)
146146
private boolean mqttProxyTlsEnabled = false;
147147

148+
@FieldContext(
149+
category = CATEGORY_MQTT_PROXY,
150+
required = false,
151+
doc = "Whether use mTLS authenticate for mTLS connection"
152+
)
153+
private boolean mqttProxyMTlsAuthenticationEnabled = false;
154+
148155
@FieldContext(
149156
category = CATEGORY_MQTT_PROXY,
150157
required = false,

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo
9696
this.metricsProvider = new MQTTMetricsProvider(metricsCollector);
9797
this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider);
9898
this.authenticationService = serverConfiguration.isMqttAuthenticationEnabled()
99-
? new MQTTAuthenticationService(brokerService.getAuthenticationService(),
100-
serverConfiguration.getMqttAuthenticationMethods()) : null;
99+
? new MQTTAuthenticationService(brokerService,
100+
serverConfiguration.getMqttAuthenticationMethods(),
101+
serverConfiguration.isMqttProxyMTlsAuthenticationEnabled()) : null;
101102
this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress());
102103
this.subscriptionManager = new MQTTSubscriptionManager();
103104
if (getServerConfiguration().isMqttProxyEnabled()) {

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public CompletableFuture<Void> writeAndFlush(final MqttAdapterMessage adapterMsg
5151
});
5252
future.exceptionally(ex -> {
5353
log.warn("[AdapterChannel][{}] Proxy write to broker {} failed."
54-
+ " error message: {}", clientId, broker, ex.getMessage());
54+
+ " adapterMsg message: {}", clientId, broker, adapterMsg, ex);
5555
return null;
5656
});
5757
return future;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.pulsar.handlers.mqtt.exception;
15+
16+
/**
17+
* Internal server exception.
18+
*/
19+
public class MQTTAuthException extends Exception {
20+
21+
public MQTTAuthException() {
22+
}
23+
24+
public MQTTAuthException(String message) {
25+
super(message);
26+
}
27+
28+
public MQTTAuthException(String message, Throwable cause) {
29+
super(message, cause);
30+
}
31+
32+
public MQTTAuthException(Throwable cause) {
33+
super(cause);
34+
}
35+
}

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.mqtt.identitypool;
1515

16-
1716
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN;
1817
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS;
1918
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN;

0 commit comments

Comments
 (0)