diff --git a/api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandler.java b/api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandler.java similarity index 87% rename from api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandler.java rename to api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandler.java index 689a89246..c6c08b6fd 100644 --- a/api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandler.java +++ b/api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandler.java @@ -1,8 +1,7 @@ -package io.kafbat.ui.sasl.azure.entra; +package io.kafbat.ui.config.auth.azure; import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; -import com.azure.core.credential.AccessToken; import com.azure.core.credential.TokenCredential; import com.azure.core.credential.TokenRequestContext; import com.azure.identity.DefaultAzureCredentialBuilder; @@ -13,16 +12,14 @@ import javax.security.auth.callback.Callback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(AzureEntraLoginCallbackHandler.class); - private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(10); private static final int ACCESS_TOKEN_REQUEST_MAX_RETRIES = 6; @@ -53,7 +50,7 @@ private URI buildEventHubsServerUri(Map configs) { if (null == bootstrapServers) { final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration."; - LOGGER.error(message); + log.error(message); throw new IllegalArgumentException(message); } @@ -61,7 +58,7 @@ private URI buildEventHubsServerUri(Map configs) { final String message = BOOTSTRAP_SERVERS_CONFIG + " contains multiple bootstrap servers. Only a single bootstrap server is supported."; - LOGGER.error(message); + log.error(message); throw new IllegalArgumentException(message); } @@ -87,9 +84,9 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) { try { final OAuthBearerToken token = tokenCredential .getToken(tokenRequestContext) - .map(AzureEntraOAuthBearerTokenImpl::new) + .map(AzureEntraOAuthBearerToken::new) .timeout(ACCESS_TOKEN_REQUEST_BLOCK_TIME) - .doOnError(e -> LOGGER.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e)) + .doOnError(e -> log.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e)) .retry(ACCESS_TOKEN_REQUEST_MAX_RETRIES) .block(); @@ -98,7 +95,7 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) { final String message = "Failed to acquire Azure token for Event Hub Authentication. " + "Please ensure valid Azure credentials are configured."; - LOGGER.error(message, e); + log.error(message, e); oauthCallback.error("invalid_grant", message, null); } } diff --git a/api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraOAuthBearerTokenImpl.java b/api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerToken.java similarity index 80% rename from api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraOAuthBearerTokenImpl.java rename to api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerToken.java index 2362df26c..e9c315940 100644 --- a/api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraOAuthBearerTokenImpl.java +++ b/api/src/main/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerToken.java @@ -1,4 +1,4 @@ -package io.kafbat.ui.sasl.azure.entra; +package io.kafbat.ui.config.auth.azure; import com.azure.core.credential.AccessToken; import com.nimbusds.jwt.JWTClaimsSet; @@ -11,13 +11,13 @@ import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; -public class AzureEntraOAuthBearerTokenImpl implements OAuthBearerToken { +public class AzureEntraOAuthBearerToken implements OAuthBearerToken { private final AccessToken accessToken; private final JWTClaimsSet claims; - public AzureEntraOAuthBearerTokenImpl(AccessToken accessToken) { + public AzureEntraOAuthBearerToken(AccessToken accessToken) { this.accessToken = accessToken; try { @@ -48,9 +48,7 @@ public Set scope() { // https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the // scp // claim is a String which is presented as a space separated list. - return Optional.ofNullable(claims.getClaim("scp")) - .map(s -> Arrays.stream(((String) s).split(" ")).collect(Collectors.toSet())) - .orElse(null); + return Arrays.stream(((String) claims.getClaim("scp")).split(" ")).collect(Collectors.toSet()); } @Override diff --git a/api/src/test/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandlerTest.java b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java similarity index 99% rename from api/src/test/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandlerTest.java rename to api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java index 8b63a82f3..d2e39ce75 100644 --- a/api/src/test/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandlerTest.java +++ b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java @@ -1,4 +1,4 @@ -package io.kafbat.ui.sasl.azure.entra; +package io.kafbat.ui.config.auth.azure; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; diff --git a/api/src/test/java/io/kafbat/ui/sasl/azure/entra/AzureEntraOAuthBearerTokenImplTest.java b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java similarity index 93% rename from api/src/test/java/io/kafbat/ui/sasl/azure/entra/AzureEntraOAuthBearerTokenImplTest.java rename to api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java index bf52626a5..84ed3b1cd 100644 --- a/api/src/test/java/io/kafbat/ui/sasl/azure/entra/AzureEntraOAuthBearerTokenImplTest.java +++ b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java @@ -1,4 +1,4 @@ -package io.kafbat.ui.sasl.azure.entra; +package io.kafbat.ui.config.auth.azure; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -12,7 +12,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException; import org.junit.jupiter.api.Test; -public class AzureEntraOAuthBearerTokenImplTest { +public class AzureEntraOAuthBearerTokenTest { // These are not real tokens. It was generated using fake values with an invalid signature, // so it is safe to store here. @@ -39,8 +39,8 @@ public class AzureEntraOAuthBearerTokenImplTest { void constructorShouldParseToken() { final AccessToken accessToken = new AccessToken(VALID_SAMPLE_TOKEN, OffsetDateTime.MIN); - final AzureEntraOAuthBearerTokenImpl azureOAuthBearerToken = - new AzureEntraOAuthBearerTokenImpl(accessToken); + final AzureEntraOAuthBearerToken azureOAuthBearerToken = + new AzureEntraOAuthBearerToken(accessToken); assertThat(azureOAuthBearerToken, is(notNullValue())); assertThat(azureOAuthBearerToken.value(), is(VALID_SAMPLE_TOKEN)); @@ -53,7 +53,7 @@ void constructorShouldParseToken() { @Test void constructorShouldRejectInvalidToken() { - assertThrows(SaslAuthenticationException.class, () -> new AzureEntraOAuthBearerTokenImpl( + assertThrows(SaslAuthenticationException.class, () -> new AzureEntraOAuthBearerToken( new AccessToken("invalid", OffsetDateTime.MIN))); } }