Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/common/auth/authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def authenticate(self, request):
return handle.handle(request, token, token_details.get_token_details)
raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
except Exception as e:
traceback.format_exc()
traceback.print_stack()
if isinstance(e, AppEmbedIdentityFailed) or isinstance(e, AppChatNumOutOfBoundsFailed) or isinstance(e,
AppApiException):
raise e
Expand Down
156 changes: 120 additions & 36 deletions apps/common/auth/handle/impl/user_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,116 @@
"""
import datetime
from functools import reduce
from typing import List

from django.core.cache import cache
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _

from common.auth.handle.auth_base_handle import AuthBaseHandle
from common.constants.cache_version import Cache_Version
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role, \
PermissionConstants
from common.constants.permission_constants import Auth, PermissionConstants, ResourcePermissionGroup, \
get_permission_list_by_resource_group, ResourceAuthType, \
ResourcePermissionRole, get_default_role_permission_mapping_list, get_default_workspace_user_role_mapping_list
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppAuthenticationFailed
from common.utils.common import group_by
from system_manage.models.workspace_user_permission import WorkspaceUserPermission
from system_manage.models.workspace_user_permission import WorkspaceUserResourcePermission
from users.models import User


def get_permission(permission_id):
"""
获取权限字符串
@param permission_id: 权限id
@return: 权限字符串
"""
if isinstance(permission_id, PermissionConstants):
permission_id = permission_id.value
return f"{permission_id}"


def get_workspace_permission(permission_id, workspace_id):
"""
获取工作空间权限字符串
@param permission_id: 权限id
@param workspace_id: 工作空间id
@return:
"""
if isinstance(permission_id, PermissionConstants):
permission_id = permission_id.value
return f"{permission_id}:/WORKSPACE/{workspace_id}"


def get_workspace_resource_permission_list(permission_id, workspace_id, workspace_user_permission_dict):
workspace_user_permission_list = workspace_user_permission_dict.get(workspace_id)
if workspace_user_permission_list is None:
def get_workspace_permission_list(role_permission_mapping_dict, workspace_user_role_mapping_list):
"""
获取工作空间下所有的权限
@param role_permission_mapping_dict: 角色权限关联字典
@param workspace_user_role_mapping_list: 工作空间用户角色关联列表
@return: 工作空间下的权限
"""
workspace_permission_list = [
[get_workspace_permission(role_permission_mapping.permission_id, w_u_r.workspace_id) for role_permission_mapping
in
role_permission_mapping_dict.get(w_u_r.role_id, [])] for w_u_r in workspace_user_role_mapping_list]
return reduce(lambda x, y: [*x, *y], workspace_permission_list, [])


def get_workspace_resource_permission_list(
workspace_user_resource_permission_list: List[WorkspaceUserResourcePermission],
role_permission_mapping_dict,
workspace_user_role_mapping_dict):
"""

@param workspace_user_resource_permission_list: 工作空间用户资源权限列表
@param role_permission_mapping_dict: 角色权限关联字典 key为role_id
@param workspace_user_role_mapping_dict: 工作空间用户角色映射字典 key为role_id
@return: 工作空间资源权限列表
"""
resource_permission_list = [
get_workspace_resource_permission_list_by_workspace_user_permission(workspace_user_resource_permission,
role_permission_mapping_dict,
workspace_user_role_mapping_dict) for
workspace_user_resource_permission in workspace_user_resource_permission_list]
# 将二维数组扁平为一维
return reduce(lambda x, y: [*x, *y], resource_permission_list, [])


def get_workspace_resource_permission_list_by_workspace_user_permission(
workspace_user_resource_permission: WorkspaceUserResourcePermission,
role_permission_mapping_dict,
workspace_user_role_mapping_dict):
"""

