Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;

import io.kafbat.ui.api.KafkaConnectApi;
import io.kafbat.ui.model.ConnectDTO;
Expand All @@ -19,6 +17,7 @@
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.service.KafkaConnectService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Comparator;
Expand Down Expand Up @@ -97,8 +96,10 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
String connectorName,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectorActions(connectorResource, ConnectorAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnector")
.build();
Expand All @@ -114,11 +115,13 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con
String connectorName,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectorActions(connectorResource, ConnectorAction.DELETE)
.connectActions(connectName, ConnectAction.DELETE)
.operationName("deleteConnector")
.operationParams(Map.of(CONNECTOR_NAME, connectName))
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
Expand Down Expand Up @@ -182,31 +185,32 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.EDIT)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("setConnectorConfig")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
String connectorName,
ConnectorActionDTO action,
ServerWebExchange exchange) {
ConnectAction[] connectActions;
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.OPERATE};

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, connectActions)
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.OPERATE)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.OPERATE)
.operationName("updateConnectorState")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand Down Expand Up @@ -311,17 +315,19 @@ public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, Stri
String connectorName,
ServerWebExchange exchange) {

String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, VIEW, RESET_OFFSETS)
.connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.RESET_OFFSETS)
.operationName("resetConnectorOffsets")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
import io.kafbat.ui.model.rbac.permission.KsqlAction;
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
Expand Down Expand Up @@ -130,6 +131,11 @@ public AccessContextBuilder connectActions(String connect, ConnectAction... acti
return this;
}

public AccessContextBuilder connectorActions(String connector, ConnectorAction... actions) {
accessedResources.add(new SingleResourceAccess(connector, Resource.CONNECTOR, List.of(actions)));
return this;
}

