Skip to content

Commit 922d1e4

Browse files
feat: Add connector-level permissions for Kafka Connect (#614)
Implements granular permission control at the individual connector level, allowing administrators to grant permissions for specific connectors rather than entire Kafka Connect instances. Changes: - Add CONNECTOR resource type and ConnectorAction enum for granular permissions - Implement hierarchical permission checking (connector-level takes precedence) - Update frontend to check connector permissions with connect-level fallback - Add comprehensive tests for connector permission scenarios - Upgrade Testcontainers to 2.0.2 for Docker Engine 29 compatibility Features: - Permission format: `connect-name/connector-name` for specific connectors - Wildcard patterns supported (e.g., `.*-connect/prod-.*`) - Backwards compatible with existing CONNECT permissions - Action hierarchy maintained (EDIT includes VIEW permission) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent f51df4c commit 922d1e4

File tree

17 files changed

+994
-158
lines changed

17 files changed

+994
-158
lines changed

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
44
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
55
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
6-
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
7-
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;
86

97
import io.kafbat.ui.api.KafkaConnectApi;
108
import io.kafbat.ui.model.ConnectDTO;
@@ -19,6 +17,7 @@
1917
import io.kafbat.ui.model.TaskDTO;
2018
import io.kafbat.ui.model.rbac.AccessContext;
2119
import io.kafbat.ui.model.rbac.permission.ConnectAction;
20+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
2221
import io.kafbat.ui.service.KafkaConnectService;
2322
import io.kafbat.ui.service.mcp.McpTool;
2423
import java.util.Comparator;
@@ -29,6 +28,7 @@
2928
import lombok.RequiredArgsConstructor;
3029
import lombok.extern.slf4j.Slf4j;
3130
import org.springframework.http.ResponseEntity;
31+
import org.springframework.security.access.AccessDeniedException;
3232
import org.springframework.web.bind.annotation.RestController;
3333
import org.springframework.web.server.ServerWebExchange;
3434
import reactor.core.publisher.Flux;
@@ -97,34 +97,47 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
9797
String connectorName,
9898
ServerWebExchange exchange) {
9999

100+
String connectorResource = connectName + "/" + connectorName;
100101
var context = AccessContext.builder()
101102
.cluster(clusterName)
102-
.connectActions(connectName, ConnectAction.VIEW)
103+
.connectorActions(connectorResource, ConnectorAction.VIEW)
103104
.operationName("getConnector")
104105
.build();
105106

106-
return validateAccess(context).then(
107-
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
108-
.map(ResponseEntity::ok)
109-
).doOnEach(sig -> audit(context, sig));
107+
// Use hierarchical permission check
108+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.VIEW)
109+
.flatMap(hasAccess -> {
110+
if (!hasAccess) {
111+
return Mono.error(new AccessDeniedException("Access denied"));
112+
}
113+
return kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
114+
.map(ResponseEntity::ok);
115+
})
116+
.doOnEach(sig -> audit(context, sig));
110117
}
111118

112119
@Override
113120
public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
114121
String connectorName,
115122
ServerWebExchange exchange) {
116123

124+
String connectorResource = connectName + "/" + connectorName;
117125
var context = AccessContext.builder()
118126
.cluster(clusterName)
119-
.connectActions(connectName, ConnectAction.DELETE)
127+
.connectorActions(connectorResource, ConnectorAction.DELETE)
120128
.operationName("deleteConnector")
121-
.operationParams(Map.of(CONNECTOR_NAME, connectName))
129+
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
122130
.build();
123131

124-
return validateAccess(context).then(
125-
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
126-
.map(ResponseEntity::ok)
127-
).doOnEach(sig -> audit(context, sig));
132+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.DELETE)
133+
.flatMap(hasAccess -> {
134+
if (!hasAccess) {
135+
return Mono.error(new AccessDeniedException("Access denied"));
136+
}
137+
return kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
138+
.map(ResponseEntity::ok);
139+
})
140+
.doOnEach(sig -> audit(context, sig));
128141
}
129142

130143

@@ -182,17 +195,24 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
182195
Mono<Map<String, Object>> requestBody,
183196
ServerWebExchange exchange) {
184197

198+
String connectorResource = connectName + "/" + connectorName;
185199
var context = AccessContext.builder()
186200
.cluster(clusterName)
187-
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
201+
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.EDIT)
188202
.operationName("setConnectorConfig")
189203
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
190204
.build();
191205