@param workspace_user_resource_permission: 工作空间用户资源权限对象
@param role_permission_mapping_dict: 角色权限关联字典 key为role_id
@param workspace_user_role_mapping_dict: 工作空间用户角色关联字典 key为role_id
@return: 工作空间用户资源的权限列表
"""

role_permission_mapping_list = [role_permission_mapping_dict.get(workspace_user_role_mapping.role_id) for
workspace_user_role_mapping in
workspace_user_role_mapping_dict.get(
workspace_user_resource_permission.workspace_id)]
role_permission_mapping_list = reduce(lambda x, y: [*x, *y], role_permission_mapping_list, [])
# 如果是根据角色
if (workspace_user_resource_permission.auth_target_type == ResourceAuthType.ROLE
and workspace_user_resource_permission.permission_list.__contains__(
ResourcePermissionRole.ROLE)):
return [
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)]
return [
f"{permission_id}:/WORKSPACE/{workspace_id}/{workspace_user_permission.auth_target_type}/{workspace_user_permission.target}"
for workspace_user_permission in
workspace_user_permission_list if workspace_user_permission.is_auth] + [
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)]
f"{role_permission_mapping.permission_id}:/WORKSPACE/{workspace_user_resource_permission.workspace_id}/{workspace_user_resource_permission.auth_target_type}/{workspace_user_resource_permission.target}"
for role_permission_mapping in role_permission_mapping_list]

elif workspace_user_resource_permission.auth_target_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP:
resource_permission_list = [
[
f"{permission}:/WORKSPACE/{workspace_user_resource_permission.workspace_id}/{workspace_user_resource_permission.auth_target_type}/{workspace_user_resource_permission.target}"
for permission in get_permission_list_by_resource_group(ResourcePermissionGroup[resource_permission])]
for resource_permission in workspace_user_resource_permission.permission_list if
ResourcePermissionGroup.values.__contains__(resource_permission)]
# 将二维数组扁平为一维
return reduce(lambda x, y: [*x, *y], resource_permission_list, [])
return []


def get_permission_list(user,
Expand All @@ -63,41 +135,53 @@ def get_permission_list(user,
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
workspace_user_role_mapping_dict = group_by(workspace_user_role_mapping_list,
lambda item: item.role_id)
# 获取角色权限映射数据
role_permission_mapping_list = QuerySet(role_permission_mapping_model).filter(
role_id__in=[workspace_user_role_mapping.role_id for workspace_user_role_mapping in
workspace_user_role_mapping_list])
role_dict = group_by(role_permission_mapping_list, lambda item: item.get('role_id'))
role_permission_mapping_dict = group_by(role_permission_mapping_list, lambda item: item.role_id)

workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter(
workspace_user_permission_list = QuerySet(WorkspaceUserResourcePermission).filter(
workspace_id__in=[workspace_user_role.workspace_id for workspace_user_role in
workspace_user_role_mapping_list])
workspace_user_permission_dict = group_by(workspace_user_permission_list,
key=lambda item: item.workspace_id)
permission_list = [
get_workspace_resource_permission_list(role_permission_mapping.permission_id,
role_dict.get(role_permission_mapping.role_id).workspace_id,
workspace_user_permission_dict)
for role_permission_mapping in
role_permission_mapping_list]

# 将二维数组扁平为一维
permission_list = reduce(lambda x, y: [*x, *y], permission_list, [])

# 资源权限
workspace_resource_permission_list = get_workspace_resource_permission_list(workspace_user_permission_list,
role_permission_mapping_dict,
workspace_user_role_mapping_dict)

workspace_permission_list = get_workspace_permission_list(role_permission_mapping_dict,
workspace_user_role_mapping_list)
# 系统权限
system_permission_list = [role_permission_mapping.permission_id for role_permission_mapping in
role_permission_mapping_list]
# 合并权限
permission_list = system_permission_list + workspace_permission_list + workspace_resource_permission_list
cache.set(key, permission_list, version=version)
else:
workspace_id_list = ['default']
workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter(
workspace_user_resource_permission_list = QuerySet(WorkspaceUserResourcePermission).filter(
workspace_id__in=workspace_id_list)

workspace_user_permission_dict = group_by(workspace_user_permission_list,
key=lambda item: item.workspace_id)
permission_list = get_default_permission_list_by_role(RoleConstants[user.role])
permission_list = [
get_workspace_resource_permission_list(permission, 'default', workspace_user_permission_dict) for
permission
in permission_list]
# 将二维数组扁平为一维
permission_list = reduce(lambda x, y: [*x, *y], permission_list, [])
role_permission_mapping_list = get_default_role_permission_mapping_list()
role_permission_mapping_dict = group_by(role_permission_mapping_list, lambda item: item.role_id)
workspace_user_role_mapping_list = get_default_workspace_user_role_mapping_list([user.role])
workspace_user_role_mapping_dict = group_by(workspace_user_role_mapping_list,
lambda item: item.role_id)
# 资源权限
workspace_resource_permission_list = get_workspace_resource_permission_list(
workspace_user_resource_permission_list,
role_permission_mapping_dict,
workspace_user_role_mapping_dict)

workspace_permission_list = get_workspace_permission_list(role_permission_mapping_dict,
workspace_user_role_mapping_list)
# 系统权限
system_permission_list = [role_permission_mapping.permission_id for role_permission_mapping in
role_permission_mapping_list]
# 合并权限
permission_list = system_permission_list + workspace_permission_list + workspace_resource_permission_list
cache.set(key, permission_list, version=version)
return permission_list

Expand Down
100 changes: 95 additions & 5 deletions apps/common/constants/permission_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
@desc: 权限,角色 常量
"""
from enum import Enum
from functools import reduce
from typing import List

