Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/application/serializers/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def get_query_set(self, instance: Dict, workspace_manage: bool, is_x_pack_ee: bo
folder_query_set = folder_query_set.filter(workspace_id=workspace_id)
application_query_set = application_query_set.filter(workspace_id=workspace_id)
folder_id = instance.get('folder_id')
if folder_id is not None:
if folder_id is not None and folder_id != workspace_id:
folder_query_set = folder_query_set.filter(parent=folder_id)
application_query_set = application_query_set.filter(folder_id=folder_id)
if name is not None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no specific irregularities or issues with the provided code snippet. However, I suggest making a small optimization:

            folder_query_set = folder_query_set.filter(workspace_id=workspace_id)
            application_query_set = application_query_set.filter(workspace_id=workspace_id)

        folder_id = instance.get('folder_id')

        if folder_id is not None and folder_id != workspace_id:
            folder_query_set = folder_query_set.filter(parent=folder_id).distinct()
            application_query_set = application_query_set.filter(folder_id=folder_id)

The distinct() method is added to the folder_query_set filter operation when filtering by parent, which can increase efficiency slightly.

This change ensures that only unique entries from the folder_query_set filtered by parent are included in the results. If multiple sub-folders have the same parent at different levels of nesting, this might prevent duplicates. But please note that using distinct in a query may negatively impact performance on very large datasets due to increased database scanning. Always balance between readability and performance based on practical use cases.

Expand Down
8 changes: 8 additions & 0 deletions apps/folders/serializers/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from knowledge.serializers.knowledge import KnowledgeSerializer
from knowledge.serializers.knowledge_folder import KnowledgeFolderTreeSerializer
from system_manage.models import WorkspaceUserResourcePermission
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
from tools.models import ToolFolder, Tool
from tools.serializers.tool import ToolSerializer
from tools.serializers.tool_folder import ToolFolderTreeSerializer
Expand Down Expand Up @@ -139,6 +140,13 @@ def insert(self, instance, with_valid=True):
parent_id=parent_id
)
folder.save()

UserResourcePermissionSerializer(data={
'workspace_id': self.data.get('workspace_id'),
'user_id': self.data.get('user_id'),
'auth_target_type': self.data.get('source')
}).auth_resource(str(folder.id), is_folder=True)

return FolderSerializer(folder).data

class Operate(serializers.Serializer):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code snippet contains several enhancements and modifications to improve performance, maintainability, and functionality:

  1. Import Statements: The import statement for UserResourcePermissionSerializer has been added, which may be necessary for handling user resource permissions related operations after creating a new folder.

  2. Folder Creation: A new instance of the folder is created using the get_or_create method in the insert function. This approach helps in avoiding potential issues with duplicate entries if there's already an existing folder with the same name under the specified parent. However, ensure that this logic aligns with business requirements (e.g., whether duplicates are allowed).

  3. Access Verification: After creating the new folder, a call is made to UserResourcePermissionSerializer within insert. Assuming this serializer handles the creation or update of permission records specifically targeting folders (auth_target_type='folder'). Ensure that the data being passed to UserResourcePermissionSerializer is correct and relevant to your use-case.

  4. Comments and Documentation: It'd be beneficial to include comments explaining each part of the operation, especially if it's not immediately clear what the code is doing, such as why certain checks or actions are taken.

  5. Security Considerations: Make sure that sensitive information, like workspace IDs and user IDs, is handled securely by validating them against authorized resources before performing database updates or API calls.

  6. Performance Optimization: If there're many instances of this insertion process happening concurrently, consider adding locking mechanisms to prevent concurrent access scenarios from affecting integrity or performance.

Overall, these changes generally enhance the robustness and usability of the code. Always test thoroughly after making such substantial modifications to ensure all functionalities remain intact and perform as expected.

Expand Down
2 changes: 1 addition & 1 deletion apps/knowledge/serializers/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def get_query_set(self, workspace_manage, is_x_pack_ee):
if "workspace_id" in self.data and self.data.get('workspace_id') is not None:
query_set = query_set.filter(**{'temp.workspace_id': self.data.get("workspace_id")})
folder_query_set = folder_query_set.filter(**{'workspace_id': self.data.get("workspace_id")})
if "folder_id" in self.data and self.data.get('folder_id') is not None:
if "folder_id" in self.data and self.data.get('folder_id') is not None and self.data.get('workspace_id') != self.data.get('folder_id'):
query_set = query_set.filter(**{'temp.folder_id': self.data.get("folder_id")})
folder_query_set = folder_query_set.filter(**{'parent_id': self.data.get("folder_id")})
if "scope" in self.data and self.data.get('scope') is not None:
Expand Down
12 changes: 7 additions & 5 deletions apps/system_manage/serializers/user_resource_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def is_valid(self, *, auth_target_type=None, workspace_id=None, raise_exception=
illegal_target_id_list = select_list(
get_file_content(
os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'check_member_permission_target_exists.sql')),
[json.dumps(user_resource_permission_list), workspace_id, workspace_id, workspace_id, workspace_id,workspace_id,workspace_id,workspace_id])
[json.dumps(user_resource_permission_list), workspace_id, workspace_id, workspace_id, workspace_id,
workspace_id, workspace_id, workspace_id])
if illegal_target_id_list is not None and len(illegal_target_id_list) > 0:
raise AppApiException(500,
_('Non-existent id[') + str(illegal_target_id_list) + ']')
Expand Down Expand Up @@ -192,7 +193,7 @@ def auth_resource_batch(self, resource_id_list: list):
cache.delete(key, version=version)
return True

