From 4cce9225e2df72821e2fec29e2058f091d748dc1 Mon Sep 17 00:00:00 2001 From: Joshua Nathaniel Miller Date: Sat, 29 Nov 2025 00:10:46 -0600 Subject: [PATCH 1/2] feat: Add connector-level permissions for Kafka Connect (#614) Implements granular permission control at the individual connector level, allowing administrators to grant permissions for specific connectors rather than entire Kafka Connect instances. Changes: - Add CONNECTOR resource type and ConnectorAction enum for granular permissions - Implement hierarchical permission checking (connector-level takes precedence) - Update frontend to check connector permissions with connect-level fallback - Add comprehensive tests for connector permission scenarios - Upgrade Testcontainers to 2.0.2 for Docker Engine 29 compatibility Features: - Permission format: `connect-name/connector-name` for specific connectors - Wildcard patterns supported (e.g., `.*-connect/prod-.*`) - Backwards compatible with existing CONNECT permissions - Action hierarchy maintained (EDIT includes VIEW permission) --- .../ui/controller/KafkaConnectController.java | 96 ++++--- .../kafbat/ui/model/rbac/AccessContext.java | 6 + .../io/kafbat/ui/model/rbac/Resource.java | 3 + .../rbac/permission/ConnectorAction.java | 54 ++++ .../rbac/permission/PermissibleAction.java | 2 +- .../ui/service/rbac/AccessControlService.java | 59 ++++ .../ui/model/rbac/AccessContextTest.java | 68 +++++ .../AccessControlServiceRbacEnabledTest.java | 162 +++++++++++ .../rbac/RbacConnectorPermissionsTest.java | 268 ++++++++++++++++++ contract-typespec/api/config.tsp | 1 + .../main/resources/swagger/kafbat-ui-api.yaml | 1 + .../Connect/Details/Actions/Actions.tsx | 172 +++++++---- .../Details/Tasks/ActionsCellTasks.tsx | 25 +- .../connectorsColumns/cells/ActionsCell.tsx | 170 +++++++---- .../ActionDropdownItemWithFallback.tsx | 64 +++++ .../common/ActionComponent/index.ts | 2 + gradle/libs.versions.toml | 6 +- 17 files changed, 1001 insertions(+), 158 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectorAction.java create mode 100644 api/src/test/java/io/kafbat/ui/service/rbac/RbacConnectorPermissionsTest.java create mode 100644 frontend/src/components/common/ActionComponent/ActionDropDownItem/ActionDropdownItemWithFallback.tsx diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java index 559d9ae6c..8511a1044 100644 --- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java +++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java @@ -3,8 +3,6 @@ import static io.kafbat.ui.model.ConnectorActionDTO.RESTART; import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS; import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS; -import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS; -import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW; import io.kafbat.ui.api.KafkaConnectApi; import io.kafbat.ui.model.ConnectDTO; @@ -19,6 +17,7 @@ import io.kafbat.ui.model.TaskDTO; import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.ConnectAction; +import io.kafbat.ui.model.rbac.permission.ConnectorAction; import io.kafbat.ui.service.KafkaConnectService; import io.kafbat.ui.service.mcp.McpTool; import java.util.Comparator; @@ -29,6 +28,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; +import org.springframework.security.access.AccessDeniedException; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; @@ -41,6 +41,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC private static final Set RESTART_ACTIONS = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS); private static final String CONNECTOR_NAME = "connectorName"; + private static final String ACCESS_DENIED = "Access denied"; private final KafkaConnectService kafkaConnectService; @@ -97,16 +98,23 @@ public Mono> getConnector(String clusterName, Strin String connectorName, ServerWebExchange exchange) { + String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName); var context = AccessContext.builder() .cluster(clusterName) - .connectActions(connectName, ConnectAction.VIEW) + .connectorActions(connectorResource, ConnectorAction.VIEW) .operationName("getConnector") .build(); - return validateAccess(context).then( - kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName) - .map(ResponseEntity::ok) - ).doOnEach(sig -> audit(context, sig)); + // Use hierarchical permission check + return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.VIEW) + .flatMap(hasAccess -> { + if (!hasAccess) { + return Mono.error(new AccessDeniedException(ACCESS_DENIED)); + } + return kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName) + .map(ResponseEntity::ok); + }) + .doOnEach(sig -> audit(context, sig)); } @Override @@ -114,17 +122,23 @@ public Mono> deleteConnector(String clusterName, String con String connectorName, ServerWebExchange exchange) { + String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName); var context = AccessContext.builder() .cluster(clusterName) - .connectActions(connectName, ConnectAction.DELETE) + .connectorActions(connectorResource, ConnectorAction.DELETE) .operationName("deleteConnector") - .operationParams(Map.of(CONNECTOR_NAME, connectName)) + .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return validateAccess(context).then( - kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName) - .map(ResponseEntity::ok) - ).doOnEach(sig -> audit(context, sig)); + return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.DELETE) + .flatMap(hasAccess -> { + if (!hasAccess) { + return Mono.error(new AccessDeniedException(ACCESS_DENIED)); + } + return kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName) + .map(ResponseEntity::ok); + }) + .doOnEach(sig -> audit(context, sig)); } @@ -182,17 +196,24 @@ public Mono> setConnectorConfig(String clusterName, Mono> requestBody, ServerWebExchange exchange) { + String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName); var context = AccessContext.builder() .cluster(clusterName) - .connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT) + .connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.EDIT) .operationName("setConnectorConfig") .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return validateAccess(context).then( - kafkaConnectService - .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody) - .map(ResponseEntity::ok)) + return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, + ConnectorAction.VIEW, ConnectorAction.EDIT) + .flatMap(hasAccess -> { + if (!hasAccess) { + return Mono.error(new AccessDeniedException(ACCESS_DENIED)); + } + return kafkaConnectService + .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody) + .map(ResponseEntity::ok); + }) .doOnEach(sig -> audit(context, sig)); } @@ -201,21 +222,25 @@ public Mono> updateConnectorState(String clusterName, Strin String connectorName, ConnectorActionDTO action, ServerWebExchange exchange) { - ConnectAction[] connectActions; - connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.OPERATE}; - + String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName); var context = AccessContext.builder() .cluster(clusterName) - .connectActions(connectName, connectActions) + .connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.OPERATE) .operationName("updateConnectorState") .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return validateAccess(context).then( - kafkaConnectService - .updateConnectorState(getCluster(clusterName), connectName, connectorName, action) - .map(ResponseEntity::ok) - ).doOnEach(sig -> audit(context, sig)); + return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, + ConnectorAction.VIEW, ConnectorAction.OPERATE) + .flatMap(hasAccess -> { + if (!hasAccess) { + return Mono.error(new AccessDeniedException(ACCESS_DENIED)); + } + return kafkaConnectService + .updateConnectorState(getCluster(clusterName), connectName, connectorName, action) + .map(ResponseEntity::ok); + }) + .doOnEach(sig -> audit(context, sig)); } @Override @@ -311,17 +336,24 @@ public Mono> resetConnectorOffsets(String clusterName, Stri String connectorName, ServerWebExchange exchange) { + String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName); var context = AccessContext.builder() .cluster(clusterName) - .connectActions(connectName, VIEW, RESET_OFFSETS) + .connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS) .operationName("resetConnectorOffsets") .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return validateAccess(context).then( - kafkaConnectService - .resetConnectorOffsets(getCluster(clusterName), connectName, connectorName) - .map(ResponseEntity::ok)) + return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, + ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS) + .flatMap(hasAccess -> { + if (!hasAccess) { + return Mono.error(new AccessDeniedException(ACCESS_DENIED)); + } + return kafkaConnectService + .resetConnectorOffsets(getCluster(clusterName), connectName, connectorName) + .map(ResponseEntity::ok); + }) .doOnEach(sig -> audit(context, sig)); } } diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java index dbf5c456b..d011005f0 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java @@ -9,6 +9,7 @@ import io.kafbat.ui.model.rbac.permission.ClientQuotaAction; import io.kafbat.ui.model.rbac.permission.ClusterConfigAction; import io.kafbat.ui.model.rbac.permission.ConnectAction; +import io.kafbat.ui.model.rbac.permission.ConnectorAction; import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction; import io.kafbat.ui.model.rbac.permission.KsqlAction; import io.kafbat.ui.model.rbac.permission.PermissibleAction; @@ -130,6 +131,11 @@ public AccessContextBuilder connectActions(String connect, ConnectAction... acti return this; } + public AccessContextBuilder connectorActions(String connector, ConnectorAction... actions) { + accessedResources.add(new SingleResourceAccess(connector, Resource.CONNECTOR, List.of(actions))); + return this; + } + public AccessContextBuilder schemaActions(String schema, SchemaAction... actions) { accessedResources.add(new SingleResourceAccess(schema, Resource.SCHEMA, List.of(actions))); return this; diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java b/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java index 691bc9e94..0bba98745 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java @@ -6,6 +6,7 @@ import io.kafbat.ui.model.rbac.permission.ClientQuotaAction; import io.kafbat.ui.model.rbac.permission.ClusterConfigAction; import io.kafbat.ui.model.rbac.permission.ConnectAction; +import io.kafbat.ui.model.rbac.permission.ConnectorAction; import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction; import io.kafbat.ui.model.rbac.permission.KsqlAction; import io.kafbat.ui.model.rbac.permission.PermissibleAction; @@ -36,6 +37,8 @@ public enum Resource { CONNECT(ConnectAction.values(), ConnectAction.ALIASES), + CONNECTOR(ConnectorAction.values(), ConnectorAction.ALIASES), + KSQL(KsqlAction.values()), ACL(AclAction.values()), diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectorAction.java b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectorAction.java new file mode 100644 index 000000000..9ac083edb --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectorAction.java @@ -0,0 +1,54 @@ +package io.kafbat.ui.model.rbac.permission; + +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.EnumUtils; +import org.jetbrains.annotations.Nullable; + +public enum ConnectorAction implements PermissibleAction { + + VIEW, + EDIT(VIEW), + CREATE(VIEW), + OPERATE(VIEW), + DELETE(VIEW), + RESET_OFFSETS(VIEW), + ; + + public static final String CONNECTOR_RESOURCE_DELIMITER = "/"; + + private final ConnectorAction[] dependantActions; + + ConnectorAction(ConnectorAction... dependantActions) { + this.dependantActions = dependantActions; + } + + public static final Set ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, OPERATE, RESET_OFFSETS); + + public static final Map ALIASES = Map.of( + "restart", OPERATE, + "pause", OPERATE, + "resume", OPERATE, + "restart_task", OPERATE, + "state_update", OPERATE + ); + + @Nullable + public static ConnectorAction fromString(String name) { + return EnumUtils.getEnum(ConnectorAction.class, name); + } + + @Override + public boolean isAlter() { + return ALTER_ACTIONS.contains(this); + } + + @Override + public PermissibleAction[] dependantActions() { + return dependantActions; + } + + public static String buildResourcePath(String connectName, String connectorName) { + return connectName + CONNECTOR_RESOURCE_DELIMITER + connectorName; + } +} \ No newline at end of file diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java b/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java index a1394ac7e..67c0a64c7 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java @@ -5,7 +5,7 @@ public sealed interface PermissibleAction permits AclAction, ApplicationConfigAction, ConsumerGroupAction, SchemaAction, - ConnectAction, ClusterConfigAction, + ConnectAction, ConnectorAction, ClusterConfigAction, KsqlAction, TopicAction, AuditAction, ClientQuotaAction { String name(); diff --git a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java index 1837cb687..b05df1d1c 100644 --- a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java +++ b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java @@ -12,6 +12,7 @@ import io.kafbat.ui.model.rbac.Role; import io.kafbat.ui.model.rbac.Subject; import io.kafbat.ui.model.rbac.permission.ConnectAction; +import io.kafbat.ui.model.rbac.permission.ConnectorAction; import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction; import io.kafbat.ui.model.rbac.permission.SchemaAction; import io.kafbat.ui.model.rbac.permission.TopicAction; @@ -21,6 +22,7 @@ import io.kafbat.ui.service.rbac.extractor.OauthAuthorityExtractor; import io.kafbat.ui.service.rbac.extractor.ProviderAuthorityExtractor; import jakarta.annotation.PostConstruct; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -206,6 +208,63 @@ public Mono isConnectAccessible(String connectName, String clusterName) ); } + public Mono isConnectorAccessible(String connectName, String connectorName, + String clusterName, ConnectorAction... actions) { + String connectorResource = ConnectorAction.buildResourcePath(connectName, connectorName); + + // First check for specific connector permissions + var connectorContext = AccessContext.builder() + .cluster(clusterName) + .connectorActions(connectorResource, actions) + .build(); + + return isAccessible(connectorContext) + .flatMap(hasConnectorAccess -> { + if (hasConnectorAccess) { + return Mono.just(true); + } + + // Fall back to checking connect-level permissions + // Map connector actions to corresponding connect actions + ConnectAction[] connectActions = mapToConnectActions(actions); + if (connectActions.length == 0) { + return Mono.just(false); + } + + var connectContext = AccessContext.builder() + .cluster(clusterName) + .connectActions(connectName, connectActions) + .build(); + + return isAccessible(connectContext); + }); + } + + private ConnectAction[] mapToConnectActions(ConnectorAction[] connectorActions) { + return Arrays.stream(connectorActions) + .map(action -> { + switch (action) { + case VIEW: + return ConnectAction.VIEW; + case EDIT: + return ConnectAction.EDIT; + case CREATE: + return ConnectAction.CREATE; + case DELETE: + return ConnectAction.DELETE; + case OPERATE: + return ConnectAction.OPERATE; + case RESET_OFFSETS: + return ConnectAction.RESET_OFFSETS; + default: + return null; + } + }) + .filter(Objects::nonNull) + .distinct() + .toArray(ConnectAction[]::new); + } + public List getRoles() { if (!rbacEnabled) { return Collections.emptyList(); diff --git a/api/src/test/java/io/kafbat/ui/model/rbac/AccessContextTest.java b/api/src/test/java/io/kafbat/ui/model/rbac/AccessContextTest.java index b1417f2c7..493c6deae 100644 --- a/api/src/test/java/io/kafbat/ui/model/rbac/AccessContextTest.java +++ b/api/src/test/java/io/kafbat/ui/model/rbac/AccessContextTest.java @@ -10,6 +10,7 @@ import io.kafbat.ui.model.rbac.AccessContext.SingleResourceAccess; import io.kafbat.ui.model.rbac.permission.ClusterConfigAction; import io.kafbat.ui.model.rbac.permission.ConnectAction; +import io.kafbat.ui.model.rbac.permission.ConnectorAction; import io.kafbat.ui.model.rbac.permission.PermissibleAction; import io.kafbat.ui.model.rbac.permission.TopicAction; import jakarta.annotation.Nullable; @@ -113,6 +114,73 @@ void shouldMapActionAliases() { assertThat(allowed).isTrue(); } + @Test + void allowsAccessForConnectorWithSpecificNameIfUserHasPermission() { + SingleResourceAccess sra = + new SingleResourceAccess("my-connect/my-connector", Resource.CONNECTOR, + List.of(ConnectorAction.VIEW, ConnectorAction.OPERATE)); + + var allowed = sra.isAccessible( + List.of( + permission(Resource.CONNECTOR, "my-connect/my-connector", ConnectorAction.VIEW, ConnectorAction.OPERATE))); + + assertThat(allowed).isTrue(); + } + + @Test + void allowsAccessForConnectorWithWildcardPatternIfUserHasPermission() { + SingleResourceAccess sra = + new SingleResourceAccess("prod-connect/customer-connector", Resource.CONNECTOR, + List.of(ConnectorAction.VIEW)); + + var allowed = sra.isAccessible( + List.of( + permission(Resource.CONNECTOR, "prod-connect/.*", ConnectorAction.VIEW, ConnectorAction.EDIT))); + + assertThat(allowed).isTrue(); + } + + @Test + void deniesAccessForConnectorIfUserLacksRequiredPermission() { + SingleResourceAccess sra = + new SingleResourceAccess("my-connect/my-connector", Resource.CONNECTOR, + List.of(ConnectorAction.DELETE)); + + var allowed = sra.isAccessible( + List.of( + permission(Resource.CONNECTOR, "my-connect/my-connector", ConnectorAction.VIEW, ConnectorAction.EDIT))); + + assertThat(allowed).isFalse(); + } + + @Test + void allowsAccessForConnectorWithMultipleWildcardPatterns() { + SingleResourceAccess sra = + new SingleResourceAccess("staging-connect/debezium-mysql-connector", Resource.CONNECTOR, + List.of(ConnectorAction.RESET_OFFSETS)); + + var allowed = sra.isAccessible( + List.of( + permission(Resource.CONNECTOR, ".*/debezium-.*", ConnectorAction.RESET_OFFSETS), + permission(Resource.CONNECTOR, "staging-.*/.*", ConnectorAction.VIEW))); + + assertThat(allowed).isTrue(); + } + + @Test + void testConnectorActionHierarchy() { + // Test that EDIT includes VIEW permission + SingleResourceAccess sra = + new SingleResourceAccess("test-connect/test-connector", Resource.CONNECTOR, + List.of(ConnectorAction.VIEW)); + + var allowed = sra.isAccessible( + List.of( + permission(Resource.CONNECTOR, "test-connect/.*", ConnectorAction.EDIT))); + + assertThat(allowed).isTrue(); + } + private Permission permission(Resource res, @Nullable String namePattern, PermissibleAction... actions) { return permission( res, namePattern, Stream.of(actions).map(PermissibleAction::name).toList() diff --git a/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java b/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java index 2c24cb5d4..3cbb38d55 100644 --- a/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java +++ b/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java @@ -20,7 +20,13 @@ import io.kafbat.ui.model.ConnectDTO; import io.kafbat.ui.model.InternalTopic; import io.kafbat.ui.model.rbac.AccessContext; +import io.kafbat.ui.model.rbac.DefaultRole; +import io.kafbat.ui.model.rbac.Permission; +import io.kafbat.ui.model.rbac.Resource; import io.kafbat.ui.model.rbac.Role; +import io.kafbat.ui.model.rbac.Subject; +import io.kafbat.ui.model.rbac.permission.ConnectorAction; +import io.kafbat.ui.model.rbac.provider.Provider; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -296,4 +302,160 @@ void testGetRoles() { .anyMatch(role -> role.getName().equals(ADMIN_ROLE)); } + @Test + void isConnectorAccessible_withDirectConnectorPermission() { + withSecurityContext(() -> { + // Create a role with direct connector permission + Role connectorRole = new Role(); + connectorRole.setName("connector_role"); + connectorRole.setClusters(List.of(DEV_CLUSTER)); + + Subject subject = new Subject(); + subject.setType("group"); + subject.setProvider(Provider.LDAP); + subject.setValue("connector_role"); + connectorRole.setSubjects(List.of(subject)); + + Permission connectorPermission = new Permission(); + connectorPermission.setResource(Resource.CONNECTOR.name()); + connectorPermission.setActions(List.of("VIEW")); + connectorPermission.setValue(CONNECT_NAME + "/test-connector"); + + connectorRole.setPermissions(List.of(connectorPermission)); + connectorRole.validate(); + + // Mock the role retrieval + RoleBasedAccessControlProperties props = mock(RoleBasedAccessControlProperties.class); + when(props.getRoles()).thenReturn(List.of(connectorRole)); + when(user.groups()).thenReturn(List.of("connector_role")); + + ReflectionTestUtils.setField(accessControlService, "properties", props); + ReflectionTestUtils.setField(accessControlService, "rbacEnabled", true); + + Mono accessible = accessControlService.isConnectorAccessible( + CONNECT_NAME, "test-connector", DEV_CLUSTER, ConnectorAction.VIEW); + + StepVerifier.create(accessible) + .expectNext(true) + .expectComplete() + .verify(); + }); + } + + @Test + void isConnectorAccessible_fallsBackToConnectPermission() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(DEV_ROLE)); + + // DEV_ROLE has CONNECT.VIEW permission for CONNECT_NAME + // Should allow access to any connector under that connect + Mono accessible = accessControlService.isConnectorAccessible( + CONNECT_NAME, "any-connector", DEV_CLUSTER, ConnectorAction.VIEW); + + StepVerifier.create(accessible) + .expectNext(true) + .expectComplete() + .verify(); + }); + } + + @Test + void isConnectorAccessible_deniedWhenNoPermissions() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(DEV_ROLE)); + + // DEV_ROLE doesn't have permissions for "OTHER_CONNECT" + Mono accessible = accessControlService.isConnectorAccessible( + "OTHER_CONNECT", "some-connector", DEV_CLUSTER, ConnectorAction.VIEW); + + StepVerifier.create(accessible) + .expectNext(false) + .expectComplete() + .verify(); + }); + } + + @Test + void isConnectorAccessible_withPatternMatching() { + withSecurityContext(() -> { + // Create a role with pattern-based connector permission + Role patternRole = new Role(); + patternRole.setName("pattern_role"); + patternRole.setClusters(List.of(DEV_CLUSTER)); + + Subject subject = new Subject(); + subject.setType("group"); + subject.setProvider(Provider.LDAP); + subject.setValue("pattern_role"); + patternRole.setSubjects(List.of(subject)); + + Permission connectorPermission = new Permission(); + connectorPermission.setResource(Resource.CONNECTOR.name()); + connectorPermission.setActions(List.of("VIEW", "EDIT")); + connectorPermission.setValue(".*-connect/prod-.*"); // Matches any connect ending with -connect and connectors starting with prod- + + patternRole.setPermissions(List.of(connectorPermission)); + patternRole.validate(); + + // Mock the role retrieval + RoleBasedAccessControlProperties props = mock(RoleBasedAccessControlProperties.class); + when(props.getRoles()).thenReturn(List.of(patternRole)); + when(user.groups()).thenReturn(List.of("pattern_role")); + + ReflectionTestUtils.setField(accessControlService, "properties", props); + ReflectionTestUtils.setField(accessControlService, "rbacEnabled", true); + + // Should match "dev-connect/prod-connector" + Mono accessible1 = accessControlService.isConnectorAccessible( + "dev-connect", "prod-connector", DEV_CLUSTER, ConnectorAction.VIEW); + + StepVerifier.create(accessible1) + .expectNext(true) + .expectComplete() + .verify(); + + // Should not match "dev-connect/test-connector" + Mono accessible2 = accessControlService.isConnectorAccessible( + "dev-connect", "test-connector", DEV_CLUSTER, ConnectorAction.VIEW); + + StepVerifier.create(accessible2) + .expectNext(false) + .expectComplete() + .verify(); + }); + } + + @Test + void isConnectorAccessible_multipleActions() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(DEV_ROLE)); + + // DEV_ROLE only has VIEW permission for CONNECT + // Should not allow EDIT, DELETE operations + Mono viewAccessible = accessControlService.isConnectorAccessible( + CONNECT_NAME, "some-connector", DEV_CLUSTER, ConnectorAction.VIEW); + + Mono editAccessible = accessControlService.isConnectorAccessible( + CONNECT_NAME, "some-connector", DEV_CLUSTER, ConnectorAction.EDIT); + + Mono deleteAccessible = accessControlService.isConnectorAccessible( + CONNECT_NAME, "some-connector", DEV_CLUSTER, ConnectorAction.DELETE); + + StepVerifier.create(viewAccessible) + .expectNext(true) + .expectComplete() + .verify(); + + StepVerifier.create(editAccessible) + .expectNext(false) + .expectComplete() + .verify(); + + StepVerifier.create(deleteAccessible) + .expectNext(false) + .expectComplete() + .verify(); + }); + } + } diff --git a/api/src/test/java/io/kafbat/ui/service/rbac/RbacConnectorPermissionsTest.java b/api/src/test/java/io/kafbat/ui/service/rbac/RbacConnectorPermissionsTest.java new file mode 100644 index 000000000..4867cae25 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/service/rbac/RbacConnectorPermissionsTest.java @@ -0,0 +1,268 @@ +package io.kafbat.ui.service.rbac; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.kafbat.ui.AbstractIntegrationTest; +import io.kafbat.ui.config.auth.RbacUser; +import io.kafbat.ui.config.auth.RoleBasedAccessControlProperties; +import io.kafbat.ui.model.rbac.AccessContext; +import io.kafbat.ui.model.rbac.Permission; +import io.kafbat.ui.model.rbac.Resource; +import io.kafbat.ui.model.rbac.Role; +import io.kafbat.ui.model.rbac.Subject; +import io.kafbat.ui.model.rbac.permission.ConnectAction; +import io.kafbat.ui.model.rbac.permission.ConnectorAction; +import io.kafbat.ui.model.rbac.provider.Provider; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.AccessDeniedException; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.ReactiveSecurityContextHolder; +import org.springframework.security.core.context.SecurityContext; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.util.ReflectionTestUtils; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +/** + * Test cases for connector-level permissions in Kafka Connect. + * Tests the hierarchical permission system where connector-level permissions + * take precedence over connect-level permissions. + */ +@ActiveProfiles("rbac") +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +class RbacConnectorPermissionsTest extends AbstractIntegrationTest { + + public static final String DEV_ROLE_NAME = "dev_role"; + public static final String ADMIN_ROLE_NAME = "admin_role"; + public static final String CLUSTER_NAME = "local"; + public static final String CONNECT_NAME = "kafka-connect"; + public static final String CONNECTOR_NAME = "my-connector"; + public static final String ANOTHER_CONNECTOR_NAME = "another-connector"; + + @Autowired + AccessControlService accessControlService; + + @Mock + SecurityContext securityContext; + + @Mock + Authentication authentication; + + @Mock + RbacUser user; + + @BeforeEach + void setUp() { + // Mock roles + List roles = List.of( + getDevRole(), + getAdminRole() + ); + RoleBasedAccessControlProperties properties = mock(); + when(properties.getRoles()).thenReturn(roles); + + ReflectionTestUtils.setField(accessControlService, "properties", properties); + ReflectionTestUtils.setField(accessControlService, "rbacEnabled", true); + + // Mock security context + when(securityContext.getAuthentication()).thenReturn(authentication); + when(authentication.getPrincipal()).thenReturn(user); + } + + public void withSecurityContext(Runnable runnable) { + try (MockedStatic ctxHolder = Mockito.mockStatic( + ReactiveSecurityContextHolder.class)) { + // Mock static method to get security context + ctxHolder.when(ReactiveSecurityContextHolder::getContext).thenReturn(Mono.just(securityContext)); + runnable.run(); + } + } + + /** + * Test that a user with specific connector-level permission can view the connector. + */ + @Test + void validateAccess_withConnectorLevelPermission_allowed() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(DEV_ROLE_NAME)); + AccessContext context = AccessContext.builder() + .cluster(CLUSTER_NAME) + .connectorActions(ConnectorAction.buildResourcePath(CONNECT_NAME, CONNECTOR_NAME), ConnectorAction.VIEW) + .build(); + Mono validateAccessMono = accessControlService.validateAccess(context); + StepVerifier.create(validateAccessMono) + .expectComplete() + .verify(); + }); + } + + /** + * Test that a user without specific connector-level permission + * but with connect-level permission is denied access. + */ + @Test + void validateAccess_withoutConnectorLevelPermission_denied() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(DEV_ROLE_NAME)); + AccessContext context = AccessContext.builder() + .cluster(CLUSTER_NAME) + .connectorActions(ConnectorAction.buildResourcePath(CONNECT_NAME, ANOTHER_CONNECTOR_NAME), ConnectorAction.VIEW) + .build(); + Mono validateAccessMono = accessControlService.validateAccess(context); + StepVerifier.create(validateAccessMono) + .expectErrorMatches(e -> e instanceof AccessDeniedException) + .verify(); + }); + } + + /** + * Test that a user with wildcard connector permission can access any connector. + */ + @Test + void validateAccess_withWildcardConnectorPermission_allowed() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(ADMIN_ROLE_NAME)); + AccessContext context = AccessContext.builder() + .cluster(CLUSTER_NAME) + .connectorActions(CONNECT_NAME + "/any-connector-name", ConnectorAction.VIEW) + .build(); + Mono validateAccessMono = accessControlService.validateAccess(context); + StepVerifier.create(validateAccessMono) + .expectComplete() + .verify(); + }); + } + + /** + * Test that connector-level DELETE permission works. + */ + @Test + void validateAccess_withConnectorLevelDeletePermission_allowed() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(DEV_ROLE_NAME)); + AccessContext context = AccessContext.builder() + .cluster(CLUSTER_NAME) + .connectorActions(ConnectorAction.buildResourcePath(CONNECT_NAME, CONNECTOR_NAME), ConnectorAction.DELETE) + .build(); + Mono validateAccessMono = accessControlService.validateAccess(context); + StepVerifier.create(validateAccessMono) + .expectComplete() + .verify(); + }); + } + + /** + * Test that a user with connect-level permission can still access + * if no connector-level permissions are defined (backwards compatibility). + */ + @Test + void isConnectorAccessible_withConnectPermission_fallback() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(ADMIN_ROLE_NAME)); + Mono accessible = accessControlService.isConnectorAccessible( + CONNECT_NAME, "any-connector", CLUSTER_NAME, ConnectorAction.OPERATE); + StepVerifier.create(accessible) + .expectNext(true) + .verifyComplete(); + }); + } + + /** + * Test that hierarchical permission checking works - EDIT includes VIEW. + */ + @Test + void validateAccess_connectorActionHierarchy() { + withSecurityContext(() -> { + when(user.groups()).thenReturn(List.of(DEV_ROLE_NAME)); + // Dev role has EDIT permission on my-connector, which should include VIEW + AccessContext context = AccessContext.builder() + .cluster(CLUSTER_NAME) + .connectorActions(ConnectorAction.buildResourcePath(CONNECT_NAME, CONNECTOR_NAME), ConnectorAction.VIEW) + .build(); + Mono validateAccessMono = accessControlService.validateAccess(context); + StepVerifier.create(validateAccessMono) + .expectComplete() + .verify(); + }); + } + + /** + * Dev role with specific connector-level permissions. + */ + public static Role getDevRole() { + Role role = new Role(); + role.setName(DEV_ROLE_NAME); + role.setClusters(List.of(CLUSTER_NAME)); + + Subject sub = new Subject(); + sub.setType("group"); + sub.setProvider(Provider.LDAP); + sub.setValue("dev.group"); + role.setSubjects(List.of(sub)); + + // Specific connector-level permission for "my-connector" + Permission specificConnectorPermission = new Permission(); + specificConnectorPermission.setResource(Resource.CONNECTOR.name()); + specificConnectorPermission.setActions(List.of( + ConnectorAction.VIEW.name(), + ConnectorAction.EDIT.name(), + ConnectorAction.DELETE.name() + )); + specificConnectorPermission.setValue(ConnectorAction.buildResourcePath(CONNECT_NAME, CONNECTOR_NAME)); + + List permissions = List.of(specificConnectorPermission); + role.setPermissions(permissions); + role.validate(); + return role; + } + + /** + * Admin role with wildcard permissions on all connectors. + */ + public static Role getAdminRole() { + Role role = new Role(); + role.setName(ADMIN_ROLE_NAME); + role.setClusters(List.of(CLUSTER_NAME)); + + Subject sub = new Subject(); + sub.setType("group"); + sub.setProvider(Provider.LDAP); + sub.setValue("admin.group"); + role.setSubjects(List.of(sub)); + + // Wildcard connector-level permission + Permission wildcardConnectorPermission = new Permission(); + wildcardConnectorPermission.setResource(Resource.CONNECTOR.name()); + wildcardConnectorPermission.setActions(List.of( + ConnectorAction.VIEW.name(), + ConnectorAction.EDIT.name(), + ConnectorAction.OPERATE.name(), + ConnectorAction.DELETE.name(), + ConnectorAction.RESET_OFFSETS.name() + )); + wildcardConnectorPermission.setValue(CONNECT_NAME + ConnectorAction.CONNECTOR_RESOURCE_DELIMITER + ".*"); + + // Also have connect-level permissions for backwards compatibility + Permission connectPermission = new Permission(); + connectPermission.setResource(Resource.CONNECT.name()); + connectPermission.setActions(List.of( + ConnectAction.VIEW.name(), + ConnectAction.EDIT.name(), + ConnectAction.OPERATE.name() + )); + connectPermission.setValue(CONNECT_NAME); + + List permissions = List.of(wildcardConnectorPermission, connectPermission); + role.setPermissions(permissions); + role.validate(); + return role; + } +} \ No newline at end of file diff --git a/contract-typespec/api/config.tsp b/contract-typespec/api/config.tsp index 3490f2dcc..7875b2762 100644 --- a/contract-typespec/api/config.tsp +++ b/contract-typespec/api/config.tsp @@ -294,6 +294,7 @@ enum ResourceType { CONSUMER, SCHEMA, CONNECT, + CONNECTOR, KSQL, ACL, AUDIT, diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index dd0dce0d8..94f9559ca 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -4156,6 +4156,7 @@ components: - CONSUMER - SCHEMA - CONNECT + - CONNECTOR - KSQL - ACL - AUDIT diff --git a/frontend/src/components/Connect/Details/Actions/Actions.tsx b/frontend/src/components/Connect/Details/Actions/Actions.tsx index f1f2ff13d..1ed0164f4 100644 --- a/frontend/src/components/Connect/Details/Actions/Actions.tsx +++ b/frontend/src/components/Connect/Details/Actions/Actions.tsx @@ -20,7 +20,7 @@ import { } from 'lib/paths'; import { useConfirm } from 'lib/hooks/useConfirm'; import { Dropdown } from 'components/common/Dropdown'; -import { ActionDropdownItem } from 'components/common/ActionComponent'; +import { ActionDropdownItemWithFallback } from 'components/common/ActionComponent'; import ChevronDownIcon from 'components/common/Icons/ChevronDownIcon'; import * as S from './Action.styled'; @@ -75,6 +75,8 @@ const Actions: React.FC = () => { () => resetConnectorOffsetsMutation.mutateAsync() ); + const connectorPath = `${routerProps.connectName}/${routerProps.connectorName}`; + return ( { } > {connector?.status.state === ConnectorState.RUNNING && ( - Pause - + )} {connector?.status.state === ConnectorState.RUNNING && ( - Stop - + )} {(connector?.status.state === ConnectorState.PAUSED || connector?.status.state === ConnectorState.STOPPED) && ( - Resume - + )} - Restart Connector - - + Restart All Tasks - - + Restart Failed Tasks - + - Reset Offsets - - + Delete - + ); diff --git a/frontend/src/components/Connect/Details/Tasks/ActionsCellTasks.tsx b/frontend/src/components/Connect/Details/Tasks/ActionsCellTasks.tsx index 8055456a4..a87495cf3 100644 --- a/frontend/src/components/Connect/Details/Tasks/ActionsCellTasks.tsx +++ b/frontend/src/components/Connect/Details/Tasks/ActionsCellTasks.tsx @@ -4,7 +4,7 @@ import { CellContext } from '@tanstack/react-table'; import useAppParams from 'lib/hooks/useAppParams'; import { useRestartConnectorTask } from 'lib/hooks/api/kafkaConnect'; import { Dropdown } from 'components/common/Dropdown'; -import { ActionDropdownItem } from 'components/common/ActionComponent'; +import { ActionDropdownItemWithFallback } from 'components/common/ActionComponent'; import { RouterParamsClusterConnectConnector } from 'lib/paths'; const ActionsCellTasks: React.FC> = ({ row }) => { @@ -17,20 +17,29 @@ const ActionsCellTasks: React.FC> = ({ row }) => { restartMutation.mutateAsync(taskId); }; + const connectorPath = `${routerProps.connectName}/${routerProps.connectorName}`; + return ( - restartTaskHandler(id?.task)} danger confirm="Are you sure you want to restart the task?" - permission={{ - resource: ResourceType.CONNECT, - action: Action.OPERATE, - value: routerProps.connectName, - }} + permission={[ + { + resource: ResourceType.CONNECTOR, + action: Action.OPERATE, + value: connectorPath, + }, + { + resource: ResourceType.CONNECT, + action: Action.OPERATE, + value: routerProps.connectName, + }, + ]} > Restart task - + ); }; diff --git a/frontend/src/components/Connect/List/ConnectorsTable/connectorsColumns/cells/ActionsCell.tsx b/frontend/src/components/Connect/List/ConnectorsTable/connectorsColumns/cells/ActionsCell.tsx index 8529b58f9..1e90ee9cc 100644 --- a/frontend/src/components/Connect/List/ConnectorsTable/connectorsColumns/cells/ActionsCell.tsx +++ b/frontend/src/components/Connect/List/ConnectorsTable/connectorsColumns/cells/ActionsCell.tsx @@ -17,7 +17,7 @@ import { } from 'lib/hooks/api/kafkaConnect'; import { useConfirm } from 'lib/hooks/useConfirm'; import { useIsMutating } from '@tanstack/react-query'; -import { ActionDropdownItem } from 'components/common/ActionComponent'; +import ActionDropdownItemWithFallback from 'components/common/ActionComponent/ActionDropDownItem/ActionDropdownItemWithFallback'; import ClusterContext from 'components/contexts/ClusterContext'; const ActionsCell: React.FC> = ({ @@ -88,100 +88,156 @@ const ActionsCell: React.FC> = ({ {(status.state === ConnectorState.PAUSED || status.state === ConnectorState.STOPPED) && ( - Resume - + )} {status.state === ConnectorState.RUNNING && ( - Pause - + )} {status.state === ConnectorState.RUNNING && ( - Stop - + )} - Restart Connector - - + Restart All Tasks - - + Restart Failed Tasks - - + Reset Offsets - - + Delete - + ); }; diff --git a/frontend/src/components/common/ActionComponent/ActionDropDownItem/ActionDropdownItemWithFallback.tsx b/frontend/src/components/common/ActionComponent/ActionDropDownItem/ActionDropdownItemWithFallback.tsx new file mode 100644 index 000000000..e13816ff0 --- /dev/null +++ b/frontend/src/components/common/ActionComponent/ActionDropDownItem/ActionDropdownItemWithFallback.tsx @@ -0,0 +1,64 @@ +import React from 'react'; +import { usePermission } from 'lib/hooks/usePermission'; +import { DropdownItemProps } from 'components/common/Dropdown/DropdownItem'; +import { + ActionComponentProps, + getDefaultActionMessage, +} from 'components/common/ActionComponent/ActionComponent'; + +import ActionDropdownItem from './ActionDropdownItem'; + +interface Props + extends Omit, + DropdownItemProps { + permission: + | ActionComponentProps['permission'] + | ActionComponentProps['permission'][]; +} + +/** + * ActionDropdownItem that supports multiple permission checks. + * If an array of permissions is provided, it will check them in order + * and use the first one that grants access. + */ +const ActionDropdownItemWithFallback: React.FC = ({ + permission, + message = getDefaultActionMessage(), + placement = 'left', + children, + disabled, + ...props +}) => { + const permissions = Array.isArray(permission) ? permission : [permission]; + + // Check all permissions upfront to avoid conditional hook calls + const permissionResults = permissions.map((perm) => + // eslint-disable-next-line react-hooks/rules-of-hooks + usePermission(perm.resource, perm.action, perm.value) + ); + + // Find the first permission that grants access + let effectivePermission = permissions[0]; + const hasAnyPermission = permissionResults.some((result, index) => { + if (result) { + effectivePermission = permissions[index]; + return true; + } + return false; + }); + + // If no permissions granted, the ActionDropdownItem will handle hiding + return ( + + {children} + + ); +}; + +export default ActionDropdownItemWithFallback; diff --git a/frontend/src/components/common/ActionComponent/index.ts b/frontend/src/components/common/ActionComponent/index.ts index 707083b6e..442e72f6c 100644 --- a/frontend/src/components/common/ActionComponent/index.ts +++ b/frontend/src/components/common/ActionComponent/index.ts @@ -3,6 +3,7 @@ import ActionButton from './ActionButton/ActionButton'; import ActionCanButton from './ActionButton/ActionCanButton/ActionCanButton'; import ActionNavLink from './ActionNavLink/ActionNavLink'; import ActionDropdownItem from './ActionDropDownItem/ActionDropdownItem'; +import ActionDropdownItemWithFallback from './ActionDropDownItem/ActionDropdownItemWithFallback'; import ActionPermissionWrapper from './ActionPermissionWrapper/ActionPermissionWrapper'; export { @@ -11,5 +12,6 @@ export { ActionCanButton, ActionButton, ActionDropdownItem, + ActionDropdownItemWithFallback, ActionPermissionWrapper, }; diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index df563b834..17e3ec56c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -27,7 +27,7 @@ cel = '0.3.0' junit = '5.12.2' mockito = '5.20.0' okhttp3 = '4.12.0' -testcontainers = '1.20.6' +testcontainers = '2.0.2' swagger-integration-jakarta = '2.2.28' jakarta-annotation-api = '2.1.1' jackson-databind-nullable = '0.2.6' @@ -97,8 +97,8 @@ cel = { module = 'dev.cel:cel', version.ref = 'cel' } caffeine = { module = 'com.github.ben-manes.caffeine:caffeine', version = '3.2.2'} testcontainers = { module = 'org.testcontainers:testcontainers', version.ref = 'testcontainers' } -testcontainers-kafka = { module = 'org.testcontainers:kafka', version.ref = 'testcontainers' } -testcontainers-jupiter = { module = 'org.testcontainers:junit-jupiter', version.ref = 'testcontainers' } +testcontainers-kafka = { module = 'org.testcontainers:testcontainers-kafka', version.ref = 'testcontainers' } +testcontainers-jupiter = { module = 'org.testcontainers:testcontainers-junit-jupiter', version.ref = 'testcontainers' } junit-jupiter-engine = { module = 'org.junit.jupiter:junit-jupiter-engine', version.ref = 'junit' } From 340d809a1d82348ccdcda159fbe7ec91b4122079 Mon Sep 17 00:00:00 2001 From: Joshua Nathaniel Miller Date: Mon, 1 Dec 2025 05:40:18 -0600 Subject: [PATCH 2/2] fix: restore backward compatibility for connector permissions and cleanup unused code --- .../ui/controller/KafkaConnectController.java | 82 +++++++------------ 1 file changed, 28 insertions(+), 54 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java index 8511a1044..e252cf40a 100644 --- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java +++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java @@ -28,7 +28,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; -import org.springframework.security.access.AccessDeniedException; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; @@ -41,7 +40,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC private static final Set RESTART_ACTIONS = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS); private static final String CONNECTOR_NAME = "connectorName"; - private static final String ACCESS_DENIED = "Access denied"; private final KafkaConnectService kafkaConnectService; @@ -102,19 +100,14 @@ public Mono> getConnector(String clusterName, Strin var context = AccessContext.builder() .cluster(clusterName) .connectorActions(connectorResource, ConnectorAction.VIEW) + .connectActions(connectName, ConnectAction.VIEW) .operationName("getConnector") .build(); - // Use hierarchical permission check - return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.VIEW) - .flatMap(hasAccess -> { - if (!hasAccess) { - return Mono.error(new AccessDeniedException(ACCESS_DENIED)); - } - return kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName) - .map(ResponseEntity::ok); - }) - .doOnEach(sig -> audit(context, sig)); + return validateAccess(context).then( + kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName) + .map(ResponseEntity::ok) + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -126,19 +119,15 @@ public Mono> deleteConnector(String clusterName, String con var context = AccessContext.builder() .cluster(clusterName) .connectorActions(connectorResource, ConnectorAction.DELETE) + .connectActions(connectName, ConnectAction.DELETE) .operationName("deleteConnector") .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, ConnectorAction.DELETE) - .flatMap(hasAccess -> { - if (!hasAccess) { - return Mono.error(new AccessDeniedException(ACCESS_DENIED)); - } - return kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName) - .map(ResponseEntity::ok); - }) - .doOnEach(sig -> audit(context, sig)); + return validateAccess(context).then( + kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName) + .map(ResponseEntity::ok) + ).doOnEach(sig -> audit(context, sig)); } @@ -200,21 +189,16 @@ public Mono> setConnectorConfig(String clusterName, var context = AccessContext.builder() .cluster(clusterName) .connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.EDIT) + .connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT) .operationName("setConnectorConfig") .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, - ConnectorAction.VIEW, ConnectorAction.EDIT) - .flatMap(hasAccess -> { - if (!hasAccess) { - return Mono.error(new AccessDeniedException(ACCESS_DENIED)); - } - return kafkaConnectService - .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody) - .map(ResponseEntity::ok); - }) - .doOnEach(sig -> audit(context, sig)); + return validateAccess(context).then( + kafkaConnectService + .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody) + .map(ResponseEntity::ok) + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -226,21 +210,16 @@ public Mono> updateConnectorState(String clusterName, Strin var context = AccessContext.builder() .cluster(clusterName) .connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.OPERATE) + .connectActions(connectName, ConnectAction.VIEW, ConnectAction.OPERATE) .operationName("updateConnectorState") .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, - ConnectorAction.VIEW, ConnectorAction.OPERATE) - .flatMap(hasAccess -> { - if (!hasAccess) { - return Mono.error(new AccessDeniedException(ACCESS_DENIED)); - } - return kafkaConnectService - .updateConnectorState(getCluster(clusterName), connectName, connectorName, action) - .map(ResponseEntity::ok); - }) - .doOnEach(sig -> audit(context, sig)); + return validateAccess(context).then( + kafkaConnectService + .updateConnectorState(getCluster(clusterName), connectName, connectorName, action) + .map(ResponseEntity::ok) + ).doOnEach(sig -> audit(context, sig)); } @Override @@ -340,20 +319,15 @@ public Mono> resetConnectorOffsets(String clusterName, Stri var context = AccessContext.builder() .cluster(clusterName) .connectorActions(connectorResource, ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS) + .connectActions(connectName, ConnectAction.VIEW, ConnectAction.RESET_OFFSETS) .operationName("resetConnectorOffsets") .operationParams(Map.of(CONNECTOR_NAME, connectorName)) .build(); - return accessControlService.isConnectorAccessible(connectName, connectorName, clusterName, - ConnectorAction.VIEW, ConnectorAction.RESET_OFFSETS) - .flatMap(hasAccess -> { - if (!hasAccess) { - return Mono.error(new AccessDeniedException(ACCESS_DENIED)); - } - return kafkaConnectService - .resetConnectorOffsets(getCluster(clusterName), connectName, connectorName) - .map(ResponseEntity::ok); - }) - .doOnEach(sig -> audit(context, sig)); + return validateAccess(context).then( + kafkaConnectService + .resetConnectorOffsets(getCluster(clusterName), connectName, connectorName) + .map(ResponseEntity::ok) + ).doOnEach(sig -> audit(context, sig)); } }