from django.db import models


class Group(Enum):
"""
Expand Down Expand Up @@ -45,6 +48,40 @@ class RoleGroup(Enum):
CHAT_USER = "CHAT_USER"


class ResourcePermissionRole(models.TextChoices):
"""
资源权限根据角色
"""
ROLE = "ROLE"

def __eq__(self, other):
return str(self) == str(other)


class ResourcePermissionGroup(models.TextChoices):
"""
资源权限组
"""
# 查看
VIEW = "VIEW"
# 管理
MANAGE = "MANAGE"

def __eq__(self, other):
return str(self) == str(other)


class ResourceAuthType(models.TextChoices):
"""
资源授权类型
"""
"当授权类型是Role时候"
ROLE = "ROLE"

"""资源权限组"""
RESOURCE_PERMISSION_GROUP = "RESOURCE_PERMISSION_GROUP"


class Role:
def __init__(self, name: str, decs: str, group: RoleGroup, resource_path=None):
self.name = name
Expand Down Expand Up @@ -78,14 +115,19 @@ class Permission:
权限信息
"""

def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None):
def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None,
resource_permission_group_list=None):
if role_list is None:
role_list = []
if resource_permission_group_list is None:
resource_permission_group_list = []
self.group = group
self.operate = operate
self.resource_path = resource_path
# 用于获取角色与权限的关系,只适用于没有权限管理的
self.role_list = role_list
# 用于资源权限权限分组
self.resource_permission_group_list = resource_permission_group_list

@staticmethod
def new_instance(permission_str: str):
Expand Down Expand Up @@ -151,13 +193,28 @@ class PermissionConstants(Enum):
KNOWLEDGE_MODULE_CREATE = Permission(group=Group.KNOWLEDGE, operate=Operate.CREATE, role_list=[RoleConstants.ADMIN,
RoleConstants.USER])
KNOWLEDGE_MODULE_READ = Permission(group=Group.KNOWLEDGE, operate=Operate.READ, role_list=[RoleConstants.ADMIN,
RoleConstants.USER])
RoleConstants.USER],
resource_permission_group_list=[
ResourcePermissionGroup.VIEW
])
KNOWLEDGE_MODULE_EDIT = Permission(group=Group.KNOWLEDGE, operate=Operate.EDIT, role_list=[RoleConstants.ADMIN,
RoleConstants.USER])
RoleConstants.USER],
resource_permission_group_list=[
ResourcePermissionGroup.MANAGE
]
)
KNOWLEDGE_MODULE_DELETE = Permission(group=Group.KNOWLEDGE, operate=Operate.DELETE, role_list=[RoleConstants.ADMIN,
RoleConstants.USER])
RoleConstants.USER],
resource_permission_group_list=[
ResourcePermissionGroup.MANAGE
]
)
KNOWLEDGE_READ = Permission(group=Group.KNOWLEDGE, operate=Operate.READ, role_list=[RoleConstants.ADMIN,
RoleConstants.USER])
RoleConstants.USER],
resource_permission_group_list=[
ResourcePermissionGroup.VIEW
]
)
KNOWLEDGE_CREATE = Permission(group=Group.KNOWLEDGE, operate=Operate.CREATE, role_list=[RoleConstants.ADMIN,
RoleConstants.USER])

