Skip to content

Commit 41022ef

Browse files
authored
Merge branch 'main' into issues/631
2 parents 5f158cf + 3c93cb0 commit 41022ef

File tree

65 files changed

+3546
-274
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+3546
-274
lines changed

.github/workflows/cve_checks.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ jobs:
7171
uses: aquasecurity/trivy-action@dc5a429b52fcf669ce959baa2c2dd26090d2a6c4 # https://github.com/aquasecurity/trivy-action/releases/tag/0.33.1
7272
with:
7373
image-ref: "ghcr.io/kafbat/kafka-ui:latest"
74+
severity: "CRITICAL,HIGH"
7475
format: "table"
7576
exit-code: "1"
7677

api/build.gradle

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@ plugins {
77
alias(libs.plugins.spring.dependency.management)
88
}
99

10+
configurations.all {
11+
resolutionStrategy {
12+
capabilitiesResolution {
13+
withCapability("org.lz4:lz4-java") {
14+
select(libs.lz4.yawk.get().toString())
15+
}
16+
}
17+
}
18+
}
19+
1020

1121
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
1222

@@ -33,7 +43,10 @@ dependencies {
3343

3444
implementation libs.spring.security.ldap
3545

36-
implementation libs.kafka.clients
46+
implementation (libs.kafka.clients) {
47+
// TODO: Remove once client would fix CVE
48+
exclude group: "org.lz4", module: "lz4-java"
49+
}
3750

3851
implementation libs.apache.avro
3952
implementation libs.apache.commons
@@ -77,10 +90,12 @@ dependencies {
7790
// CVE Fixes
7891
implementation libs.apache.commons.compress
7992
implementation libs.okhttp3.logging.intercepter
93+
implementation libs.lz4.yawk
8094

8195
implementation libs.modelcontextprotocol.spring.webflux
8296
implementation libs.victools.jsonschema.generator
8397

98+
8499
// Google Managed Service for Kafka IAM support
85100
implementation (libs.google.managed.kafka.login.handler) {
86101
exclude group: 'com.google.oauth-client', module: 'google-oauth-client'

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.kafbat.ui.model.ConsumerGroupDetailsDTO;
1313
import io.kafbat.ui.model.ConsumerGroupOffsetsResetDTO;
1414
import io.kafbat.ui.model.ConsumerGroupOrderingDTO;
15+
import io.kafbat.ui.model.ConsumerGroupsLagResponseDTO;
1516
import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO;
1617
import io.kafbat.ui.model.PartitionOffsetDTO;
1718
import io.kafbat.ui.model.SortOrderDTO;
@@ -20,10 +21,13 @@
2021
import io.kafbat.ui.service.ConsumerGroupService;
2122
import io.kafbat.ui.service.OffsetsResetService;
2223
import io.kafbat.ui.service.mcp.McpTool;
24+
import java.time.Instant;
25+
import java.util.List;
2326
import java.util.Map;
2427
import java.util.Optional;
2528
import java.util.OptionalInt;
2629
import java.util.function.Supplier;
30+
import java.util.stream.Collectors;
2731
import lombok.RequiredArgsConstructor;
2832
import lombok.extern.slf4j.Slf4j;
2933
import org.springframework.beans.factory.annotation.Value;
@@ -33,6 +37,7 @@
3337
import org.springframework.web.server.ServerWebExchange;
3438
import reactor.core.publisher.Flux;
3539
import reactor.core.publisher.Mono;
40+
import reactor.util.function.Tuples;
3641

3742
@RestController
3843
@RequiredArgsConstructor
@@ -98,6 +103,39 @@ public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clu
98103

99104

100105

106+
@Override
107+
public Mono<ResponseEntity<ConsumerGroupsLagResponseDTO>> getConsumerGroupsLag(String clusterName,
108+
List<String> groupNames,
109+
Long lastUpdate,
110+
ServerWebExchange exchange) {
111+
112+
var context = AccessContext.builder()
113+
.cluster(clusterName)
114+
.operationName("getConsumerGroupsLag")
115+
.build();
116+
117+
Mono<ResponseEntity<ConsumerGroupsLagResponseDTO>> result =
118+
consumerGroupService.getConsumerGroupsLag(getCluster(clusterName), groupNames, Optional.ofNullable(lastUpdate))
119+
.flatMap(t ->
120+
Flux.fromIterable(t.getT1().entrySet())
121+
.filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.getKey(), clusterName))
122+
.collectList()
123+
.map(l -> l.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
124+
.map(l -> Tuples.of(t.getT2(), l))
125+
)
126+
.map(t ->
127+
new ConsumerGroupsLagResponseDTO(
128+
t.getT1().orElse(0L), t.getT2()
129+
)
130+
)
131+
.map(ResponseEntity::ok)
132+
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
133+
134+
return validateAccess(context)
135+
.then(result)
136+
.doOnEach(sig -> audit(context, sig));
137+
}
138+
101139
@Override
102140
public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String clusterName,
103141
String topicName,

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
44
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
55
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
6-
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
7-
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;
86

97
import io.kafbat.ui.api.KafkaConnectApi;
108
import io.kafbat.ui.model.ConnectDTO;
@@ -20,6 +18,7 @@
2018
import io.kafbat.ui.model.TopicsResponseDTO;
2119
import io.kafbat.ui.model.rbac.AccessContext;
2220
import io.kafbat.ui.model.rbac.permission.ConnectAction;
21+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
2322
import io.kafbat.ui.service.KafkaConnectService;
2423
import io.kafbat.ui.service.mcp.McpTool;
2524
import java.util.Comparator;
@@ -107,7 +106,7 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
107106

108107
var context = AccessContext.builder()
109108
.cluster(clusterName)
110-
.connectActions(connectName, ConnectAction.VIEW)
109+
.connectorActions(connectName, connectorName, ConnectorAction.VIEW)
111110
.operationName("getConnector")
112111
.build();
113112

@@ -124,9 +123,8 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con
124123

125124
var context = AccessContext.builder()
126125
.cluster(clusterName)
127-
.connectActions(connectName, ConnectAction.DELETE)
126+
.connectorActions(connectName, connectorName, ConnectorAction.DELETE)
128127
.operationName("deleteConnector")
129-
.operationParams(Map.of(CONNECTOR_NAME, connectName))
130128
.build();
131129

132130
return validateAccess(context).then(
@@ -182,7 +180,7 @@ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clust
182180

183181
var context = AccessContext.builder()
184182
.cluster(clusterName)
185-
.connectActions(connectName, ConnectAction.VIEW)
183+
.connectorActions(connectName, connectorName, ConnectorAction.VIEW)
186184
.operationName("getConnectorConfig")
187185
.build();
188186

@@ -201,9 +199,8 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
201199

202200
var context = AccessContext.builder()
203201
.cluster(clusterName)
204-
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
202+
.connectorActions(connectName, connectorName, ConnectorAction.VIEW, ConnectorAction.EDIT)
205203
.operationName("setConnectorConfig")
206-
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
207204
.build();
208205

209206
return validateAccess(context).then(
@@ -218,14 +215,10 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
218215
String connectorName,
219216
ConnectorActionDTO action,
220217
ServerWebExchange exchange) {
221-
ConnectAction[] connectActions;
222-
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.OPERATE};
223-
224218
var context = AccessContext.builder()
225219
.cluster(clusterName)
226-
.connectActions(connectName, connectActions)
220+
.connectorActions(connectName, connectorName, ConnectorAction.VIEW, ConnectorAction.OPERATE)
227221
.operationName("updateConnectorState")
228-
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
229222
.build();
230223

231224
return validateAccess(context).then(
@@ -242,9 +235,8 @@ public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
242235
ServerWebExchange exchange) {
243236
var context = AccessContext.builder()
244237
.cluster(clusterName)
245-
.connectActions(connectName, ConnectAction.VIEW)
238+
.connectorActions(connectName, connectorName, ConnectorAction.VIEW)
246239
.operationName("getConnectorTasks")
247-
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
248240
.build();
249241

250242
return validateAccess(context).thenReturn(
@@ -261,9 +253,8 @@ public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, Strin
261253

262254
var context = AccessContext.builder()
263255
.cluster(clusterName)
264-
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.OPERATE)
256+
.connectorActions(connectName, connectorName, ConnectorAction.VIEW, ConnectorAction.OPERATE)
265257
.operationName("restartConnectorTask")
266-
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
267258
.build();
268259

269260
return validateAccess(context).then(
@@ -324,15 +315,16 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
324315
}
325316

326317
@Override
327-
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
318+
public Mono<ResponseEntity<Void>> resetConnectorOffsets(
319+
String clusterName,
320+
String connectName,
328321
String connectorName,
329322
ServerWebExchange exchange) {
330323

331324
var context = AccessContext.builder()
332325
.cluster(clusterName)
333-
.connectActions(connectName, VIEW, RESET_OFFSETS)
326+
.connectorActions(connectName, connectorName, ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
334327
.operationName("resetConnectorOffsets")
335-
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
336328
.build();
337329

338330
return validateAccess(context).then(

api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
11
package io.kafbat.ui.mapper;
22

3+
import io.kafbat.ui.api.model.ConsumerGroupLag;
4+
import io.kafbat.ui.api.model.ConsumerGroupState;
35
import io.kafbat.ui.model.BrokerDTO;
46
import io.kafbat.ui.model.ConsumerGroupDTO;
57
import io.kafbat.ui.model.ConsumerGroupDetailsDTO;
8+
import io.kafbat.ui.model.ConsumerGroupLagDTO;
69
import io.kafbat.ui.model.ConsumerGroupStateDTO;
710
import io.kafbat.ui.model.ConsumerGroupTopicPartitionDTO;
811
import io.kafbat.ui.model.InternalConsumerGroup;
912
import io.kafbat.ui.model.InternalTopicConsumerGroup;
13+
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
14+
import java.time.Instant;
1015
import java.util.ArrayList;
1116
import java.util.HashMap;
17+
import java.util.List;
1218
import java.util.Map;
1319
import java.util.Optional;
20+
import java.util.Set;
21+
import java.util.stream.Collectors;
22+
import java.util.stream.Stream;
1423
import org.apache.kafka.common.Node;
1524
import org.apache.kafka.common.TopicPartition;
1625

api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.kafbat.ui.model;
22

3+
import static io.kafbat.ui.util.ConsumerGroupUtil.calculateConsumerLag;
4+
35
import java.util.Collection;
46
import java.util.Map;
57
import java.util.Optional;
@@ -56,22 +58,6 @@ public static InternalConsumerGroup create(
5658
return builder.build();
5759
}
5860

59-
private static Long calculateConsumerLag(Map<TopicPartition, Long> offsets, Map<TopicPartition, Long> endOffsets) {
60-
Long consumerLag = null;
61-
// consumerLag should be undefined if no committed offsets found for topic
62-
if (!offsets.isEmpty()) {
63-
consumerLag = offsets.entrySet().stream()
64-
.mapToLong(e ->
65-
Optional.ofNullable(endOffsets)
66-
.map(o -> o.get(e.getKey()))
67-
.map(o -> o - e.getValue())
68-
.orElse(0L)
69-
).sum();
70-
}
71-
72-
return consumerLag;
73-
}
74-
7561
private static Integer calculateTopicNum(Map<TopicPartition, Long> offsets, Collection<InternalMember> members) {
7662

7763
return (int) Stream.concat(

api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
1010
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
1111
import io.kafbat.ui.model.rbac.permission.ConnectAction;
12+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
1213
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1314
import io.kafbat.ui.model.rbac.permission.KsqlAction;
1415
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
@@ -19,7 +20,9 @@
1920
import java.util.Collection;
2021
import java.util.List;
2122
import java.util.Map;
23+
import java.util.Optional;
2224
import java.util.stream.Collectors;
25+
import java.util.stream.Stream;
2326
import org.springframework.security.access.AccessDeniedException;
2427

2528
public record AccessContext(String cluster,
@@ -34,26 +37,33 @@ public interface ResourceAccess {
3437

3538
Resource resourceType();
3639

37-
Collection<PermissibleAction> requestedActions();
40+
Collection<? extends PermissibleAction> requestedActions();
3841

3942
boolean isAccessible(List<Permission> userPermissions);
43+
44+
@Nullable
45+
ResourceAccess fallback();
4046
}
4147

4248
record SingleResourceAccess(@Nullable String name,
4349
Resource resourceType,
44-
Collection<PermissibleAction> requestedActions) implements ResourceAccess {
50+
Collection<? extends PermissibleAction> requestedActions,
51+
@Nullable ResourceAccess fallback) implements ResourceAccess {
4552

46-
SingleResourceAccess(@Nullable String name,
47-
Resource resourceType,
48-
Collection<PermissibleAction> requestedActions) {
53+
SingleResourceAccess {
4954
Preconditions.checkArgument(!requestedActions.isEmpty(), "actions not present");
50-
this.name = name;
51-
this.resourceType = resourceType;
52-
this.requestedActions = requestedActions;
5355
}
5456

55-
SingleResourceAccess(Resource type, List<PermissibleAction> requestedActions) {
56-
this(null, type, requestedActions);
57+
SingleResourceAccess(@Nullable String name, Resource type, List<? extends PermissibleAction> requestedActions) {
58+
this(name, type, requestedActions, null);
59+
}
60+
61+
SingleResourceAccess(Resource type, List<? extends PermissibleAction> requestedActions, ResourceAccess fallback) {
62+
this(null, type, requestedActions, fallback);
63+
}
64+
65+
SingleResourceAccess(Resource type, List<? extends PermissibleAction> requestedActions) {
66+
this(null, type, requestedActions, null);
5767
}
5868

5969
@Override
@@ -77,7 +87,8 @@ public boolean isAccessible(List<Permission> userPermissions) throws AccessDenie
7787
.flatMap(p -> p.getParsedActions().stream())
7888
.collect(Collectors.toSet());
7989

80-
return allowedActions.containsAll(requestedActions);
90+
return allowedActions.containsAll(requestedActions)
91+
|| Optional.ofNullable(fallback).map(e -> e.isAccessible(userPermissions)).orElse(false);
8192
}
8293
}
8394

@@ -130,6 +141,18 @@ public AccessContextBuilder connectActions(String connect, ConnectAction... acti
130141
return this;
131142
}
132143

144+
public AccessContextBuilder connectorActions(String connect, String connector, ConnectorAction... actions) {
145+
accessedResources.add(
146+
new SingleResourceAccess(String.join("/", connect, connector), Resource.CONNECTOR, List.of(actions),
147+
new SingleResourceAccess(
148+
connect, Resource.CONNECT,
149+
Stream.of(actions).map(ConnectorAction::getConnectAction).toList()
150+
)
151+
)
152+
);
153+
return this;
154+
}
155+
133156
public AccessContextBuilder schemaActions(String schema, SchemaAction... actions) {
134157
accessedResources.add(new SingleResourceAccess(schema, Resource.SCHEMA, List.of(actions)));
135158
return this;

api/src/main/java/io/kafbat/ui/model/rbac/Resource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
77
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
88
import io.kafbat.ui.model.rbac.permission.ConnectAction;
9+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
910
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1011
import io.kafbat.ui.model.rbac.permission.KsqlAction;
1112
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
@@ -36,6 +37,8 @@ public enum Resource {
3637

3738
CONNECT(ConnectAction.values(), ConnectAction.ALIASES),
3839

40+
CONNECTOR(ConnectorAction.values()),
41+
3942
KSQL(KsqlAction.values()),
4043

4144
ACL(AclAction.values()),

0 commit comments

Comments
 (0)