Skip to content

Commit 01a0709

Browse files
feat: Add connector-level permissions for Kafka Connect (#614)
Implements granular permission control at the individual connector level, allowing administrators to grant permissions for specific connectors rather than entire Kafka Connect instances. Changes: - Add CONNECTOR resource type and ConnectorAction enum for granular permissions - Implement hierarchical permission checking (connector-level takes precedence) - Update frontend to check connector permissions with connect-level fallback - Add comprehensive tests for connector permission scenarios - Upgrade Testcontainers to 2.0.2 for Docker Engine 29 compatibility Features: - Permission format: `connect-name/connector-name` for specific connectors - Wildcard patterns supported (e.g., `.*-connect/prod-.*`) - Backwards compatible with existing CONNECT permissions - Action hierarchy maintained (EDIT includes VIEW permission) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent f51df4c commit 01a0709

File tree

17 files changed

+1001
-158
lines changed

17 files changed

+1001
-158
lines changed

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
44
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
55
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
6-
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
7-
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;
86

97
import io.kafbat.ui.api.KafkaConnectApi;
108
import io.kafbat.ui.model.ConnectDTO;
@@ -19,6 +17,7 @@
1917
import io.kafbat.ui.model.TaskDTO;
2018
import io.kafbat.ui.model.rbac.AccessContext;
2119
import io.kafbat.ui.model.rbac.permission.ConnectAction;
20+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
2221
import io.kafbat.ui.service.KafkaConnectService;
2322
import io.kafbat.ui.service.mcp.McpTool;
2423
import java.util.Comparator;
@@ -29,6 +28,7 @@
2928
import lombok.RequiredArgsConstructor;
3029
import lombok.extern.slf4j.Slf4j;
3130
import org.springframework.http.ResponseEntity;
31+
import org.springframework.security.access.AccessDeniedException;
3232
import org.springframework.web.bind.annotation.RestController;
3333
import org.springframework.web.server.ServerWebExchange;
3434
import reactor.core.publisher.Flux;
@@ -41,6 +41,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
4141
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
4242
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
4343
private static final String CONNECTOR_NAME = "connectorName";
44+
private static final String ACCESS_DENIED = "Access denied";
4445

4546
private final KafkaConnectService kafkaConnectService;
4647

@@ -97,34 +98,47 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
9798
String connectorName,
9899
ServerWebExchange exchange) {
99100

101+
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
100102
var context = AccessContext.builder()
101103
.cluster(clusterName)
102-
.connectActions(connectName, ConnectAction.VIEW)
104+
.connectorActions(connectorResource, ConnectorAction.VIEW)
103105
.operationName("getConnector")
104106
.build();
105107

106-
return validateAccess(context).then(
107-
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
108-
.map(ResponseEntity::ok)
109-
).doOnEach(sig -> audit(context, sig));
108+
// Use hierarchical permission check
109+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.VIEW)
110+
.flatMap(hasAccess -> {
111+
if (!hasAccess) {
112+
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
113+
}
114+
return kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
115+
.map(ResponseEntity::ok);
116+
})
117+
.doOnEach(sig -> audit(context, sig));
110118
}
111119

112120
@Override
113121
public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
114122
String connectorName,
115123
ServerWebExchange exchange) {
116124

125+
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
117126
var context = AccessContext.builder()
118127
.cluster(clusterName)
119-
.connectActions(connectName, ConnectAction.DELETE)
128+
.connectorActions(connectorResource, ConnectorAction.DELETE)
120129
.operationName("deleteConnector")
121-
.operationParams(Map.of(CONNECTOR_NAME, connectName))
130+
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
122131
.build();
123132

124-
return validateAccess(context).then(
125-
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
126-
.map(ResponseEntity::ok)
127-
).doOnEach(sig -> audit(context, sig));
133+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.DELETE)
134+
.flatMap(hasAccess -> {
135+
if (!hasAccess) {
136+
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
137+
}
138+
return kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
139+
.map(ResponseEntity::ok);
140+
})
141+
.doOnEach(sig -> audit(context, sig));
128142
}
129143

130144

@@ -182,17 +196,24 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
182196
Mono<Map<String, Object>> requestBody,
183197
ServerWebExchange exchange) {
184198

199+
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
185200
var context = AccessContext.builder()
186201
.cluster(clusterName)
187-
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
202+
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.EDIT)
188203
.operationName("setConnectorConfig")
189204
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
190205
.build();
191206