Expand Down Expand Up @@ -194,6 +251,39 @@ def get_default_permission_list_by_role(role: RoleConstants):
PermissionConstants.__members__))))


class RolePermissionMapping:
def __init__(self, role_id, permission_id):
self.role_id = role_id
self.permission_id = permission_id


class WorkspaceUserRoleMapping:
def __init__(self, workspace_id, role_id, user_id):
self.workspace_id = workspace_id
self.role_id = role_id
self.user_id = user_id


def get_default_role_permission_mapping_list():
role_permission_mapping_list = [
[RolePermissionMapping(role.value.name, PermissionConstants[k].value.__str__()) for role in
PermissionConstants[k].value.role_list] for k in PermissionConstants.__members__]
return reduce(lambda x, y: [*x, *y], role_permission_mapping_list, [])


def get_default_workspace_user_role_mapping_list(user_role_list: list):
return [WorkspaceUserRoleMapping('default', role.value.name, 'default') for role in RoleConstants if
user_role_list.__contains__(role.value.name)]


def get_permission_list_by_resource_group(resource_group: ResourcePermissionGroup):
"""
根据资源组获取权限
"""
return [PermissionConstants[k] for k in PermissionConstants.__members__ if
PermissionConstants[k].value.resource_permission_group_list.__contains__(resource_group)]


