Skip to content
This repository was archived by the owner on Oct 21, 2025. It is now read-only.

Commit 3324341

Browse files
author
tnewman-at-gm
authored
Merge pull request #1 from tnewman-at-gm/feature/entra-auth
FE/BE Added Azure Entra Authentication for Event Hub
2 parents 8c70126 + d8aaddd commit 3324341

File tree

9 files changed

+424
-0
lines changed

9 files changed

+424
-0
lines changed

api/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@
9494
<version>2.1.0</version>
9595
</dependency>
9696

97+
<dependency>
98+
<groupId>com.azure</groupId>
99+
<artifactId>azure-identity</artifactId>
100+
<version>1.13.0</version>
101+
</dependency>
102+
97103
<dependency>
98104
<groupId>org.apache.avro</groupId>
99105
<artifactId>avro</artifactId>
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package io.kafbat.ui.sasl.azure.entra;
2+
3+
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
4+
5+
import com.azure.core.credential.AccessToken;
6+
import com.azure.core.credential.TokenCredential;
7+
import com.azure.core.credential.TokenRequestContext;
8+
import com.azure.identity.DefaultAzureCredentialBuilder;
9+
import java.net.URI;
10+
import java.time.Duration;
11+
import java.util.List;
12+
import java.util.Map;
13+
import javax.security.auth.callback.Callback;
14+
import javax.security.auth.callback.UnsupportedCallbackException;
15+
import javax.security.auth.login.AppConfigurationEntry;
16+
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
17+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
18+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandler {
23+
24+
private static final Logger LOGGER = LoggerFactory.getLogger(AzureEntraLoginCallbackHandler.class);
25+
26+
private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(10);
27+
28+
private static final int ACCESS_TOKEN_REQUEST_MAX_RETRIES = 6;
29+
30+
private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
31+
32+
static TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
33+
34+
private TokenRequestContext tokenRequestContext;
35+
36+
@Override
37+
public void configure(
38+
Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
39+
tokenRequestContext = buildTokenRequestContext(configs);
40+
}
41+
42+
private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
43+
URI uri = buildEventHubsServerUri(configs);
44+
String tokenAudience = buildTokenAudience(uri);
45+
46+
TokenRequestContext request = new TokenRequestContext();
47+
request.addScopes(tokenAudience);
48+
return request;
49+
}
50+
51+
private URI buildEventHubsServerUri(Map<String, ?> configs) {
52+
final List<String> bootstrapServers = (List<String>) configs.get(BOOTSTRAP_SERVERS_CONFIG);
53+
54+
if (null == bootstrapServers) {
55+
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
56+
LOGGER.error(message);
57+
throw new IllegalArgumentException(message);
58+
}
59+
60+
if (bootstrapServers.size() != 1) {
61+
final String message =
62+
BOOTSTRAP_SERVERS_CONFIG
63+
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
64+
LOGGER.error(message);
65+
throw new IllegalArgumentException(message);
66+
}
67+
68+
return URI.create("https://" + bootstrapServers.get(0));
69+
}
70+
71+
private String buildTokenAudience(URI uri) {
72+
return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost());
73+
}
74+
75+
@Override
76+
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
77+
for (Callback callback : callbacks) {
78+
if (callback instanceof OAuthBearerTokenCallback oauthCallback) {
79+
handleOAuthCallback(oauthCallback);
80+
} else {
81+
throw new UnsupportedCallbackException(callback);
82+
}
83+
}
84+
}
85+
86+
private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
87+
try {
88+
final OAuthBearerToken token = tokenCredential
89+
.getToken(tokenRequestContext)
90+
.map(AzureEntraOAuthBearerTokenImpl::new)
91+
.timeout(ACCESS_TOKEN_REQUEST_BLOCK_TIME)
92+
.doOnError(e -> LOGGER.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
93+
.retry(ACCESS_TOKEN_REQUEST_MAX_RETRIES)
94+
.block();
95+
96+
oauthCallback.token(token);
97+
} catch (final RuntimeException e) {
98+
final String message =
99+
"Failed to acquire Azure token for Event Hub Authentication. "
100+
+ "Please ensure valid Azure credentials are configured.";
101+
LOGGER.error(message, e);
102+
oauthCallback.error("invalid_grant", message, null);
103+
}
104+
}
105+
106+
public void close() {
107+
// NOOP
108+
}
109+
110+
void setTokenCredential(final TokenCredential tokenCredential) {
111+
AzureEntraLoginCallbackHandler.tokenCredential = tokenCredential;
112+
}
113+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.kafbat.ui.sasl.azure.entra;
2+
3+
import com.azure.core.credential.AccessToken;
4+
import com.nimbusds.jwt.JWTClaimsSet;
5+
import com.nimbusds.jwt.JWTParser;
6+
import java.text.ParseException;
7+
import java.util.Arrays;
8+
import java.util.Optional;
9+
import java.util.Set;
10+
import java.util.stream.Collectors;
11+
import org.apache.kafka.common.errors.SaslAuthenticationException;
12+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
13+
14+
public class AzureEntraOAuthBearerTokenImpl implements OAuthBearerToken {
15+
16+
private final AccessToken accessToken;
17+
18+
private final JWTClaimsSet claims;
19+
20+
public AzureEntraOAuthBearerTokenImpl(AccessToken accessToken) {
21+
this.accessToken = accessToken;
22+
23+
try {
24+
claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet();
25+
} catch (ParseException exception) {
26+
throw new SaslAuthenticationException("Unable to parse the access token", exception);
27+
}
28+
}
29+
30+
@Override
31+
public String value() {
32+
return accessToken.getToken();
33+
}
34+
35+
@Override
36+
public Long startTimeMs() {
37+
return claims.getIssueTime().getTime();
38+
}
39+
40+
@Override
41+
public long lifetimeMs() {
42+
return claims.getExpirationTime().getTime();
43+
}
44+
45+
@Override
46+
public Set<String> scope() {
47+
// Referring to
48+
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
49+
// scp
50+
// claim is a String which is presented as a space separated list.
51+
return Optional.ofNullable(claims.getClaim("scp"))
52+
.map(s -> Arrays.stream(((String) s).split(" ")).collect(Collectors.toSet()))
53+
.orElse(null);
54+
}
55+
56+
@Override
57+
public String principalName() {
58+
return (String) claims.getClaim("upn");
59+
}
60+
61+
public boolean isExpired() {
62+
return accessToken.isExpired();
63+
}
64+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package io.kafbat.ui.sasl.azure.entra;
2+
3+
import static org.hamcrest.CoreMatchers.is;
4+
import static org.hamcrest.CoreMatchers.notNullValue;
5+
import static org.hamcrest.CoreMatchers.nullValue;
6+
import static org.hamcrest.MatcherAssert.assertThat;
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertFalse;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
10+
import static org.mockito.ArgumentMatchers.any;
11+
import static org.mockito.ArgumentMatchers.anyString;
12+
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.times;
14+
import static org.mockito.Mockito.verify;
15+
import static org.mockito.Mockito.when;
16+
17+
import com.azure.core.credential.AccessToken;
18+
import com.azure.core.credential.TokenCredential;
19+
import com.azure.core.credential.TokenRequestContext;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import javax.security.auth.callback.Callback;
24+
import javax.security.auth.callback.UnsupportedCallbackException;
25+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
26+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.extension.ExtendWith;
30+
import org.mockito.ArgumentCaptor;
31+
import org.mockito.Mock;
32+
import org.mockito.junit.jupiter.MockitoExtension;
33+
import reactor.core.publisher.Mono;
34+
35+
@ExtendWith(MockitoExtension.class)
36+
public class AzureEntraLoginCallbackHandlerTest {
37+
38+
// These are not real tokens. It was generated using fake values with an invalid signature,
39+
// so it is safe to store here.
40+
private static final String VALID_SAMPLE_TOKEN =
41+
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6IjlHbW55RlBraGMzaE91UjIybXZTdmduTG83WSIsImtpZCI6IjlHbW55"
42+
+ "RlBraGMzaE91UjIybXZTdmduTG83WSJ9.eyJhdWQiOiJodHRwczovL3NhbXBsZS5zZXJ2aWNlYnVzLndpbmRvd3MubmV0IiwiaX"
43+
+ "NzIjoiaHR0cHM6Ly9zdHMud2luZG93cy5uZXQvc2FtcGxlLyIsImlhdCI6MTY5ODQxNTkxMiwibmJmIjoxNjk4NDE1OTEzLCJleH"
44+
+ "AiOjE2OTg0MTU5MTQsImFjciI6IjEiLCJhaW8iOiJzYW1wbGUtYWlvIiwiYW1yIjpbXSwiYXBwaWQiOiJzYW1wbGUtYXBwLWlkIi"
45+
+ "wiYXBwaWRhY3IiOiIwIiwiZmFtaWx5X25hbWUiOiJTYW1wbGUiLCJnaXZlbl9uYW1lIjoiU2FtcGxlIiwiZ3JvdXBzIjpbXSwiaX"
46+
+ "BhZGRyIjoiMTI3LjAuMC4xIiwibmFtZSI6IlNhbXBsZSBOYW1lIiwib2lkIjoic2FtcGxlLW9pZCIsIm9ucHJlbV9zaWQiOiJzYW"
47+
+ "1wbGUtb25wcmVtX3NpZCIsInB1aWQiOiJzYW1wbGUtcHVpZCIsInJoIjoic2FtcGxlLXJoIiwic2NwIjoiZXZlbnRfaHViIHN0b3"
48+
+ "JhZ2VfYWNjb3VudCIsInN1YiI6IlNhbXBsZSBTdWJqZWN0IiwidGlkIjoic2FtcGxlLXRpZCIsInVuaXF1ZV9uYW1lIjoic2FtcG"
49+
+ "xlQG1pY3Jvc29mdC5jb20iLCJ1cG4iOiJzYW1wbGVAbWljcm9zb2Z0LmNvbSIsInV0aSI6InNhbXBsZS11dGkiLCJ2ZXIiOiIxLj"
50+
+ "AiLCJ3aWRzIjpbXX0.DC_guYOsDlRc5GsXE39dn_zlBX54_Y8_mDTLXLgienl9dPMX5RE2X1QXGXA9ukZtptMzP_0wcoqDDjNrys"
51+
+ "GrNhztyeOr0YSeMMFq2NQ5vMBzLapwONwsnv55Hn0jOje9cqnMf43z1LHI6q6-rIIRz-SiTuoYUgOTxzFftpt-7FSqLjQpYEH7bL"
52+
+ "p-0yIU_aJUSb5HQTJbtYYOb54hsZ6VXpaiZ013qGtKODbHTG37kdoIw2MPn66CxanLZKeZM31IVxC-duAqxDgK4O2Ne6xRZRIPW1"
53+
+ "yt61QnZutWTJ4bAyhmplym3OWZ369cyiSJek0uyS5tibXeCYG4Kk8UQSFcsyfwgOsD0xvvcXcLexcUcEekoNBj6ixDhWssFzhC8T"
54+
+ "Npy8-QKNe_Tp6qHzJdI6OV71jpDkGvcmseLHC9GOxBWB0IdYbePTFK-rz2dkN3uMUiFwQJvEbORsq1IaQXj2esT0F7sMfqzWQF9h"
55+
+ "koVy4mJg_auvrZlnQkNPdLHfCacU33ZPwtuSS6b-0XolbxZ5DlJ4p1OJPeHl2xsi61qiHuCBsmnkLNtHmyxNTXGs7xc4dEQokaCK"
56+
+ "-FB_lzC3D4mkJMxKWopQGXnQtizaZjyclGpiUFs3mEauxC7RpsbanitxPFs7FK3mY0MQJk9JNVi1oM-8qfEp8nYT2DwFBhLcIp2z"
57+
+ "Q";
58+
59+
@Mock
60+
private OAuthBearerTokenCallback oauthBearerTokenCallBack;
61+
62+
@Mock
63+
private OAuthBearerToken oauthBearerToken;
64+
65+
@Mock
66+
private TokenCredential tokenCredential;
67+
68+
@Mock
69+
private AccessToken accessToken;
70+
71+
private AzureEntraLoginCallbackHandler azureEntraLoginCallbackHandler;
72+
73+
@BeforeEach
74+
public void beforeEach() {
75+
azureEntraLoginCallbackHandler = new AzureEntraLoginCallbackHandler();
76+
azureEntraLoginCallbackHandler.setTokenCredential(tokenCredential);
77+
}
78+
79+
@Test
80+
public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest()
81+
throws UnsupportedCallbackException {
82+
final Map<String, Object> configs = new HashMap<>();
83+
configs.put(
84+
"bootstrap.servers",
85+
List.of("test-eh.servicebus.windows.net:9093"));
86+
87+
when(tokenCredential.getToken(any(TokenRequestContext.class))).thenReturn(Mono.just(accessToken));
88+
when(accessToken.getToken()).thenReturn(VALID_SAMPLE_TOKEN);
89+
90+
azureEntraLoginCallbackHandler.configure(configs, null, null);
91+
azureEntraLoginCallbackHandler.handle(new Callback[] {oauthBearerTokenCallBack});
92+
93+
final ArgumentCaptor<TokenRequestContext> contextCaptor =
94+
ArgumentCaptor.forClass(TokenRequestContext.class);
95+
final ArgumentCaptor<OAuthBearerToken> tokenCaptor =
96+
ArgumentCaptor.forClass(OAuthBearerToken.class);
97+
98+
verify(tokenCredential, times(1)).getToken(contextCaptor.capture());
99+
verify(oauthBearerTokenCallBack, times(0)).error(anyString(), anyString(), anyString());
100+
verify(oauthBearerTokenCallBack, times(1)).token(tokenCaptor.capture());
101+
102+
final TokenRequestContext tokenRequestContext = contextCaptor.getValue();
103+
assertThat(tokenRequestContext, is(notNullValue()));
104+
assertThat(
105+
tokenRequestContext.getScopes(),
106+
is(List.of("https://test-eh.servicebus.windows.net/.default")));
107+
assertThat(tokenRequestContext.getClaims(), is(nullValue()));
108+
assertThat(tokenRequestContext.getTenantId(), is(nullValue()));
109+
assertFalse(tokenRequestContext.isCaeEnabled());
110+
111+
assertThat(tokenCaptor.getValue(), is(notNullValue()));
112+
assertEquals(VALID_SAMPLE_TOKEN, tokenCaptor.getValue().value());
113+
}
114+
115+
@Test
116+
public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallbackException {
117+
final Map<String, Object> configs = new HashMap<>();
118+
configs.put(
119+
"bootstrap.servers",
120+
List.of("test-eh.servicebus.windows.net:9093"));
121+
122+
when(tokenCredential.getToken(any(TokenRequestContext.class)))
123+
.thenThrow(new RuntimeException("failed to acquire token"));
124+
125+
azureEntraLoginCallbackHandler.configure(configs, null, null);
126+
azureEntraLoginCallbackHandler.handle(new Callback[] {oauthBearerTokenCallBack});
127+
128+
verify(oauthBearerTokenCallBack, times(1))
129+
.error(
130+
"invalid_grant",
131+
"Failed to acquire Azure token for Event Hub Authentication. "
132+
+ "Please ensure valid Azure credentials are configured.",
133+
null);
134+
verify(oauthBearerTokenCallBack, times(0)).token(any());
135+
}
136+
137+
@Test
138+
public void shouldThrowExceptionWithNullBootstrapServers() {
139+
final Map<String, Object> configs = new HashMap<>();
140+
141+
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
142+
configs, null, null));
143+
}
144+
145+
@Test
146+
public void shouldThrowExceptionWithMultipleBootstrapServers() {
147+
final Map<String, Object> configs = new HashMap<>();
148+
configs.put("bootstrap.servers", List.of("server1", "server2"));
149+
150+
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
151+
configs, null, null));
152+
}
153+
154+
@Test
155+
public void shouldThrowExceptionWithUnsupportedCallback() {
156+
assertThrows(UnsupportedCallbackException.class, () -> azureEntraLoginCallbackHandler.handle(
157+
new Callback[] {mock(Callback.class)}));
158+
}
159+
160+
@Test
161+
public void shouldDoNothingOnClose() {
162+
azureEntraLoginCallbackHandler.close();
163+
}
164+
165+
@Test
166+
public void shouldSupportDefaultConstructor() {
167+
new AzureEntraLoginCallbackHandler();
168+
}
169+
}

0 commit comments

Comments
 (0)