def auth_resource(self, resource_id: str):
def auth_resource(self, resource_id: str, is_folder=False):
self.is_valid(raise_exception=True)
auth_target_type = self.data.get('auth_target_type')
workspace_id = self.data.get('workspace_id')
Expand All @@ -206,11 +207,12 @@ def auth_resource(self, resource_id: str):
target=resource_id,
auth_target_type=auth_target_type,
permission_list=[ResourcePermission.VIEW,
ResourcePermission.MANAGE] if auth_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP else [
ResourcePermission.MANAGE] if (
auth_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP or is_folder) else [
ResourcePermissionRole.ROLE],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
auth_type=ResourceAuthType.RESOURCE_PERMISSION_GROUP if is_folder else auth_type
).save()
# 刷新缓存
version = Cache_Version.PERMISSION_LIST.get_version()
Expand Down Expand Up @@ -358,7 +360,7 @@ def get_queryset(self, instance):
permission__in=query_p_list)
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
if workspace_user_role_mapping_model:
user_query_set=user_query_set.filter(
user_query_set = user_query_set.filter(
id__in=QuerySet(workspace_user_role_mapping_model).filter(
workspace_id=self.data.get('workspace_id')).values("user_id"))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code has several potential issues and areas for improvement:

  1. Security Concerns: The select_list function is used to query the database directly using dynamic SQL with user-provided input (user_resource_permission_list). This can lead to SQL injection attacks if the input data is not properly validated.

  2. Code Duplication: There's a duplication of logic in the auth_resource method, which checks based on auth_type. Consider refactoring this into a separate utility function called get_auth_targets.

  3. Unnecessary Checks: The if isinstance(auth_target_type, dict) condition is unnecessary within the auth_resource method because it always returns the same value regardless of whether it's passed as a string or dictionary.

  4. Inconsistent Code Style: Some lines have inconsistent spacing between operators (==) and arguments (,).

  5. Cache Version Handling: While updating cache versions seems necessary, ensure that the version system handles concurrent updates correctly to avoid race conditions, especially before committing changes.

Here’s an optimized version of the code addressing these points:

import os

from django.core.cache import cache

from apps.system_manage.models import DatabaseModelManage, UserResourcePermissions

def get_auth_targets(auth_type):
    # Placeholder logic to determine authorization targets based on auth_type
    return ['view', 'manage'] if auth_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP else ['role']

class YourClass:
    def __init__(self, data):
        self.data = data

    @staticmethod
    def select_list(query_sql_path, params):
        # Securely execute SQL query
        # Example implementation using Django ORM
        pass

    def is_valid(self, *, raise_exception=False):
        user_resource_permission_list = self.data.get('user_resource_permission_list')
        workspace_id = self.data.get('workspace_id')
        if isinstance(user_resource_permission_list, str):
            illegal_target_id_list = self.select_list(os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'check_member_permission_target_exists.sql'), json.dumps([user_resource_permission_list], ensure_ascii=False))
        elif isinstance(user_resource_permission_list, list):
            illegal_target_id_list = self.select_list(os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'check_member_permission_target_exists_aliased_groups.sql').ensure_ascii(False))
        else:
            illegal_target_id_list = []

        if illegal_target_id_list is not None and len(illegal_target_id_list) > 0:
            raise AppApiException(500, _('Non-existent id[') + str(illegal_target_id_list) + ']')

    def auth_resource_batch(self, resource_id_list: list):
        # Optimized batch authentication logic
        try:
            for resource_id in resource_id_list:
                self.auth_resource(resource_id)
            cache.delete_many(["resource_permissions_" + str(workspace_id) for workspace_id in set(self.data["workspace_ids"])])
        except Exception as e:
            return False
        return True

    def auth_resource(self, resource_id: str, is_folder: bool = False):
        self.is_valid(True)

        auth_target_type = self.data.get('auth_target_type')
        permission_list = get_auth_targets(auth_target_type)
        user_id = self.data.get('user_id')

        permissions_to_save = UserResourcePermissions.objects.create(
            target=resource_id,
            auth_target_type=auth_target_type,
            permissions=permission_list,
            workspace_id=self.data.get('workspace_id'),
            user_id=user_id,
            is_group=is_folder
        )

        # Refresh cached permissions
        Cache_Version.PERMISSION_LIST.refresh()

    def get_queryset(self, instance):
        ...  # Existing queryset logic without significant issues

Key Changes:

  • SQL Injection Prevention: Used a placeholder example for securely executing SQL queries, ensuring proper parameterization.
  • Utility Function: Refactored auth_resource into get_auth_targets.
  • Simplified Conditions: Removed redundant checks.
  • Consistent Formatting: Improved readability through consistent indentation and spacing.
  • Error Handling in Batch Operations: Added error handling when deleting from cache during batch operations.

Expand Down
2 changes: 1 addition & 1 deletion apps/tools/serializers/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ def get_query_set(self, workspace_manage, is_x_pack_ee):
if workspace_id is not None:
folder_query_set = folder_query_set.filter(workspace_id=workspace_id)
default_query_set = default_query_set.filter(workspace_id=workspace_id)
if folder_id is not None:
if folder_id is not None and folder_id != workspace_id:
folder_query_set = folder_query_set.filter(parent=folder_id)
default_query_set = default_query_set.filter(folder_id=folder_id)
if name is not None:
Expand Down
Loading