Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions apps/folders/serializers/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import uuid_utils.compat as uuid
from django.db import transaction
from django.db.models import QuerySet, Q, Func, F
from django.db.models import QuerySet, Q, Func, F, TextField
from django.db.models.functions import Cast
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers

Expand Down Expand Up @@ -224,7 +225,9 @@ def delete(self):
nodes = Folder.objects.filter(id=self.data.get('id')).get_descendants(include_self=True)
for node in nodes:
# 删除相关的资源
source_ids = Source.objects.filter(folder_id=node.id).values_list('id', flat=True)
source_ids = (Source.objects.filter(folder_id=node.id)
.annotate(id_str=Cast('id', TextField()))
.values_list('id_str', flat=True))
# 检查文件夹是否存在未授权当前用户的资源
auth_list = QuerySet(WorkspaceUserResourcePermission).filter(
Q(workspace_id=self.data.get('workspace_id')) &
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code you provided appears to have several improvements that can be suggested for better readability, maintainability, and potentially performance:

Improvements:

  1. Field Annotations: You've introduced annotations like Cast from Django's database functions module. This is a good approach to ensure data types are consistent before querying.

  2. Database Function Usage: The usage of Cast('id', TextField()) is appropriate because it explicitly converts an integer column to a string type, which is useful when working with certain database operations where different data types might be required.

  3. Optimization Suggestions:

    • Batch Deletion: Consider using bulk deletion methods (Source.objects.filter(...).delete() without iterating over each object individually) if applicable, especially in production environments.
    • Transaction Management: Ensure that transaction boundaries are correctly set up around resource deletions to maintain atomicity and consistency. However, since transaction.atomic() is not applied here, it needs to be explicitly ensured within the critical sections.
  4. Code Readability:

    • Add comments explaining complex logic steps, such as why resources need to be filtered under specific workspace permissions.
    • Use meaningful variable names to improve understanding of their purpose in the context of folder and resource management.
  5. Error Handling: Adding try-except blocks around database queries or resource processing would help manage exceptions gracefully and prevent unexpected application errors.

  6. Future-Proofing: Ensure that any new features or changes in future versions of Django do not break current functionality due to compatibility issues.

These suggestions aim to enhance the quality and robustness of the code while making it more efficient and easier to understand.

Expand Down
50 changes: 38 additions & 12 deletions apps/system_manage/serializers/user_resource_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,17 @@ class ResourceUserPermissionSerializer(serializers.Serializer):
'TOOL': Tool
}

def get_queryset(self, instance):
def get_queryset(self, instance, is_x_pack_ee: bool):

user_query_set = QuerySet(model=get_dynamics_model({
'nick_name': models.CharField(),
'username': models.CharField(),
"permission": models.CharField(),
"id": models.UUIDField(),
"u.id": models.UUIDField(),
"role": models.CharField(),
"role_setting.type": models.CharField(),
"user_role_relation.workspace_id": models.CharField(),

}))
nick_name = instance.get('nick_name')
username = instance.get('username')
Expand Down Expand Up @@ -368,9 +372,14 @@ def get_queryset(self, instance):
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
if workspace_user_role_mapping_model:
user_query_set = user_query_set.filter(
id__in=QuerySet(workspace_user_role_mapping_model).filter(
workspace_id=self.data.get('workspace_id')).values("user_id"))

