perf: Optimize the export of conversation logs #4230
Changes from all commits
@@ -16,9 +16,10 @@

```python
from common.db.sql_execute import select_one, select_list, update_execute
from common.result import Page


# Add model cache
_model_cache = {}


def get_dynamics_model(attr: dict, table_name='dynamics'):
    """
    Get a dynamic Django model
```

@@ -29,24 +30,24 @@ def get_dynamics_model(attr: dict, table_name='dynamics'):

```python
    # Build a cache key from the attributes and table name
    cache_key = hashlib.md5(f"{table_name}_{str(sorted(attr.items()))}".encode()).hexdigest()
    # print(f'cache_key: {cache_key}')

    # If the model already exists, return the cached model directly
    if cache_key in _model_cache:
        return _model_cache[cache_key]

    attributes = {
        "__module__": "knowledge.models",
        "Meta": type("Meta", (), {'db_table': table_name}),
        **attr
    }

    # Use a unique class name to avoid conflicts
    class_name = f'Dynamics_{cache_key[:8]}'
    model_class = type(class_name, (models.Model,), attributes)

    # Cache the model
    _model_cache[cache_key] = model_class

    return model_class
```

@@ -189,6 +190,51 @@ def native_page_search(current_page: int, page_size: int, queryset: QuerySet | D

```python
    return Page(total.get("count"), list(map(post_records_handler, result)), current_page, page_size)


def native_page_handler(page_size: int,
                        queryset: QuerySet | Dict[str, QuerySet],
                        select_string: str,
                        field_replace_dict=None,
                        with_table_name=False,
                        primary_key=None,
                        get_primary_value=None,
                        primary_queryset: str = None,
                        ):
    if isinstance(queryset, Dict):
        exec_sql, exec_params = generate_sql_by_query_dict(
            {**queryset,
             primary_queryset: queryset[primary_queryset].order_by(primary_key)},
            select_string, field_replace_dict, with_table_name)
    else:
        exec_sql, exec_params = generate_sql_by_query(
            queryset.order_by(primary_key),
            select_string, field_replace_dict, with_table_name)
    total_sql = "SELECT \"count\"(*) FROM (%s) temp" % exec_sql
    total = select_one(total_sql, exec_params)
    processed_count = 0
    last_id = None
    while processed_count < total.get("count"):
        if last_id is not None:
            if isinstance(queryset, Dict):
                exec_sql, exec_params = generate_sql_by_query_dict(
                    {**queryset,
                     primary_queryset: queryset[primary_queryset].filter(
                         **{f"{primary_key}__gt": last_id}).order_by(primary_key)},
                    select_string, field_replace_dict, with_table_name)
            else:
                exec_sql, exec_params = generate_sql_by_query(
                    queryset.filter(**{f"{primary_key}__gt": last_id}).order_by(primary_key),
                    select_string, field_replace_dict, with_table_name)
        limit_sql = connections[DEFAULT_DB_ALIAS].ops.limit_offset_sql(0, page_size)
        page_sql = exec_sql + " " + limit_sql
        result = select_list(page_sql, exec_params)
        yield result
        processed_count += page_size
        last_id = get_primary_value(result[-1])


def get_field_replace_dict(queryset: QuerySet):
    """
    Get the fields that need replacing; by default "xxx.xxx" should be replaced with "xxx"."xxx"
```
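For orientation, here is a minimal sketch of how a caller might consume the new generator to stream an export in fixed-size batches rather than loading every row at once. The import path, queryset, column names, and CSV target are assumptions for illustration; only `native_page_handler`'s signature comes from the diff above, and `select_list` is assumed to return each row as a dict.

```python
import csv

# Hypothetical import path -- adjust to wherever native_page_handler lives in the project.
from common.db.search import native_page_handler


def export_chat_logs(queryset, select_string, out_path="chat_log_export.csv"):
    """Stream conversation-log rows to a CSV file in batches of 500 (illustrative only)."""
    batches = native_page_handler(
        page_size=500,
        queryset=queryset,                        # a Django QuerySet over the chat-log table
        select_string=select_string,              # the raw SELECT used by the project's SQL helpers
        primary_key="id",                         # keyset-pagination column (assumed name)
        get_primary_value=lambda row: row["id"],  # how to read that key back from a result row
    )
    writer = None
    with open(out_path, "w", newline="") as f:
        for rows in batches:                      # each yield is one page of rows
            if not rows:
                continue
            if writer is None:
                writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
                writer.writeheader()
            writer.writerows(rows)
```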
|
Contributor (Author)

The provided code looks mostly correct, but there are a few improvements that can be made.
Here's a revised version with some of these optimizations suggested:

```python
from typing import *
import hashlib
import logging

from django.core.paginator import Paginator
from django.db import models
from django.db.models import QuerySet
from rest_framework.response import Response  # assumed to be DRF's Response

from common.db.sql_execute import select_one, select_list, update_execute
from common.result import Page

# Add model cache
_model_cache = {}


def get_dynamics_model(attr: dict, table_name='dynamics'):
    """
    Get a dynamic Django model
    """
    # Build a cache key from the attributes and table name
    cache_key = hashlib.md5(f"{table_name}_{str(sorted(attr.items()))}".encode()).hexdigest()

    # If the model already exists, return the cached model directly
    if cache_key in _model_cache:
        return _model_cache[cache_key]

    attributes = {
        "__module__": "knowledge.models",
        "Meta": type("Meta", (), {'db_table': table_name}),
        **attr
    }

    # Use a unique class name to avoid conflicts
    class_name = f"Dynamics_{cache_key[:8]}"
    model_class = type(class_name, (models.Model,), attributes)

    # Cache the model
    _model_cache[cache_key] = model_class

    return model_class


def native_page_search(current_page: int, page_size: int, queryset: QuerySet | Dict[str, QuerySet]):
    """
    Run a paginated search and return a Page object
    """
    total_results = count_objects(queryset)                               # placeholder helper
    paginated_queryset = paginate_queryset(queryset, current_page, page_size)  # placeholder helper
    return Page(total_results, list(paginated_queryset), current_page, page_size)


def native_page_handler(page_size: int,
                        queryset: QuerySet | Dict[str, QuerySet],
                        select_string: str,
                        field_replace_dict=None,
                        with_table_name=False,
                        primary_key='id',
                        get_primary_value=lambda x: getattr(x, 'pk', None),
                        primary_queryset: str = None,
                        ):
    """
    Custom pagination handler: keyset pagination over the primary key
    """
    if isinstance(queryset, Dict):
        if primary_queryset not in queryset:
            raise ValueError('Missing primary_queryset')
        qs = queryset[primary_queryset]
    else:
        qs = queryset

    last_processed_id = None
    processed_count = 0
    total_count = qs.count()
    while processed_count < total_count:
        if last_processed_id is not None:
            qs_with_condition = qs.filter(**{f'{primary_key}__gt': last_processed_id})
        else:
            qs_with_condition = qs
        # convert_queryset_to_sql / wrap_in_pagination / execute_query are placeholder helpers
        sql, params = convert_queryset_to_sql(qs_with_condition.order_by(primary_key),
                                              select_string, field_replace_dict, with_table_name)
        page_sql = wrap_in_pagination(sql, page_size)
        results = execute_query(page_sql, params)
        if not results:
            break
        for row in results:
            yield row
        processed_count += len(results)
        last_processed_id = get_primary_value(results[-1])


def native_page_search_paginated(current_page: int, page_size: int, queryset: Union[QuerySet, Dict],
                                 handler: Callable[[int, List, Optional[int]], Iterator[Any]]):
    paginator = Paginator(queryset, per_page=page_size)
    try:
        response_data = paginator.page(current_page).object_list
        meta = {'total_pages': paginator.num_pages, 'current_page_number': current_page}
        result_rows = []
        for row in handler(page_size, response_data, current_page):
            result_rows.append(row)
        final_content = [convert_row_to_response(row) for row in result_rows]  # placeholder helper
        status_code = 200
    except Exception as e:
        logging.exception(e)
        final_content = [{'error': str(e)}]
        meta = {'total_pages': 0, 'current_page_number': 0}
        status_code = 500
    response_payload = {"meta": meta, "content": final_content}
    return Response(data=response_payload, status=status_code)


def get_field_replace_dict(queryset: QuerySet):
    """
    Get the fields that need replacing; by default "xxx.xxx" should be replaced with "xxx"
    """
    def extract_last_value(item: Tuple[List[Tuple[str, Any]]]) -> tuple:
        ...
```

This revision includes placeholder implementations of helper functions (`count_objects`, `paginate_queryset`, `convert_queryset_to_sql`, `wrap_in_pagination`, `execute_query`, `convert_row_to_response`).
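As a side note on the caching change discussed above, the pattern is easy to see in isolation: memoize dynamically created classes by a hash of their attributes so repeated exports reuse one class instead of redefining it on every call. The sketch below is a simplified, Django-free illustration (plain `object` base, invented names), not MaxKB code.

```python
import hashlib

_class_cache = {}


def get_dynamic_class(attr: dict, name_prefix="Dynamics"):
    """Return a dynamically built class, reusing a cached one for identical attributes."""
    cache_key = hashlib.md5(f"{name_prefix}_{sorted(attr.items())}".encode()).hexdigest()
    if cache_key in _class_cache:
        return _class_cache[cache_key]
    cls = type(f"{name_prefix}_{cache_key[:8]}", (object,), dict(attr))
    _class_cache[cache_key] = cls
    return cls


a = get_dynamic_class({"field": "char"})
b = get_dynamic_class({"field": "char"})
assert a is b  # the second call hits the cache instead of creating a new class
```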
In this review of the code update:

The main changes focus on optimizations and improvements such as replacing `native_page_search` with `native_page_handler`, which makes pagination smoother. The export logic also now sanitizes values when converting them to strings, to avoid potentially illegal characters, and normalizes time formats. To make it even better and cleaner, one remaining suggestion concerns `get_file_content`.

Overall, these updates should enhance maintainability and reduce potential bugs while improving performance.
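To make the pagination point concrete, here is a small self-contained sketch (SQLite, invented table and column names, not MaxKB code) of the keyset pattern the new handler applies: each batch filters on `id > last_id` and takes the next `LIMIT`, so the database never re-scans rows that were already exported, unlike with a growing `OFFSET`.

```python
import sqlite3


def iter_batches(conn, batch_size=2):
    """Yield rows in primary-key order using keyset pagination (WHERE id > last_id LIMIT n)."""
    last_id = 0
    while True:
        rows = conn.execute(
            "SELECT id, content FROM chat_log WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not rows:
            break
        yield rows
        last_id = rows[-1][0]  # resume after the last id we handed out


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chat_log (id INTEGER PRIMARY KEY, content TEXT)")
conn.executemany("INSERT INTO chat_log (content) VALUES (?)", [(f"msg {i}",) for i in range(5)])
for batch in iter_batches(conn):
    print([row[0] for row in batch])  # prints [1, 2], then [3, 4], then [5]
```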