class Auth:
"""
用于存储当前用户的角色和权限
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code snippet you provided looks generally well-structured and follows Django conventions. However, there are a few areas where improvements or corrections can be made:

Improvements and Corrected Points

  1. Comments: The docstrings and comments should be more detailed to help future maintainers understand the purpose of each function and its parameters.

  2. Useful Functionality: There might be use cases for some functions such as get_permission_list_by_resource_group, which could be more helpful in managing permissions based on different resource groups.

  3. Django Models: Using Django models for roles and groups would allow for better integration with existing Django features.

  4. Code Organization: Consider organizing related functionality into separate files if they become too complex, especially when dealing with multiple constants and classes.

  5. Error Handling: Add error handling where appropriate to manage potential edge cases during runtime.

Here's an updated version with these considerations:

# Import necessary modules
from enum import Enum, TextChoices
from functools import reduce
from typing import List, Dict

import django.db.models as models

# Define Enums
class Group(Enum):
    ADMIN = "ADMIN"
    USER = "USER"

class RoleGroup(TextChoices):
    GENERAL = "GENERAL"
    STUDENT = "STUDENT"
    TEACHER = "TEACHER"

class ResourcePermissionRole(TextChoices):
    ROLE = "ROLE"

class ResourcePermissionGroup(TextChoices):
    VIEW = "VIEW"
    MANAGE = "MANAGE"

class ResourceType(TextChoices):
    API = "API"
    FILE = "FILE"
    DOCUMENT = "DOCUMENT"

class Operate(Enum):
    LIST = "LIST"
    CREATE = "CREATE"
    READ = "READ"
    UPDATE = "UPDATE"
    DELETE = "DELETE"

# Define Constants and Classes
class Permission(models.Model):  # Move this to models.py or another file
    """Model representation for Permissions."""
    GRP_CHOICES = [(group.value, group.name) for group in Group]
    OPERATE_CHOICES = [(operate.value, operate.name) for operate in Operate]

    group_name = models.CharField(max_length=50, choices=GRP_CHOICES)
    operation = models.CharField(max_length=50, choices=OPERATE_CHOICES)

    def __eq__(self, other):
        return f"{self.group_name}:{self.operation}" == str(other)


class User(RoleMixin):
    username = models.CharField(max_length=100, unique=True)
    groups = models.ManyToManyField(Group, related_name='users')

class Role(models.Model):  # Move this to models.py or another file
    name = models.CharField(max_length=100)
    description = models.TextField()
    group_type = models.ForeignKey(RoleGroup, null=True, blank=True, on_delete=models.CASCADE)

    roles_permissions = models.ManyToManyField(Permission, through="UserRole")

def get_default_permission_list_by_role(role: Role):
    """Retrieve default permissions for a given role."""
    return [permission for permission in Permission.objects.filter(role__name=role.name)]


def get_default_role_permission_mapping_list():
    """Generate default role-permission mappings."""
    return UserRole.objects.all()


def get_permission_groups_for_user(user: User):
    """Get all available permission groups for a user."""
    roles = [group.name for group in user.groups.all()]
    permission_mappings = get_default_role_permission_mapping_list().select_related("permissions")
    
    permissions_dict: Dict[ResourceType, set[str]] = defaultdict(set)
    for mapping in permission_mappings:
        perm = mapping.permissions.first()
        if perm:
            resources_with_perms = perm.resource_path.split(',')
            for res in resources_with_perms:
                # Assuming resource type is part of path (e.g., /api/user/resource/VIEW)
                resource_type = res.rsplit('/', 1)[0]
                permissions_dict[resource_type].add(res.rsplit('/', 1)[-1])

    resource_groups = {}
    allowed_operations_per_res_type = {
        ResourceType.API: {"list", "create"},
        ResourceType.FILE: {"read", "update", "delete"},
        ResourceType.DOCUMENT: {"view", "edit"}
    }

    for key, vals in permissions_dict.items():
        grouped_perms = {op: [] for op in allowed_operations_per_res_type[key]}
        
        for val in vals:
            grouped_ops_key = next((key for key, vlist inallowed_operations_per_res_type[key].items() if val.lower() in vlist), None)
            
            if grouped_ops_key:
                grouped_perms[grouped_ops_key].append(val)
        
        resource_groups.setdefault(key, []).extend(grouped_perms.values())

    return resource_groups


def map_roles_to_workspace_users(workspace_id, roles):
    """
    Map a list of roles to a specific workspace.
    :param workspace_id: ID of the workspace
    :param roles: List of roleName string values associated with roles
    :return: List of WorkspaceUserRoleMappings representing mapped users
    """
    workspace_users_list = []
    current_active_group_set = set(get_all_active_workspace_groups(workspace_id))  # Implement helper function here
    
    for role_name in roles:
        current_role_details = None
        
        for r in get_available_roles_in_workspace(workspace_id):
            if r.name.lower() == role_name.lower(): 
                current_role_details = r
                break  # Exit upon first match
            
        if current_role_details:
            active_users = list(current_active_group_set.intersection({u['username'] for u in retrieve_active_users()}))
            workspace_users_list.extend([WorkspaceUserRoleMapping(workspace.id,
                                                               str(r.name),
                                                               user) for user in active_users])
                
    return workspace_users_list


def retrieve_active_users():
    """
    Retrieve all currently active user accounts.
    Note that actual implementation may vary depending on your authentication system.
    :return: A list of dicts containing usernames and email addresses
    """
    return [
        {'username': 'john_doe', 'email': '<EMAIL>'},
        {'username': 'jane_smith', 'email': '<EMAIL>'}
    ]

def get_available_roles_in_workspace(workspace_id):
    """List all roles available within specific workspace."""
    from .models import Role  # Ensure proper relative imports
    return Role.objects.filter(workspaces=workspace_id)

def get_all_active_workspace_groups(workspace_id):
    """Return a collection of all group names considered active under specified workspace_id."""
    active_workspaces = [
        {'id': 1, 'groups': ['students']},
        {'id': 2, 'groups': ['teachers']}
    ]
    # Assume retrieving active groups is context-specific logic
    return [active['groups'] for active in active_workspaces if active.get('id') == workspace_id][0]

This revised version includes moving the Permission model to Django models, improving documentation readability, adding functionalities like mapping roles to a workspace, and ensuring proper organization and separation of concerns.

Expand Down
Loading
Loading