**{"u.id__in": QuerySet(workspace_user_role_mapping_model).filter(
workspace_id=self.data.get('workspace_id')).values("user_id")})
if is_x_pack_ee:
user_query_set = user_query_set.filter(
**{'role_setting.type': "USER", 'user_role_relation.workspace_id': self.data.get('workspace_id')})
else:
user_query_set = user_query_set.filter(
**{'role': "USER"})
return {
'workspace_user_resource_permission_query_set': workspace_user_resource_permission_query_set,
'user_query_set': user_query_set
Expand All @@ -380,22 +389,38 @@ def list(self, instance, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
ResourceUserPermissionUserListRequest(data=instance).is_valid(raise_exception=True)
is_x_pack_ee = self.is_x_pack_ee()
# 资源的用户授权列表
resource_user_permission_list = native_search(self.get_queryset(instance), get_file_content(
os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'get_resource_user_permission_detail.sql')
resource_user_permission_list = native_search(self.get_queryset(instance, is_x_pack_ee), get_file_content(
os.path.join(PROJECT_DIR, "apps", "system_manage",
'sql',
('get_resource_user_permission_detail_ee.sql' if is_x_pack_ee else
'get_resource_user_permission_detail.sql')
)
))
return resource_user_permission_list

@staticmethod
def is_x_pack_ee():
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
return workspace_user_role_mapping_model is not None and role_permission_mapping_model is not None

def page(self, instance, current_page: int, page_size: int, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
ResourceUserPermissionUserListRequest(data=instance).is_valid(raise_exception=True)
# 分页列表
resource_user_permission_page_list = native_page_search(current_page, page_size, self.get_queryset(instance),
is_x_pack_ee = self.is_x_pack_ee()
resource_user_permission_page_list = native_page_search(current_page, page_size,
self.get_queryset(instance, is_x_pack_ee),
get_file_content(
os.path.join(PROJECT_DIR, "apps", "system_manage",
'sql',
'get_resource_user_permission_detail.sql')
(
'get_resource_user_permission_detail_ee.sql' if is_x_pack_ee else
'get_resource_user_permission_detail.sql')
)
))
return resource_user_permission_page_list

Expand All @@ -407,9 +432,10 @@ def get_has_manage_permission_resource_under_folders(self, current_user_id, fold
resource_model = self.RESOURCE_MODEL_MAP[auth_target_type]

if workspace_manage:
current_user_managed_resources_ids = QuerySet(resource_model).filter(workspace_id=workspace_id, folder__in=folder_ids).annotate(
id_str=Cast('id', TextField())
).values_list("id_str", flat=True)
current_user_managed_resources_ids = QuerySet(resource_model).filter(workspace_id=workspace_id,
folder__in=folder_ids).annotate(
id_str=Cast('id', TextField())
).values_list("id_str", flat=True)
else:
current_user_managed_resources_ids = QuerySet(WorkspaceUserResourcePermission).filter(
workspace_id=workspace_id, user_id=current_user_id, auth_target_type=auth_target_type,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code contains several areas that need attention:

  1. Function Overload: The get_queryset method accepts two arguments (instance and is_x_pack_ee). This might cause confusion since you don't utilize one of these parameters inside the method.

  2. SQL Injection Potential: Using os.path.join to include file paths can lead to SQL injection attacks, especially if file paths are directly derived from inputs like workspace_id.

  3. Duplicate Code:

    • Both the list and paginate methods contain similar logic for querying permissions based on conditions (e.g., user roles).
    • There's also repeated code when filtering resources under folders.
  4. Static Method for Role Check: The is_x_pack_ee static method can be extracted into a separate function or decorator.

  5. Potential Performance Issues:

    • If multiple queries are made depending on certain condition flags (like checking for X-Pack EE), it may have performance implications.
    • Consider caching results where appropriate to reduce redundant computations.

Here’s how I recommend refactoring the code:

class ResourceUserPermissionSerializer(serializers.Serializer):
    RESOURCE_MODEL_MAP = {
        'TICKET': Ticket,
        # Add other resource types below
    }

    # ... rest of the class definition ...

    def get_has_manage_permission_resource_under_folders(self, current_user_id, folder_ids, workspace_id, workspace_manage=False):
        resource_model = self.RESOURCE_MODEL_MAP[type(self.instance)]
        
        if not isinstance(folder_ids, Iterable):
            raise serializers.ValidationError('Folder IDs must be an iterable')
        
        qs_kwargs = {}
        qs_filter_conditions = []

        if workspace_manage:
            qs_filter_conditions.append(Q(workspace_id=workspace_id))
            qs_kwargs['folder__in'] = folder_ids  # Assuming you want only items with matching folders

        user_role_mapping_qs = DatabaseModelManage.get_model("workspace_user_role_mapping").filter(
            workspace_id=workspace_id,
            user_id=current_user_id
        )

        # Constructing dynamic filters based on user roles
        for role_condition in user_role_mapping_qs.values('role'):
            role_condition_dict = {f"role_{role_condition}": "USER"}
            qs_filter_conditions.append(role_condition_dict)

        if qs_filter_conditions:
            filter_dict = {} if not qs_kwargs else {'**': qs_kwargs}
            qs_filter_dict = {'**': {**{k: v for d in qs_filter_conditions for k, v in d.items()}, **filter_dict}}
            
            result = native_search(resource_model.objects.all(), get_file_content(
                os.path.join(PROJECT_DIR, "apps", "system_manage",
                             'sql',
                             (
                                 'get_resource_user_permission_detail_ee.sql' if type(qs_filter_dict) == dict else
                                 'get_resource_user_permission_detail.sql'
                             )
                         )),
                        **qs_filter_dict)
        
        return result

Changes Made:

  1. Removed duplicate variables by using dictionary unpacking and iterating through multiple lists together.
  2. Simplified the conditional logic involving ws_user_role_mapping.
  3. Extracted role-based filtering into a loop and dynamically construct the query dictionary for better readability.
  4. Added error handling for non-iterable folder ids.

Expand Down
35 changes: 35 additions & 0 deletions apps/system_manage/sql/get_resource_user_permission_detail_ee.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
SELECT
distinct(u.id),
u.id,
u.nick_name,
u.username,
case
when
wurp."permission" is null then 'NOT_AUTH'
else wurp."permission"
end
FROM
public."user" u
LEFT JOIN (
SELECT
user_id ,
(case
when auth_type = 'ROLE'
and 'ROLE' = any( permission_list) then 'ROLE'
when auth_type = 'RESOURCE_PERMISSION_GROUP'
and 'MANAGE'= any(permission_list) then 'MANAGE'
when auth_type = 'RESOURCE_PERMISSION_GROUP'
and 'VIEW' = any( permission_list) then 'VIEW'
else null
end) as "permission"
FROM
workspace_user_resource_permission
${workspace_user_resource_permission_query_set}
) wurp
ON
u.id = wurp.user_id
left join user_role_relation user_role_relation
on user_role_relation.user_id = u.id
left join role_setting role_setting
on role_setting.id = user_role_relation.role_id
${user_query_set}
Loading