Skip to content

Commit 3185e77

Browse files
authored
Merge branch 'main' into issues/348
2 parents 3d89a8f + 1269803 commit 3185e77

File tree

27 files changed

+559
-106
lines changed

27 files changed

+559
-106
lines changed

.github/workflows/e2e-run.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ jobs:
103103
run: |
104104
mkdir -p ./e2e-tests/target/selenoid-results/video
105105
mkdir -p ./e2e-tests/target/selenoid-results/logs
106-
docker-compose -f ./e2e-tests/selenoid/selenoid-ci.yaml up -d
107-
docker-compose -f ./documentation/compose/e2e-tests.yaml up -d
106+
docker compose -f ./e2e-tests/selenoid/selenoid-ci.yaml up -d
107+
docker compose -f ./documentation/compose/e2e-tests.yaml up -d
108108
109109
- name: Dump Docker logs on failure
110110
if: failure()

api/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,18 @@
9494
<version>2.1.0</version>
9595
</dependency>
9696

97+
<dependency>
98+
<groupId>com.azure</groupId>
99+
<artifactId>azure-identity</artifactId>
100+
<version>1.13.3</version>
101+
<exclusions>
102+
<exclusion>
103+
<groupId>io.netty</groupId>
104+
<artifactId>netty-tcnative-boringssl-static</artifactId>
105+
</exclusion>
106+
</exclusions>
107+
</dependency>
108+
97109
<dependency>
98110
<groupId>org.apache.avro</groupId>
99111
<artifactId>avro</artifactId>
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package io.kafbat.ui.config.auth.azure;
2+
3+
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
4+
5+
import com.azure.core.credential.TokenCredential;
6+
import com.azure.core.credential.TokenRequestContext;
7+
import com.azure.identity.DefaultAzureCredentialBuilder;
8+
import java.net.URI;
9+
import java.time.Duration;
10+
import java.util.List;
11+
import java.util.Map;
12+
import javax.security.auth.callback.Callback;
13+
import javax.security.auth.callback.UnsupportedCallbackException;
14+
import javax.security.auth.login.AppConfigurationEntry;
15+
import lombok.extern.slf4j.Slf4j;
16+
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
17+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
18+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
19+
20+
@Slf4j
21+
public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandler {
22+
23+
private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(10);
24+
25+
private static final int ACCESS_TOKEN_REQUEST_MAX_RETRIES = 6;
26+
27+
private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
28+
29+
static TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
30+
31+
private TokenRequestContext tokenRequestContext;
32+
33+
@Override
34+
public void configure(Map<String, ?> configs,
35+
String mechanism,
36+
List<AppConfigurationEntry> jaasConfigEntries) {
37+
tokenRequestContext = buildTokenRequestContext(configs);
38+
}
39+
40+
private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
41+
URI uri = buildEventHubsServerUri(configs);
42+
String tokenAudience = buildTokenAudience(uri);
43+
44+
TokenRequestContext request = new TokenRequestContext();
45+
request.addScopes(tokenAudience);
46+
return request;
47+
}
48+
49+
@SuppressWarnings("unchecked")
50+
private URI buildEventHubsServerUri(Map<String, ?> configs) {
51+
final List<String> bootstrapServers = (List<String>) configs.get(BOOTSTRAP_SERVERS_CONFIG);
52+
53+
if (bootstrapServers == null) {
54+
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
55+
log.error(message);
56+
throw new IllegalArgumentException(message);
57+
}
58+
59+
if (bootstrapServers.size() > 1) {
60+
final String message =
61+
BOOTSTRAP_SERVERS_CONFIG
62+
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
63+
log.error(message);
64+
throw new IllegalArgumentException(message);
65+
}
66+
67+
return URI.create("https://" + bootstrapServers.get(0));
68+
}
69+
70+
private String buildTokenAudience(URI uri) {
71+
return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost());
72+
}
73+
74+
@Override
75+
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
76+
for (Callback callback : callbacks) {
77+
if (!(callback instanceof OAuthBearerTokenCallback oauthCallback)) {
78+
throw new UnsupportedCallbackException(callback);
79+
}
80+
handleOAuthCallback(oauthCallback);
81+
}
82+
}
83+
84+
private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
85+
try {
86+
final OAuthBearerToken token = tokenCredential
87+
.getToken(tokenRequestContext)
88+
.map(AzureEntraOAuthBearerToken::new)
89+
.timeout(ACCESS_TOKEN_REQUEST_BLOCK_TIME)
90+
.doOnError(e -> log.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
91+
.retry(ACCESS_TOKEN_REQUEST_MAX_RETRIES)
92+
.block();
93+
94+
oauthCallback.token(token);
95+
} catch (RuntimeException e) {
96+
final String message =
97+
"Failed to acquire Azure token for Event Hub Authentication. "
98+
+ "Please ensure valid Azure credentials are configured.";
99+
log.error(message, e);
100+
oauthCallback.error("invalid_grant", message, null);
101+
}
102+
}
103+
104+
public void close() {
105+
// NOOP
106+
}
107+
108+
void setTokenCredential(TokenCredential tokenCredential) {
109+
AzureEntraLoginCallbackHandler.tokenCredential = tokenCredential;
110+
}
111+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.kafbat.ui.config.auth.azure;
2+
3+
import com.azure.core.credential.AccessToken;
4+
import com.nimbusds.jwt.JWTClaimsSet;
5+
import com.nimbusds.jwt.JWTParser;
6+
import java.text.ParseException;
7+
import java.util.Arrays;
8+
import java.util.Set;
9+
import java.util.stream.Collectors;
10+
import org.apache.kafka.common.errors.SaslAuthenticationException;
11+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
12+
13+
public class AzureEntraOAuthBearerToken implements OAuthBearerToken {
14+
15+
private final AccessToken accessToken;
16+
private final JWTClaimsSet claims;
17+
18+
public AzureEntraOAuthBearerToken(AccessToken accessToken) {
19+
this.accessToken = accessToken;
20+
21+
try {
22+
claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet();
23+
} catch (ParseException exception) {
24+
throw new SaslAuthenticationException("Unable to parse the access token", exception);
25+
}
26+
}
27+
28+
@Override
29+
public String value() {
30+
return accessToken.getToken();
31+
}
32+
33+
@Override
34+
public Long startTimeMs() {
35+
return claims.getIssueTime().getTime();
36+
}
37+
38+
@Override
39+
public long lifetimeMs() {
40+
return claims.getExpirationTime().getTime();
41+
}
42+
43+
@Override
44+
public Set<String> scope() {
45+
// Referring to
46+
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
47+
// scp
48+
// claim is a String which is presented as a space separated list.
49+
return Arrays
50+
.stream(((String) claims.getClaim("scp")).split(" "))
51+
.collect(Collectors.toSet());
52+
}
53+
54+
@Override
55+
public String principalName() {
56+
return (String) claims.getClaim("upn");
57+
}
58+
59+
public boolean isExpired() {
60+
return accessToken.isExpired();
61+
}
62+
}