192-
return validateAccess(context).then(
193-
kafkaConnectService
194-
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
195-
.map(ResponseEntity::ok))
207+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
208+
ConnectorAction.VIEW, ConnectorAction.EDIT)
209+
.flatMap(hasAccess -> {
210+
if (!hasAccess) {
211+
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
212+
}
213+
return kafkaConnectService
214+
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
215+
.map(ResponseEntity::ok);
216+
})
196217
.doOnEach(sig -> audit(context, sig));
197218
}
198219

@@ -201,21 +222,25 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
201222
String connectorName,
202223
ConnectorActionDTO action,
203224
ServerWebExchange exchange) {
204-
ConnectAction[] connectActions;
205-
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.OPERATE};
206-
225+
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
207226
var context = AccessContext.builder()
208227
.cluster(clusterName)
209-
.connectActions(connectName, connectActions)
228+
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.OPERATE)
210229
.operationName("updateConnectorState")
211230
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
212231
.build();
213232

214-
return validateAccess(context).then(
215-
kafkaConnectService
216-
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
217-
.map(ResponseEntity::ok)
218-
).doOnEach(sig -> audit(context, sig));
233+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
234+
ConnectorAction.VIEW, ConnectorAction.OPERATE)
235+
.flatMap(hasAccess -> {
236+
if (!hasAccess) {
237+
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
238+
}
239+
return kafkaConnectService
240+
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
241+
.map(ResponseEntity::ok);
242+
})
243+
.doOnEach(sig -> audit(context, sig));
219244
}
220245

221246
@Override
@@ -311,17 +336,24 @@ public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, Stri
311336
String connectorName,
312337
ServerWebExchange exchange) {
313338

339+
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
314340
var context = AccessContext.builder()
315341
.cluster(clusterName)
316-
.connectActions(connectName, VIEW, RESET_OFFSETS)
342+
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
317343
.operationName("resetConnectorOffsets")
318344
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
319345
.build();
320346

321-
return validateAccess(context).then(
322-
kafkaConnectService
323-
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
324-
.map(ResponseEntity::ok))
347+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
348+
ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
349+
.flatMap(hasAccess -> {
350+
if (!hasAccess) {
351+
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
352+
}
353+
return kafkaConnectService
354+
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
355+
.map(ResponseEntity::ok);
356+
})
325357
.doOnEach(sig -> audit(context, sig));
326358
}
327359
}

api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
1010
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
1111
import io.kafbat.ui.model.rbac.permission.ConnectAction;
12+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
1213
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1314
import io.kafbat.ui.model.rbac.permission.KsqlAction;
1415
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
@@ -130,6 +131,11 @@ public AccessContextBuilder connectActions(String connect, ConnectAction... acti
130131
return this;
131132
}
132133

