@@ -17,7 +17,7 @@
 from application.flow.i_step_node import NodeResult
 from application.flow.step_node.knowledge_write_node.i_knowledge_write_node import IKnowledgeWriteNode
 from common.chunk import text_to_chunk
-from common.utils.common import bulk_create_in_batches
+from common.utils.common import bulk_create_in_batches, filter_special_character
 from knowledge.models import Document, KnowledgeType, Paragraph, File, FileSourceType, Problem, ProblemParagraphMapping, \
     Tag, DocumentTag
 from knowledge.serializers.common import ProblemParagraphObject, ProblemParagraphManage
@@ -83,10 +83,11 @@ def get_paragraph_problem_model(knowledge_id: str, document_id: str, instance: Dict):
     paragraph = Paragraph(
         id=uuid.uuid7(),
         document_id=document_id,
-        content=instance.get("content"),
+        content=filter_special_character(instance.get("content")),
         knowledge_id=knowledge_id,
         title=instance.get("title") if 'title' in instance else '',
-        chunks=instance.get('chunks') if 'chunks' in instance else text_to_chunk(instance.get("content")),
+        chunks=[filter_special_character(c) for c in (instance.get('chunks') if 'chunks' in instance else text_to_chunk(
+            instance.get("content")))],
     )

     problem_paragraph_object_list = [ProblemParagraphObject(
@@ -145,11 +146,11 @@ def get_document_paragraph_model(knowledge_id: str, instance: Dict):
         instance.get('paragraphs') if 'paragraphs' in instance else []
     )

-def save_knowledge_tags(knowledge_id: str, tags: List[Dict[str,Any]]):
+
+def save_knowledge_tags(knowledge_id: str, tags: List[Dict[str, Any]]):
     existed_tags_dict = {
         (key, value): str(tag_id)
-        for key,value,tag_id in QuerySet(Tag).filter(knowledge_id=knowledge_id).values_list("key", "value", "id")
+        for key, value, tag_id in QuerySet(Tag).filter(knowledge_id=knowledge_id).values_list("key", "value", "id")
     }

     tag_model_list = []
@@ -158,23 +159,24 @@ def save_knowledge_tags(knowledge_id: str, tags: List[Dict[str,Any]]):
         key = tag.get("key")
         value = tag.get("value")

-        if (key,value) not in existed_tags_dict:
+        if (key, value) not in existed_tags_dict:
             tag_model = Tag(
                 id=uuid.uuid7(),
                 knowledge_id=knowledge_id,
                 key=key,
                 value=value
             )
             tag_model_list.append(tag_model)
-            new_tag_dict[(key,value)] = str(tag_model.id)
+            new_tag_dict[(key, value)] = str(tag_model.id)

     if tag_model_list:
         Tag.objects.bulk_create(tag_model_list)

-    all_tag_dict={**existed_tags_dict,**new_tag_dict}
+    all_tag_dict = {**existed_tags_dict, **new_tag_dict}

     return all_tag_dict, new_tag_dict

+
 def batch_add_document_tag(document_tag_map: Dict[str, List[str]]):
     """
     Batch-create document-tag associations
@@ -199,12 +201,13 @@ def batch_add_document_tag(document_tag_map: Dict[str, List[str]]):
         )
         for doc_id, tag_ids in document_tag_map.items()
         for tag_id in tag_ids
-        if (doc_id,tag_id) not in existed_relations
+        if (doc_id, tag_id) not in existed_relations
     ]

     if new_relations:
         QuerySet(DocumentTag).bulk_create(new_relations)

+
 class BaseKnowledgeWriteNode(IKnowledgeWriteNode):

     def save_context(self, details, workflow_manage):
@@ -241,7 +244,7 @@ def save(self, document_list):
             for tag in single_document_tag_list:
                 tag_key = (tag['key'], tag['value'])
                 if tag_key not in knowledge_tag_dict:
-                    knowledge_tag_dict[tag_key]= tag
+                    knowledge_tag_dict[tag_key] = tag

             if single_document_tag_list:
                 document_tags_map[str(document_instance.id)] = single_document_tag_list
@@ -259,9 +262,9 @@ def save(self, document_list):
         # Add the corresponding tags for each document
         for doc_id, doc_tags in document_tags_map.items():
             doc_tag_ids = [
-                all_tag_dict[(tag.get("key"),tag.get("value"))]
+                all_tag_dict[(tag.get("key"), tag.get("value"))]
                 for tag in doc_tags
-                if (tag.get("key"),tag.get("value")) in all_tag_dict
+                if (tag.get("key"), tag.get("value")) in all_tag_dict
             ]
             if doc_tag_ids:
                 document_tag_id_map[doc_id] = doc_tag_ids
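
For orientation, here is a minimal usage sketch (not part of the PR) showing how the two helpers added above compose; the knowledge id, tag values, and document id are illustrative placeholders:

# Hypothetical call sequence for the helpers introduced in this diff.
tags = [{"key": "lang", "value": "en"}, {"key": "team", "value": "docs"}]

# Get-or-create the tags for a knowledge base; both returned dicts map
# (key, value) -> tag id, with new_tag_dict holding only the newly created rows.
all_tag_dict, new_tag_dict = save_knowledge_tags("knowledge-id", tags)

# Resolve each document's tags to ids, then bulk-create only the missing relations.
batch_add_document_tag({
    "document-id": [all_tag_dict[(t["key"], t["value"])] for t in tags],
})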
Review comment (Contributor Author):

The provided code appears to be part of a flow-processing system that updates and manages knowledge records in a database. Below are some points of attention and potential improvements:

  1. Imports: Follow Pythonic practices, such as using the typing module for type hints.

  2. Method Names and Comments: Some method names are slightly awkward and would benefit from renaming or more descriptive comments.

  3. String Formatting: Prefer f-strings or other string-formatting methods over concatenation (+); they are generally more readable and safer.

  4. Code Readability: Functions like batch_add_document_tag, save_knowledge_tags, etc. are deeply intertwined. Consider refactoring these operations into individual functions.

  5. Bulk Create Operations: Although these work correctly, it may make sense to add validation before calling bulk_create (a minimal sketch follows at the end of this comment).

  6. Special Character Filtering: The filtering operation on chunk content seems necessary, but it should also ensure that the filtered output remains suitable for further processing (e.g., no special characters causing issues downstream).

Here's an improved version based on these considerations:

import copy
from typing import Any, Dict, List, Optional, Tuple


class BaseKnowledgeWriteNode(IKnowledgeWriteNode):

    @staticmethod
    def _validate_content(content) -> Optional[str]:
        # Filter special characters out of string content; non-strings become None.
        if isinstance(content, str):
            return filter_special_character(content)
        return None

    def save_context(self, details, workflow_manage):
        # Placeholder implementation
        pass

    def save(self, document_list):
        for document_instance in document_list:
            problem_paragraph_objects = [
                ProblemParagraphObject(
                    paragraph_title=item['title'],
                    paragraph_content=self._validate_content(item['content']),
                )
                for item in (document_instance.get('paragraphs', [])
                             + document_instance.get('__problem__paras__', []))
            ]

            paragraphs = self.convert_problem_paragraph_objects(problem_paragraph_objects)

            for para_obj in paragraphs:
                para_obj.document_id = document_instance.id
                para_obj.knowledge_id = para_obj.instance_data["knowledge_id"]

                # Determine which (key, value) pairs still need Tag rows.
                tag_keys_set = {(tag["key"], tag["value"]) for tag in document_instance.get('tags', [])}
                existing_tag_keys = {(tag["key"], tag["value"]) for tag in document_instance.get('existing_tags', [])}
                needed_new_tags = list(tag_keys_set - existing_tag_keys)

                needed_tags_map = self.save_needed_tags(needed_new_tags, [document_instance] * len(needed_new_tags))

                # Map this document to the resolved tag ids.
                tag_ids_for_doc = list(needed_tags_map.values())
                batch_add_document_tag({str(document_instance.id): tag_ids_for_doc})

        return {"result": True}

    @staticmethod
    def convert_problem_paragraph_objects(paragraph_items: List[ProblemParagraphObject]):
        # Deep-copy each object so later per-document mutation cannot leak into shared state.
        result = []
        for obj_item in paragraph_items:
            obj_copy = copy.deepcopy(obj_item)
            obj_copy.instance_data = obj_copy.instance_data.copy()
            result.append(obj_copy)
        return result

    def save_needed_tags(self, new_tags: List[Tuple[str, str]], documents: List[Any]) -> Dict[Tuple[str, str], str]:
        # documents is kept for API compatibility with the caller; it is unused here.
        # Map every existing (key, value) pair to its tag id.
        existed_tags_map = {
            (tag.key, tag.value): str(tag.id)
            for tag in Tag.objects.order_by("-created_at")
        }

        tags_to_be_added = [kv for kv in new_tags if kv not in existed_tags_map]

        added_tag_objs = []
        new_tags_map: Dict[Tuple[str, str], str] = {}
        for key, value in tags_to_be_added:
            tag_model = Tag(
                id=uuid.uuid7(),
                knowledge_id="",  # caller should supply the real knowledge id here
                key=key,
                value=value,
            )
            added_tag_objs.append(tag_model)
            new_tags_map[(key, value)] = str(tag_model.id)

        if added_tag_objs:
            Tag.objects.bulk_create(added_tag_objs)

        return {**existed_tags_map, **new_tags_map}
This version adds a static method _validate_content for consistent string validation, improves readability through clearer function definitions, and provides initial scaffolding for saving new tags and mapping them to documents.
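
As a follow-up to point 5 above, here is a minimal, hypothetical pre-insert guard, consistent with the Tag model used in this diff. Django's bulk_create bypasses model.save() and per-model validation, so nothing else checks the rows:

def validate_tag_models(tag_model_list):
    # Drop Tag rows with an empty key or value before bulk insertion,
    # since bulk_create does not run per-model validation.
    return [t for t in tag_model_list if t.key and t.value]

valid_tags = validate_tag_models(tag_model_list)
if valid_tags:
    Tag.objects.bulk_create(valid_tags)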

apps/common/utils/common.py (10 additions, 0 deletions)

@@ -340,3 +340,13 @@ def generate_uuid(tag: str):

 def filter_workspace(query_list):
     return [q for q in query_list if q.name != "workspace_id"]
+
+
+def filter_special_character(_str):
+    """
+    Filter special characters
+    """
+    s_list = ["\\u0000"]
+    for t in s_list:
+        _str = _str.replace(t, '')
+    return _str
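
A quick illustration (not part of the diff) of the new helper's behavior; the input string is a made-up example that mirrors the blacklist entry exactly as it is spelled above:

# Illustrative only: strips each blacklisted sequence from the input.
s = "hello" + "\\u0000" + "world"
print(filter_special_character(s))  # -> "helloworld"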
Review comment (Contributor Author):

The code is mostly clean but has a couple of minor improvements and corrections:

  1. Function filter_workspace() Functionality:

    • The function filter_workspace works correctly by filtering out queries whose name equals "workspace_id". However, it is not clear what this query list represents or why the filtering is based on the workspace ID.
  2. Improvement in Comment and Docstring:

    • filter_workspace() filters out "workspace_id" entries, but no docstring reflects this behavior.
    • Add a comment on the line above the definition:
      # Filter out workspace_id from the query list
  3. Optimization Considerations:

    • The current implementation iterates through a fixed list (s_list) and replaces one sequence at a time, which isn't particularly efficient.
    • If performance becomes an issue with longer strings or more special cases, consider regular expressions (regex), which can handle multiple replacements in a single pass and potentially perform better (see the sketch at the end of this comment).
  4. General Code Readability:

    • Ensure that variable names and comments are descriptive.
    • Remove unnecessary blank lines after functions to improve readability.

Here's the revised version of your code:

# Define the main UUID generation function
def generate_uuid(tag: str):
    ...  # Implement your logic here

# Filter out queries named 'workspace_id' from a query list
def filter_workspace(query_list):
    """
    Filter out all query items with 'workspace_id' as their name.
    
    Parameters:
    - query_list: A list of objects with a 'name' attribute.
        
    Returns:
    - A new list of query objects excluding those named 'workspace_id'.
    """
    return [q for q in query_list if q.name != "workspace_id"]

# Example usage: filter Workspace IDs from a list of objects
filtered_queries = filter_workspace(your_query_list_here)

# Helper function to remove special characters from _str, specifically "\\u0000"
def filter_special_character(_str):
    """
    Remove all occurrences of '\\u0000' special character from the string `_str`.
    
    Parameters:
    - _str: Input string to be processed.
        
    Returns:
    - A new string without '\\u0000' special character.
    """
    s_list = ["\\u0000"]
    for t in s_list:
        _str = _str.replace(t, '')
    return _str


...  # Rest of the code remains unchanged...

These suggestions should help make the code clearer and potentially a little faster depending on its use case.
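
For point 3, a single-pass regex variant of filter_special_character might look like the following sketch (same blacklist assumed; re.escape keeps each entry literal):

import re

# Compile the blacklist once; alternation lets one pass handle every entry.
_SPECIAL_CHARS = re.compile("|".join(re.escape(t) for t in ["\\u0000"]))

def filter_special_character(_str):
    """
    Remove all blacklisted special characters from `_str` in a single pass.
    """
    return _SPECIAL_CHARS.sub('', _str)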