api/src/main/java/io/kafbat/ui/controller/AccessController.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,33 +45,34 @@ public Mono<ResponseEntity<AuthenticationInfoDTO>> getUserAuthInfo(ServerWebExch
4545
.map(SecurityContext::getAuthentication)
4646
.map(Principal::getName);
4747

48+
var builder = AuthenticationInfoDTO.builder()
49+
.rbacEnabled(accessControlService.isRbacEnabled());
50+
4851
return userName
4952
.zipWith(permissions)
50-
.map(data -> {
51-
var dto = new AuthenticationInfoDTO(accessControlService.isRbacEnabled());
52-
dto.setUserInfo(new UserInfoDTO(data.getT1(), data.getT2()));
53-
return dto;
54-
})
55-
.switchIfEmpty(Mono.just(new AuthenticationInfoDTO(accessControlService.isRbacEnabled())))
53+
.map(data -> (AuthenticationInfoDTO) builder
54+
.userInfo(new UserInfoDTO(data.getT1(), data.getT2()))
55+
.build()
56+
)
57+
.switchIfEmpty(Mono.just(builder.build()))
5658
.map(ResponseEntity::ok);
5759
}
5860

5961
private List<UserPermissionDTO> mapPermissions(List<Permission> permissions, List<String> clusters) {
6062
return permissions
6163
.stream()
62-
.map(permission -> {
63-
UserPermissionDTO dto = new UserPermissionDTO();
64-
dto.setClusters(clusters);
65-
dto.setResource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()));
66-
dto.setValue(permission.getValue());
67-
dto.setActions(permission.getParsedActions()
68-
.stream()
69-
.map(p -> p.name().toUpperCase())
70-
.map(this::mapAction)
71-
.filter(Objects::nonNull)
72-
.toList());
73-
return dto;
74-
})
64+
.map(permission -> (UserPermissionDTO) UserPermissionDTO.builder()
65+
.clusters(clusters)
66+
.resource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()))
67+
.value(permission.getValue())
68+
.actions(permission.getParsedActions()
69+
.stream()
70+
.map(p -> p.name().toUpperCase())
71+
.map(this::mapAction)
72+
.filter(Objects::nonNull)
73+
.toList())
74+
.build()
75+
)
7576
.toList();
7677
}
7778