192-
return validateAccess(context).then(
193-
kafkaConnectService
194-
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
195-
.map(ResponseEntity::ok))
206+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
207+
ConnectorAction.VIEW, ConnectorAction.EDIT)
208+
.flatMap(hasAccess -> {
209+
if (!hasAccess) {
210+
return Mono.error(new AccessDeniedException("Access denied"));
211+
}
212+
return kafkaConnectService
213+
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
214+
.map(ResponseEntity::ok);
215+
})
196216
.doOnEach(sig -> audit(context, sig));
197217
}
198218

@@ -201,21 +221,25 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
201221
String connectorName,
202222
ConnectorActionDTO action,
203223
ServerWebExchange exchange) {
204-
ConnectAction[] connectActions;
205-
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.OPERATE};
206-
224+
String connectorResource = connectName + "/" + connectorName;
207225
var context = AccessContext.builder()
208226
.cluster(clusterName)
209-
.connectActions(connectName, connectActions)
227+
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.OPERATE)
210228
.operationName("updateConnectorState")
211229
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
212230
.build();
213231

214-
return validateAccess(context).then(
215-
kafkaConnectService
216-
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
217-
.map(ResponseEntity::ok)
218-
).doOnEach(sig -> audit(context, sig));
232+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
233+
ConnectorAction.VIEW, ConnectorAction.OPERATE)
234+
.flatMap(hasAccess -> {
235+
if (!hasAccess) {
236+
return Mono.error(new AccessDeniedException("Access denied"));
237+
}
238+
return kafkaConnectService
239+
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
240+
.map(ResponseEntity::ok);
241+
})
242+
.doOnEach(sig -> audit(context, sig));
219243
}
220244

221245
@Override
@@ -311,17 +335,24 @@ public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, Stri
311335
String connectorName,
312336
ServerWebExchange exchange) {
313337

338+
String connectorResource = connectName + "/" + connectorName;
314339
var context = AccessContext.builder()
315340
.cluster(clusterName)
316-
.connectActions(connectName, VIEW, RESET_OFFSETS)
341+
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
317342
.operationName("resetConnectorOffsets")
318343
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
319344
.build();
320345

321-
return validateAccess(context).then(
322-
kafkaConnectService
323-
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
324-
.map(ResponseEntity::ok))
346+
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
347+
ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
348+
.flatMap(hasAccess -> {
349+
if (!hasAccess) {
350+
return Mono.error(new AccessDeniedException("Access denied"));
351+
}
352+
return kafkaConnectService
353+
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
354+
.map(ResponseEntity::ok);
355+
})
325356
.doOnEach(sig -> audit(context, sig));
326357
}
327358
}

api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
1010
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
1111
import io.kafbat.ui.model.rbac.permission.ConnectAction;
12+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
1213
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1314
import io.kafbat.ui.model.rbac.permission.KsqlAction;
1415
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
@@ -130,6 +131,11 @@ public AccessContextBuilder connectActions(String connect, ConnectAction... acti
130131
return this;
131132
}
132133