public AccessContextBuilder schemaActions(String schema, SchemaAction... actions) {
accessedResources.add(new SingleResourceAccess(schema, Resource.SCHEMA, List.of(actions)));
return this;
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/rbac/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
import io.kafbat.ui.model.rbac.permission.KsqlAction;
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
Expand Down Expand Up @@ -36,6 +37,8 @@ public enum Resource {

CONNECT(ConnectAction.values(), ConnectAction.ALIASES),

CONNECTOR(ConnectorAction.values(), ConnectorAction.ALIASES),

KSQL(KsqlAction.values()),

ACL(AclAction.values()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.kafbat.ui.model.rbac.permission;

import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum ConnectorAction implements PermissibleAction {

VIEW,
EDIT(VIEW),
CREATE(VIEW),
OPERATE(VIEW),
DELETE(VIEW),
RESET_OFFSETS(VIEW),
;

public static final String CONNECTOR_RESOURCE_DELIMITER = "/";

private final ConnectorAction[] dependantActions;

ConnectorAction(ConnectorAction... dependantActions) {
this.dependantActions = dependantActions;
}

public static final Set<ConnectorAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, OPERATE, RESET_OFFSETS);

public static final Map<String, PermissibleAction> ALIASES = Map.of(
"restart", OPERATE,
"pause", OPERATE,
"resume", OPERATE,
"restart_task", OPERATE,
"state_update", OPERATE
);

@Nullable
public static ConnectorAction fromString(String name) {
return EnumUtils.getEnum(ConnectorAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}

@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}

public static String buildResourcePath(String connectName, String connectorName) {
return connectName + CONNECTOR_RESOURCE_DELIMITER + connectorName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public sealed interface PermissibleAction permits
AclAction, ApplicationConfigAction,
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
ConnectAction, ConnectorAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {

String name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.kafbat.ui.model.rbac.Role;
import io.kafbat.ui.model.rbac.Subject;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
import io.kafbat.ui.model.rbac.permission.SchemaAction;
import io.kafbat.ui.model.rbac.permission.TopicAction;
Expand All @@ -21,6 +22,7 @@
import io.kafbat.ui.service.rbac.extractor.OauthAuthorityExtractor;
import io.kafbat.ui.service.rbac.extractor.ProviderAuthorityExtractor;
import jakarta.annotation.PostConstruct;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -206,6 +208,63 @@ public Mono<Boolean> isConnectAccessible(String connectName, String clusterName)
);
}

public Mono<Boolean> isConnectorAccessible(String connectName, String connectorName,
String clusterName, ConnectorAction... actions) {
String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName);

// First check for specific connector permissions
var connectorContext = AccessContext.builder()
.cluster(clusterName)
.connectorActions(connectorResource, actions)
.build();

return isAccessible(connectorContext)
.flatMap(hasConnectorAccess -> {
if (hasConnectorAccess) {
return Mono.just(true);
}

// Fall back to checking connect-level permissions
// Map connector actions to corresponding connect actions
ConnectAction[] connectActions = mapToConnectActions(actions);
if (connectActions.length == 0) {
return Mono.just(false);
}

var connectContext = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, connectActions)
.build();

return isAccessible(connectContext);
});
}

private ConnectAction[] mapToConnectActions(ConnectorAction[] connectorActions) {
return Arrays.stream(connectorActions)
.map(action -> {
switch (action) {
case VIEW:
return ConnectAction.VIEW;
case EDIT:
return ConnectAction.EDIT;
case CREATE:
return ConnectAction.CREATE;
case DELETE:
return ConnectAction.DELETE;
case OPERATE:
return ConnectAction.OPERATE;
case RESET_OFFSETS:
return ConnectAction.RESET_OFFSETS;
default:
return null;
}
})
.filter(Objects::nonNull)
.distinct()
.toArray(ConnectAction[]::new);
}

public List<Role> getRoles() {
if (!rbacEnabled) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kafbat.ui.model.rbac.AccessContext.SingleResourceAccess;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConnectorAction;
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
import io.kafbat.ui.model.rbac.permission.TopicAction;
import jakarta.annotation.Nullable;
Expand Down Expand Up @@ -113,6 +114,73 @@ void shouldMapActionAliases() {
assertThat(allowed).isTrue();
}

@Test
void allowsAccessForConnectorWithSpecificNameIfUserHasPermission() {
SingleResourceAccess sra =
new SingleResourceAccess("my-connect/my-connector", Resource.CONNECTOR,
List.of(ConnectorAction.VIEW, ConnectorAction.OPERATE));

var allowed = sra.isAccessible(
List.of(
permission(Resource.CONNECTOR, "my-connect/my-connector", ConnectorAction.VIEW, ConnectorAction.OPERATE)));

assertThat(allowed).isTrue();
}

@Test
void allowsAccessForConnectorWithWildcardPatternIfUserHasPermission() {
SingleResourceAccess sra =
new SingleResourceAccess("prod-connect/customer-connector", Resource.CONNECTOR,
List.of(ConnectorAction.VIEW));

var allowed = sra.isAccessible(
List.of(
permission(Resource.CONNECTOR, "prod-connect/.*", ConnectorAction.VIEW, ConnectorAction.EDIT)));

assertThat(allowed).isTrue();
}

@Test
void deniesAccessForConnectorIfUserLacksRequiredPermission() {
SingleResourceAccess sra =
new SingleResourceAccess("my-connect/my-connector", Resource.CONNECTOR,
List.of(ConnectorAction.DELETE));

var allowed = sra.isAccessible(
List.of(
permission(Resource.CONNECTOR, "my-connect/my-connector", ConnectorAction.VIEW, ConnectorAction.EDIT)));

assertThat(allowed).isFalse();
}

@Test
void allowsAccessForConnectorWithMultipleWildcardPatterns() {
SingleResourceAccess sra =
new SingleResourceAccess("staging-connect/debezium-mysql-connector", Resource.CONNECTOR,
List.of(ConnectorAction.RESET_OFFSETS));

var allowed = sra.isAccessible(
List.of(
permission(Resource.CONNECTOR, ".*/debezium-.*", ConnectorAction.RESET_OFFSETS),
permission(Resource.CONNECTOR, "staging-.*/.*", ConnectorAction.VIEW)));

assertThat(allowed).isTrue();
}

@Test
void testConnectorActionHierarchy() {
// Test that EDIT includes VIEW permission
SingleResourceAccess sra =
new SingleResourceAccess("test-connect/test-connector", Resource.CONNECTOR,
List.of(ConnectorAction.VIEW));

var allowed = sra.isAccessible(
List.of(
permission(Resource.CONNECTOR, "test-connect/.*", ConnectorAction.EDIT)));

assertThat(allowed).isTrue();
}

private Permission permission(Resource res, @Nullable String namePattern, PermissibleAction... actions) {
return permission(
res, namePattern, Stream.of(actions).map(PermissibleAction::name).toList()
Expand Down
Loading
Loading