api/src/main/java/io/kafbat/ui/controller/KsqlController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String cluster
5353
}
5454

5555
@Override
56+
@SuppressWarnings("unchecked")
5657
public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
5758
String pipeId,
5859
ServerWebExchange exchange) {

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package io.kafbat.ui.controller;
22

3+
import static io.kafbat.ui.model.rbac.permission.TopicAction.ANALYSIS_RUN;
4+
import static io.kafbat.ui.model.rbac.permission.TopicAction.ANALYSIS_VIEW;
35
import static io.kafbat.ui.model.rbac.permission.TopicAction.CREATE;
46
import static io.kafbat.ui.model.rbac.permission.TopicAction.DELETE;
57
import static io.kafbat.ui.model.rbac.permission.TopicAction.EDIT;
6-
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
78
import static io.kafbat.ui.model.rbac.permission.TopicAction.VIEW;
89
import static java.util.stream.Collectors.toList;
910

@@ -272,7 +273,7 @@ public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicN
272273

273274
var context = AccessContext.builder()
274275
.cluster(clusterName)
275-
.topicActions(topicName, MESSAGES_READ)
276+
.topicActions(topicName, ANALYSIS_RUN)
276277
.operationName("analyzeTopic")
277278
.build();
278279

@@ -288,7 +289,7 @@ public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String
288289
ServerWebExchange exchange) {
289290
var context = AccessContext.builder()
290291
.cluster(clusterName)
291-
.topicActions(topicName, MESSAGES_READ)
292+
.topicActions(topicName, ANALYSIS_RUN)
292293
.operationName("cancelTopicAnalysis")
293294
.build();
294295

@@ -306,7 +307,7 @@ public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterNam
306307

307308
var context = AccessContext.builder()
308309
.cluster(clusterName)
309-
.topicActions(topicName, MESSAGES_READ)
310+
.topicActions(topicName, ANALYSIS_VIEW)
310311
.operationName("getTopicAnalysis")
311312
.build();
312313

@@ -350,18 +351,12 @@ private Comparator<InternalTopic> getComparatorForTopic(
350351
if (orderBy == null) {
351352
return defaultComparator;
352353
}
353-
switch (orderBy) {
354-
case TOTAL_PARTITIONS:
355-
return Comparator.comparing(InternalTopic::getPartitionCount);
356-
case OUT_OF_SYNC_REPLICAS:
357-
return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
358-
case REPLICATION_FACTOR:
359-
return Comparator.comparing(InternalTopic::getReplicationFactor);
360-
case SIZE:
361-
return Comparator.comparing(InternalTopic::getSegmentSize);
362-
case NAME:
363-
default:
364-
return defaultComparator;
365-
}
354+
return switch (orderBy) {
355+
case TOTAL_PARTITIONS -> Comparator.comparing(InternalTopic::getPartitionCount);
356+
case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
357+
case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor);
358+
case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize);
359+
default -> defaultComparator;
360+
};
366361
}
367362
}

api/src/main/java/io/kafbat/ui/model/rbac/permission/TopicAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public enum TopicAction implements PermissibleAction {
1313
MESSAGES_READ(VIEW),
1414
MESSAGES_PRODUCE(VIEW),
1515
MESSAGES_DELETE(VIEW, EDIT),
16+
ANALYSIS_VIEW(VIEW),
17+
ANALYSIS_RUN(VIEW, ANALYSIS_VIEW),
1618

1719
;
1820

0 commit comments

Comments
 (0)