134+
public AccessContextBuilder connectorActions(String connector, ConnectorAction... actions) {
135+
accessedResources.add(new SingleResourceAccess(connector, Resource.CONNECTOR, List.of(actions)));
136+
return this;
137+
}
138+
133139
public AccessContextBuilder schemaActions(String schema, SchemaAction... actions) {
134140
accessedResources.add(new SingleResourceAccess(schema, Resource.SCHEMA, List.of(actions)));
135141
return this;

api/src/main/java/io/kafbat/ui/model/rbac/Resource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
77
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
88
import io.kafbat.ui.model.rbac.permission.ConnectAction;
9+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
910
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1011
import io.kafbat.ui.model.rbac.permission.KsqlAction;
1112
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
@@ -36,6 +37,8 @@ public enum Resource {
3637

3738
CONNECT(ConnectAction.values(), ConnectAction.ALIASES),
3839

40+
CONNECTOR(ConnectorAction.values(), ConnectorAction.ALIASES),
41+
3942
KSQL(KsqlAction.values()),
4043

4144
ACL(AclAction.values()),
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kafbat.ui.model.rbac.permission;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import org.apache.commons.lang3.EnumUtils;
6+
import org.jetbrains.annotations.Nullable;
7+
8+
public enum ConnectorAction implements PermissibleAction {
9+
10+
VIEW,
11+
EDIT(VIEW),
12+
CREATE(VIEW),
13+
OPERATE(VIEW),
14+
DELETE(VIEW),
15+
RESET_OFFSETS(VIEW),
16+
;
17+
18+
private final ConnectorAction[] dependantActions;
19+
20+
ConnectorAction(ConnectorAction... dependantActions) {
21+
this.dependantActions = dependantActions;
22+
}
23+
24+
public static final Set<ConnectorAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, OPERATE, RESET_OFFSETS);
25+
26+
public static final Map<String, PermissibleAction> ALIASES = Map.of(
27+
"restart", OPERATE,
28+
"pause", OPERATE,
29+
"resume", OPERATE,
30+
"restart_task", OPERATE,
31+
"state_update", OPERATE
32+
);
33+
34+
@Nullable
35+
public static ConnectorAction fromString(String name) {
36+
return EnumUtils.getEnum(ConnectorAction.class, name);
37+
}
38+
39+
@Override
40+
public boolean isAlter() {
41+
return ALTER_ACTIONS.contains(this);
42+
}
43+
44+
@Override
45+
public PermissibleAction[] dependantActions() {
46+
return dependantActions;
47+
}
48+
}

api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
public sealed interface PermissibleAction permits
66
AclAction, ApplicationConfigAction,
77
ConsumerGroupAction, SchemaAction,
8-
ConnectAction, ClusterConfigAction,
8+
ConnectAction, ConnectorAction, ClusterConfigAction,
99
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {
1010

1111
String name();

api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.kafbat.ui.model.rbac.Role;
1313
import io.kafbat.ui.model.rbac.Subject;
1414
import io.kafbat.ui.model.rbac.permission.ConnectAction;
15+
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
1516
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
1617
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1718
import io.kafbat.ui.model.rbac.permission.TopicAction;
@@ -21,6 +22,7 @@
2122
import io.kafbat.ui.service.rbac.extractor.OauthAuthorityExtractor;
2223
import io.kafbat.ui.service.rbac.extractor.ProviderAuthorityExtractor;
2324
import jakarta.annotation.PostConstruct;
25+
import java.util.Arrays;
2426
import java.util.Collections;
2527
import java.util.List;
2628
import java.util.Objects;
@@ -206,6 +208,63 @@ public Mono<Boolean> isConnectAccessible(String connectName, String clusterName)
206208
);
207209
}
208210

211+
public Mono<Boolean> isConnectorAccessible(String connectName, String connectorName,
212+
String clusterName, ConnectorAction... actions) {
213+
String connectorResource = connectName + "/" + connectorName;
214+
215+
// First check for specific connector permissions
216+
var connectorContext = AccessContext.builder()
217+
.cluster(clusterName)
218+
.connectorActions(connectorResource, actions)
219+
.build();
220+
221+
return isAccessible(connectorContext)
222+
.flatMap(hasConnectorAccess -> {
223+
if (hasConnectorAccess) {
224+
return Mono.just(true);
225+
}
226+
227+
// Fall back to checking connect-level permissions
228+
// Map connector actions to corresponding connect actions
229+
ConnectAction[] connectActions = mapToConnectActions(actions);
230+
if (connectActions.length == 0) {
231+
return Mono.just(false);
232+
}
233+
234+
var connectContext = AccessContext.builder()
235+
.cluster(clusterName)
236+
.connectActions(connectName, connectActions)
237+
.build();
238+
239+
return isAccessible(connectContext);
240+
});
241+
}
242+
243+
private ConnectAction[] mapToConnectActions(ConnectorAction[] connectorActions) {
244+
return Arrays.stream(connectorActions)
245+
.map(action -> {
246+
switch (action) {
247+
case VIEW:
248+
return ConnectAction.VIEW;
249+
case EDIT:
250+
return ConnectAction.EDIT;
251+
case CREATE:
252+
return ConnectAction.CREATE;
253+
case DELETE:
254+
return ConnectAction.DELETE;
255+
case OPERATE:
256+
return ConnectAction.OPERATE;
257+
case RESET_OFFSETS:
258+
return ConnectAction.RESET_OFFSETS;
259+
default:
260+
return null;
261+
}
262+
})
263+
.filter(Objects::nonNull)
264+
.distinct()
265+
.toArray(ConnectAction[]::new);
266+
}
267+
209268
public List<Role> getRoles() {
210269
if (!rbacEnabled) {
211270
return Collections.emptyList();

0 commit comments

Comments
 (0)