Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;

import io.kafbat.ui.api.KafkaConnectApi;
import io.kafbat.ui.model.ConnectDTO;
Expand All @@ -19,6 +17,7 @@
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.service.KafkaConnectService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Comparator;
Expand All @@ -29,6 +28,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
Expand All @@ -41,6 +41,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private static final String CONNECTOR_NAME = "connectorName";
private static final String ACCESS_DENIED = "Access denied";

private final KafkaConnectService kafkaConnectService;

Expand Down Expand Up @@ -97,34 +98,47 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
String connectorName,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, ConnectAction.VIEW)
.connectorActions(connectorResource, ConnectorAction.VIEW)
.operationName("getConnector")
.build();

return validateAccess(context).then(
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
// Use hierarchical permission check
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.VIEW)
.flatMap(hasAccess -> {
if (!hasAccess) {
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
}
return kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok);
})
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
String connectorName,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, ConnectAction.DELETE)
.connectorActions(connectorResource, ConnectorAction.DELETE)
.operationName("deleteConnector")
.operationParams(Map.of(CONNECTOR_NAME, connectName))
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.DELETE)
.flatMap(hasAccess -> {
if (!hasAccess) {
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
}
return kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok);
})
.doOnEach(sig -> audit(context, sig));
}


Expand Down Expand Up @@ -182,17 +196,24 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.EDIT)
.operationName("setConnectorConfig")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
ConnectorAction.VIEW, ConnectorAction.EDIT)
.flatMap(hasAccess -> {
if (!hasAccess) {
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
}
return kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok);
})
.doOnEach(sig -> audit(context, sig));
}

Expand All @@ -201,21 +222,25 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
String connectorName,
ConnectorActionDTO action,
ServerWebExchange exchange) {
ConnectAction[] connectActions;
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.OPERATE};

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, connectActions)
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.OPERATE)
.operationName("updateConnectorState")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
ConnectorAction.VIEW, ConnectorAction.OPERATE)
.flatMap(hasAccess -> {
if (!hasAccess) {
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
}
return kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok);
})
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand Down Expand Up @@ -311,17 +336,24 @@ public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, Stri
String connectorName,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, VIEW, RESET_OFFSETS)
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
.operationName("resetConnectorOffsets")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok))
return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName,
ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
.flatMap(hasAccess -> {
if (!hasAccess) {
return Mono.error(new AccessDeniedException(ACCESS_DENIED));
}
return kafkaConnectService
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok);
})
.doOnEach(sig -> audit(context, sig));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
import io.kafbat.ui.model.rbac.permission.KsqlAction;
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
Expand Down Expand Up @@ -130,6 +131,11 @@ public AccessContextBuilder connectActions(String connect, ConnectAction... acti
return this;
}

public AccessContextBuilder connectorActions(String connector, ConnectorAction... actions) {
accessedResources.add(new SingleResourceAccess(connector, Resource.CONNECTOR, List.of(actions)));
return this;
}

public AccessContextBuilder schemaActions(String schema, SchemaAction... actions) {
accessedResources.add(new SingleResourceAccess(schema, Resource.SCHEMA, List.of(actions)));
return this;
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/rbac/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
import io.kafbat.ui.model.rbac.permission.KsqlAction;
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
Expand Down Expand Up @@ -36,6 +37,8 @@ public enum Resource {

CONNECT(ConnectAction.values(), ConnectAction.ALIASES),

CONNECTOR(ConnectorAction.values(), ConnectorAction.ALIASES),

KSQL(KsqlAction.values()),

ACL(AclAction.values()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.kafbat.ui.model.rbac.permission;

import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum ConnectorAction implements PermissibleAction {

VIEW,
EDIT(VIEW),
CREATE(VIEW),
OPERATE(VIEW),
DELETE(VIEW),
RESET_OFFSETS(VIEW),
;

public static final String CONNECTOR_RESOURCE_DELIMITER = "/";

private final ConnectorAction[] dependantActions;

ConnectorAction(ConnectorAction... dependantActions) {
this.dependantActions = dependantActions;
}

public static final Set<ConnectorAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, OPERATE, RESET_OFFSETS);

public static final Map<String, PermissibleAction> ALIASES = Map.of(
"restart", OPERATE,
"pause", OPERATE,
"resume", OPERATE,
"restart_task", OPERATE,
"state_update", OPERATE
);

@Nullable
public static ConnectorAction fromString(String name) {
return EnumUtils.getEnum(ConnectorAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}

@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}

public static String buildResourcePath(String connectName, String connectorName) {
return connectName + CONNECTOR_RESOURCE_DELIMITER + connectorName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public sealed interface PermissibleAction permits
AclAction, ApplicationConfigAction,
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
ConnectAction, ConnectorAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {

String name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.kafbat.ui.model.rbac.Role;
import io.kafbat.ui.model.rbac.Subject;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
import io.kafbat.ui.model.rbac.permission.SchemaAction;
import io.kafbat.ui.model.rbac.permission.TopicAction;
Expand All @@ -21,6 +22,7 @@
import io.kafbat.ui.service.rbac.extractor.OauthAuthorityExtractor;
import io.kafbat.ui.service.rbac.extractor.ProviderAuthorityExtractor;
import jakarta.annotation.PostConstruct;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -206,6 +208,63 @@ public Mono<Boolean> isConnectAccessible(String connectName, String clusterName)
);
}

public Mono<Boolean> isConnectorAccessible(String connectName, String connectorName,
String clusterName, ConnectorAction... actions) {
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);

// First check for specific connector permissions
var connectorContext = AccessContext.builder()
.cluster(clusterName)
.connectorActions(connectorResource, actions)
.build();

return isAccessible(connectorContext)
.flatMap(hasConnectorAccess -> {
if (hasConnectorAccess) {
return Mono.just(true);
}

// Fall back to checking connect-level permissions
// Map connector actions to corresponding connect actions
ConnectAction[] connectActions = mapToConnectActions(actions);
if (connectActions.length == 0) {
return Mono.just(false);
}

var connectContext = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, connectActions)
.build();

return isAccessible(connectContext);
});
}

private ConnectAction[] mapToConnectActions(ConnectorAction[] connectorActions) {
return Arrays.stream(connectorActions)
.map(action -> {
switch (action) {
case VIEW:
return ConnectAction.VIEW;
case EDIT:
return ConnectAction.EDIT;
case CREATE:
return ConnectAction.CREATE;
case DELETE:
return ConnectAction.DELETE;
case OPERATE:
return ConnectAction.OPERATE;
case RESET_OFFSETS:
return ConnectAction.RESET_OFFSETS;
default:
return null;
}
})
.filter(Objects::nonNull)
.distinct()
.toArray(ConnectAction[]::new);
}

public List<Role> getRoles() {
if (!rbacEnabled) {
return Collections.emptyList();
Expand Down
Loading
Loading