134+
public AccessContextBuilder connectorActions(String connector, ConnectorAction... actions) {
135+
accessedResources.add(new SingleResourceAccess(connector, Resource.CONNECTOR, List.of(actions)));
136+
return this;
137+
}
138+
133139
public AccessContextBuilder schemaActions(String schema, SchemaAction... actions) {
134140
accessedResources.add(new SingleResourceAccess(schema, Resource.SCHEMA, List.of(actions)));
135141
return this;

api/src/main/java/io/kafbat/ui/model/rbac/Resource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
77
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
88
import io.kafbat.ui.model.rbac.permission.ConnectAction;
9+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
910
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1011
import io.kafbat.ui.model.rbac.permission.KsqlAction;
1112
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
@@ -36,6 +37,8 @@ public enum Resource {
3637

3738
CONNECT(ConnectAction.values(), ConnectAction.ALIASES),
3839

40+
CONNECTOR(ConnectorAction.values(), ConnectorAction.ALIASES),
41+
3942
KSQL(KsqlAction.values()),
4043

4144
ACL(AclAction.values()),
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.kafbat.ui.model.rbac.permission;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import org.apache.commons.lang3.EnumUtils;
6+
import org.jetbrains.annotations.Nullable;
7+
8+
public enum ConnectorAction implements PermissibleAction {
9+
10+
VIEW,
11+
EDIT(VIEW),
12+
CREATE(VIEW),
13+
OPERATE(VIEW),
14+
DELETE(VIEW),
15+
RESET_OFFSETS(VIEW),
16+
;
17+
18+
public static final String CONNECTOR_RESOURCE_DELIMITER = "/";
19+
20+
private final ConnectorAction[] dependantActions;
21+
22+
ConnectorAction(ConnectorAction... dependantActions) {
23+
this.dependantActions = dependantActions;
24+
}
25+
26+
public static final Set<ConnectorAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, OPERATE, RESET_OFFSETS);
27+
28+
public static final Map<String, PermissibleAction> ALIASES = Map.of(
29+
"restart", OPERATE,
30+
"pause", OPERATE,
31+
"resume", OPERATE,
32+
"restart_task", OPERATE,
33+
"state_update", OPERATE
34+
);
35+
36+
@Nullable
37+
public static ConnectorAction fromString(String name) {
38+
return EnumUtils.getEnum(ConnectorAction.class, name);
39+
}
40+
41+
@Override
42+
public boolean isAlter() {
43+
return ALTER_ACTIONS.contains(this);
44+
}
45+
46+
@Override
47+
public PermissibleAction[] dependantActions() {
48+
return dependantActions;
49+
}
50+
51+
public static String buildResourcePath(String connectName, String connectorName) {
52+
return connectName + CONNECTOR_RESOURCE_DELIMITER + connectorName;
53+
}
54+
}

api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
public sealed interface PermissibleAction permits
66
AclAction, ApplicationConfigAction,
77
ConsumerGroupAction, SchemaAction,
8-
ConnectAction, ClusterConfigAction,
8+
ConnectAction, ConnectorAction, ClusterConfigAction,
99
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {
1010

1111
String name();

api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.kafbat.ui.model.rbac.Role;
1313
import io.kafbat.ui.model.rbac.Subject;
1414
import io.kafbat.ui.model.rbac.permission.ConnectAction;
15+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
1516
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1617
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1718
import io.kafbat.ui.model.rbac.permission.TopicAction;
@@ -21,6 +22,7 @@
2122
import io.kafbat.ui.service.rbac.extractor.OauthAuthorityExtractor;
2223
import io.kafbat.ui.service.rbac.extractor.ProviderAuthorityExtractor;
2324
import jakarta.annotation.PostConstruct;
25+
import java.util.Arrays;
2426
import java.util.Collections;
2527
import java.util.List;
2628
import java.util.Objects;
@@ -206,6 +208,63 @@ public Mono<Boolean> isConnectAccessible(String connectName, String clusterName)
206208
);
207209
}
208210

211+
public Mono<Boolean> isConnectorAccessible(String connectName, String connectorName,
212+
String clusterName, ConnectorAction... actions) {
213+
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
214+
215+
// First check for specific connector permissions
216+
var connectorContext = AccessContext.builder()
217+
.cluster(clusterName)
218+
.connectorActions(connectorResource, actions)
219+
.build();
220+
221+
return isAccessible(connectorContext)
222+
.flatMap(hasConnectorAccess -> {
223+
if (hasConnectorAccess) {
224+
return Mono.just(true);
225+
}
226+
227+
// Fall back to checking connect-level permissions
228+
// Map connector actions to corresponding connect actions
229+
ConnectAction[] connectActions = mapToConnectActions(actions);
230+
if (connectActions.length == 0) {
231+
return Mono.just(false);
232+
}
233+
234+
var connectContext = AccessContext.builder()
235+
.cluster(clusterName)
236+
.connectActions(connectName, connectActions)
237+
.build();
238+
239+
return isAccessible(connectContext);
240+
});
241+
}
242+
243+
private ConnectAction[] mapToConnectActions(ConnectorAction[] connectorActions) {
244+
return Arrays.stream(connectorActions)
245+
.map(action -> {
246+
switch (action) {
247+
case VIEW:
248+
return ConnectAction.VIEW;
249+
case EDIT:
250+
return ConnectAction.EDIT;
251+
case CREATE:
252+
return ConnectAction.CREATE;
253+
case DELETE:
254+
return ConnectAction.DELETE;
255+
case OPERATE:
256+
return ConnectAction.OPERATE;
257+
case RESET_OFFSETS:
258+
return ConnectAction.RESET_OFFSETS;
259+
default:
260+
return null;
261+
}
262+
})
263+
.filter(Objects::nonNull)
264+
.distinct()
265+
.toArray(ConnectAction[]::new);
266+
}
267+
209268
public List<Role> getRoles() {
210269
if (!rbacEnabled) {
211270
return Collections.emptyList();

0 commit comments

Comments
 (0)