Skip to content
This repository was archived by the owner on Oct 21, 2025. It is now read-only.

Commit 9c6ebfd

Browse files
author
tnewman-at-gm
authored
Merge pull request #3 from tnewman-at-gm/feature/entra-auth
FE/BE - Azure Entra Support (Event Hub with Kafka Protocol)
2 parents 682d37f + 4860e84 commit 9c6ebfd

File tree

4 files changed

+18
-23
lines changed

4 files changed

+18
-23
lines changed

api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandler.java renamed to api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandler.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
package io.kafbat.ui.sasl.azure.entra;
1+
package io.kafbat.ui.config.auth.azure;
22

33
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
44

5-
import com.azure.core.credential.AccessToken;
65
import com.azure.core.credential.TokenCredential;
76
import com.azure.core.credential.TokenRequestContext;
87
import com.azure.identity.DefaultAzureCredentialBuilder;
@@ -13,16 +12,14 @@
1312
import javax.security.auth.callback.Callback;
1413
import javax.security.auth.callback.UnsupportedCallbackException;
1514
import javax.security.auth.login.AppConfigurationEntry;
15+
import lombok.extern.slf4j.Slf4j;
1616
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
1717
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
1818
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
19-
import org.slf4j.Logger;
20-
import org.slf4j.LoggerFactory;
2119

20+
@Slf4j
2221
public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandler {
2322

24-
private static final Logger LOGGER = LoggerFactory.getLogger(AzureEntraLoginCallbackHandler.class);
25-
2623
private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(10);
2724

2825
private static final int ACCESS_TOKEN_REQUEST_MAX_RETRIES = 6;
@@ -53,15 +50,15 @@ private URI buildEventHubsServerUri(Map<String, ?> configs) {
5350

5451
if (null == bootstrapServers) {
5552
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
56-
LOGGER.error(message);
53+
log.error(message);
5754
throw new IllegalArgumentException(message);
5855
}
5956

6057
if (bootstrapServers.size() != 1) {
6158
final String message =
6259
BOOTSTRAP_SERVERS_CONFIG
6360
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
64-
LOGGER.error(message);
61+
log.error(message);
6562
throw new IllegalArgumentException(message);
6663
}
6764

@@ -87,9 +84,9 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
8784
try {
8885
final OAuthBearerToken token = tokenCredential
8986
.getToken(tokenRequestContext)
90-
.map(AzureEntraOAuthBearerTokenImpl::new)
87+
.map(AzureEntraOAuthBearerToken::new)
9188
.timeout(ACCESS_TOKEN_REQUEST_BLOCK_TIME)
92-
.doOnError(e -> LOGGER.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
89+
.doOnError(e -> log.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
9390
.retry(ACCESS_TOKEN_REQUEST_MAX_RETRIES)
9491
.block();
9592

@@ -98,7 +95,7 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
9895
final String message =
9996
"Failed to acquire Azure token for Event Hub Authentication. "
10097
+ "Please ensure valid Azure credentials are configured.";
101-
LOGGER.error(message, e);
98+
log.error(message, e);
10299
oauthCallback.error("invalid_grant", message, null);
103100
}
104101
}

api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraOAuthBearerTokenImpl.java renamed to api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerToken.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.kafbat.ui.sasl.azure.entra;
1+
package io.kafbat.ui.config.auth.azure;
22

33
import com.azure.core.credential.AccessToken;
44
import com.nimbusds.jwt.JWTClaimsSet;
@@ -11,13 +11,13 @@
1111
import org.apache.kafka.common.errors.SaslAuthenticationException;
1212
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
1313

14-
public class AzureEntraOAuthBearerTokenImpl implements OAuthBearerToken {
14+
public class AzureEntraOAuthBearerToken implements OAuthBearerToken {
1515

1616
private final AccessToken accessToken;
1717

1818
private final JWTClaimsSet claims;
1919

20-
public AzureEntraOAuthBearerTokenImpl(AccessToken accessToken) {
20+
public AzureEntraOAuthBearerToken(AccessToken accessToken) {
2121
this.accessToken = accessToken;
2222

2323
try {
@@ -48,9 +48,7 @@ public Set<String> scope() {
4848
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
4949
// scp
5050
// claim is a String which is presented as a space separated list.
51-
return Optional.ofNullable(claims.getClaim("scp"))
52-
.map(s -> Arrays.stream(((String) s).split(" ")).collect(Collectors.toSet()))
53-
.orElse(null);
51+
return Arrays.stream(((String) claims.getClaim("scp")).split(" ")).collect(Collectors.toSet());
5452
}
5553

5654
@Override

api/src/test/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandlerTest.java renamed to api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.kafbat.ui.sasl.azure.entra;
1+
package io.kafbat.ui.config.auth.azure;
22

33
import static org.hamcrest.CoreMatchers.is;
44
import static org.hamcrest.CoreMatchers.notNullValue;
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.kafbat.ui.sasl.azure.entra;
1+
package io.kafbat.ui.config.auth.azure;
22

33
import static org.hamcrest.CoreMatchers.is;
44
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -12,7 +12,7 @@
1212
import org.apache.kafka.common.errors.SaslAuthenticationException;
1313
import org.junit.jupiter.api.Test;
1414

15-
public class AzureEntraOAuthBearerTokenImplTest {
15+
public class AzureEntraOAuthBearerTokenTest {
1616

1717
// These are not real tokens. It was generated using fake values with an invalid signature,
1818
// so it is safe to store here.
@@ -39,8 +39,8 @@ public class AzureEntraOAuthBearerTokenImplTest {
3939
void constructorShouldParseToken() {
4040
final AccessToken accessToken = new AccessToken(VALID_SAMPLE_TOKEN, OffsetDateTime.MIN);
4141

42-
final AzureEntraOAuthBearerTokenImpl azureOAuthBearerToken =
43-
new AzureEntraOAuthBearerTokenImpl(accessToken);
42+
final AzureEntraOAuthBearerToken azureOAuthBearerToken =
43+
new AzureEntraOAuthBearerToken(accessToken);
4444

4545
assertThat(azureOAuthBearerToken, is(notNullValue()));
4646
assertThat(azureOAuthBearerToken.value(), is(VALID_SAMPLE_TOKEN));
@@ -53,7 +53,7 @@ void constructorShouldParseToken() {
5353

5454
@Test
5555
void constructorShouldRejectInvalidToken() {
56-
assertThrows(SaslAuthenticationException.class, () -> new AzureEntraOAuthBearerTokenImpl(
56+
assertThrows(SaslAuthenticationException.class, () -> new AzureEntraOAuthBearerToken(
5757
new AccessToken("invalid", OffsetDateTime.MIN)));
5858
}
5959
}

0 commit comments

Comments
 (0)