Skip to content
This repository was archived by the owner on Oct 21, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.kafbat.ui.sasl.azure.entra;
package io.kafbat.ui.config.auth.azure;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredentialBuilder;
Expand All @@ -13,16 +12,14 @@
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(AzureEntraLoginCallbackHandler.class);

private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(10);

private static final int ACCESS_TOKEN_REQUEST_MAX_RETRIES = 6;
Expand Down Expand Up @@ -53,15 +50,15 @@ private URI buildEventHubsServerUri(Map<String, ?> configs) {

if (null == bootstrapServers) {
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
LOGGER.error(message);
log.error(message);
throw new IllegalArgumentException(message);
}

if (bootstrapServers.size() != 1) {
final String message =
BOOTSTRAP_SERVERS_CONFIG
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
LOGGER.error(message);
log.error(message);
throw new IllegalArgumentException(message);
}

Expand All @@ -87,9 +84,9 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
try {
final OAuthBearerToken token = tokenCredential
.getToken(tokenRequestContext)
.map(AzureEntraOAuthBearerTokenImpl::new)
.map(AzureEntraOAuthBearerToken::new)
.timeout(ACCESS_TOKEN_REQUEST_BLOCK_TIME)
.doOnError(e -> LOGGER.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
.doOnError(e -> log.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
.retry(ACCESS_TOKEN_REQUEST_MAX_RETRIES)
.block();

Expand All @@ -98,7 +95,7 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
final String message =
"Failed to acquire Azure token for Event Hub Authentication. "
+ "Please ensure valid Azure credentials are configured.";
LOGGER.error(message, e);
log.error(message, e);
oauthCallback.error("invalid_grant", message, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.kafbat.ui.sasl.azure.entra;
package io.kafbat.ui.config.auth.azure;

import com.azure.core.credential.AccessToken;
import com.nimbusds.jwt.JWTClaimsSet;
Expand All @@ -11,13 +11,13 @@
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;

public class AzureEntraOAuthBearerTokenImpl implements OAuthBearerToken {
public class AzureEntraOAuthBearerToken implements OAuthBearerToken {

private final AccessToken accessToken;

private final JWTClaimsSet claims;

public AzureEntraOAuthBearerTokenImpl(AccessToken accessToken) {
public AzureEntraOAuthBearerToken(AccessToken accessToken) {
this.accessToken = accessToken;

try {
Expand Down Expand Up @@ -48,9 +48,7 @@ public Set<String> scope() {
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
// scp
// claim is a String which is presented as a space separated list.
return Optional.ofNullable(claims.getClaim("scp"))
.map(s -> Arrays.stream(((String) s).split(" ")).collect(Collectors.toSet()))
.orElse(null);
return Arrays.stream(((String) claims.getClaim("scp")).split(" ")).collect(Collectors.toSet());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.kafbat.ui.sasl.azure.entra;
package io.kafbat.ui.config.auth.azure;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.kafbat.ui.sasl.azure.entra;
package io.kafbat.ui.config.auth.azure;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand All @@ -12,7 +12,7 @@
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.junit.jupiter.api.Test;

public class AzureEntraOAuthBearerTokenImplTest {
public class AzureEntraOAuthBearerTokenTest {

// These are not real tokens. It was generated using fake values with an invalid signature,
// so it is safe to store here.
Expand All @@ -39,8 +39,8 @@ public class AzureEntraOAuthBearerTokenImplTest {
void constructorShouldParseToken() {
final AccessToken accessToken = new AccessToken(VALID_SAMPLE_TOKEN, OffsetDateTime.MIN);

final AzureEntraOAuthBearerTokenImpl azureOAuthBearerToken =
new AzureEntraOAuthBearerTokenImpl(accessToken);
final AzureEntraOAuthBearerToken azureOAuthBearerToken =
new AzureEntraOAuthBearerToken(accessToken);

assertThat(azureOAuthBearerToken, is(notNullValue()));
assertThat(azureOAuthBearerToken.value(), is(VALID_SAMPLE_TOKEN));
Expand All @@ -53,7 +53,7 @@ void constructorShouldParseToken() {

@Test
void constructorShouldRejectInvalidToken() {
assertThrows(SaslAuthenticationException.class, () -> new AzureEntraOAuthBearerTokenImpl(
assertThrows(SaslAuthenticationException.class, () -> new AzureEntraOAuthBearerToken(
new AccessToken("invalid", OffsetDateTime.MIN)));
}
}