Skip to content

Commit ff9f339

Browse files
committed
perf: Optimize the export of conversation logs
1 parent 727c8bf commit ff9f339

File tree

4 files changed

+87
-40
lines changed

4 files changed

+87
-40
lines changed

apps/application/serializers/application_chat.py

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from rest_framework import serializers
2525

2626
from application.models import Chat, Application, ChatRecord
27-
from common.db.search import get_dynamics_model, native_search, native_page_search
27+
from common.db.search import get_dynamics_model, native_search, native_page_search, native_page_handler
2828
from common.exception.app_exception import AppApiException
2929
from common.utils.common import get_file_content
3030
from maxkb.conf import PROJECT_DIR
@@ -95,7 +95,8 @@ def get_query_set(self, select_ids=None):
9595
'trample_num': models.IntegerField(),
9696
'comparer': models.CharField(),
9797
'application_chat.update_time': models.DateTimeField(),
98-
'application_chat.id': models.UUIDField(), }))
98+
'application_chat.id': models.UUIDField(),
99+
'application_chat_record_temp.id': models.UUIDField()}))
99100

100101
base_query_dict = {'application_chat.application_id': self.data.get("application_id"),
101102
'application_chat.update_time__gte': start_time,
@@ -106,7 +107,6 @@ def get_query_set(self, select_ids=None):
106107
if 'username' in self.data and self.data.get('username') is not None:
107108
base_query_dict['application_chat.asker__username__icontains'] = self.data.get('username')
108109

109-
110110
if select_ids is not None and len(select_ids) > 0:
111111
base_query_dict['application_chat.id__in'] = select_ids
112112
base_condition = Q(**base_query_dict)
@@ -180,25 +180,26 @@ def to_row(row: Dict):
180180
str(row.get('create_time').astimezone(pytz.timezone(TIME_ZONE)).strftime('%Y-%m-%d %H:%M:%S')
181181
if row.get('create_time') is not None else None)]
182182

183+
@staticmethod
def reset_value(value):
    """
    Normalize a single cell value before it is appended to the export worksheet.

    - ``str``: every match of ``ILLEGAL_CHARACTERS_RE`` is removed.
    - ``datetime.datetime``: converted to the ``TIME_ZONE`` timezone.
    - any other value is returned unchanged.

    :param value: raw cell value
    :return: the normalized cell value
    """
    if isinstance(value, str):
        return re.sub(ILLEGAL_CHARACTERS_RE, '', value)
    if isinstance(value, datetime.datetime):
        # Let pytz pick the offset that applies at this instant. The previous
        # code built a fixed offset from the private ``_utcoffset`` attribute,
        # which for many zones is the historical local-mean-time offset
        # (e.g. +08:06 for Asia/Shanghai) and therefore skews the result.
        # NOTE(review): the returned datetime is still timezone-aware; confirm
        # whether the spreadsheet writer needs ``tzinfo`` stripped.
        return value.astimezone(pytz.timezone(TIME_ZONE))
    return value
192+
183193
def export(self, data, with_valid=True):
184194
if with_valid:
185195
self.is_valid(raise_exception=True)
186196
ApplicationChatRecordExportRequest(data=data).is_valid(raise_exception=True)
187197

188-
data_list = native_search(self.get_query_set(data.get('select_ids')),
189-
select_string=get_file_content(
190-
os.path.join(PROJECT_DIR, "apps", "application", 'sql',
191-
('export_application_chat_ee.sql' if ['PE', 'EE'].__contains__(
192-
edition) else 'export_application_chat.sql'))),
193-
with_table_name=False)
194-
195-
batch_size = 500
196-
197198
def stream_response():
198-
workbook = openpyxl.Workbook()
199-
worksheet = workbook.active
200-
worksheet.title = 'Sheet1'
201-
199+
workbook = openpyxl.Workbook(write_only=True)
200+
worksheet = workbook.create_sheet(title='Sheet1')
201+
current_page = 1
202+
page_size = 500
202203
headers = [gettext('Conversation ID'), gettext('summary'), gettext('User Questions'),
203204
gettext('Problem after optimization'),
204205
gettext('answer'), gettext('User feedback'),
@@ -207,24 +208,22 @@ def stream_response():
207208
gettext('Annotation'), gettext('USER'), gettext('Consuming tokens'),
208209
gettext('Time consumed (s)'),
209210
gettext('Question Time')]
210-
for col_idx, header in enumerate(headers, 1):
211-
cell = worksheet.cell(row=1, column=col_idx)
212-
cell.value = header
213-
214-
for i in range(0, len(data_list), batch_size):
215-
batch_data = data_list[i:i + batch_size]
216-
217-
for row_idx, row in enumerate(batch_data, start=i + 2):
218-
for col_idx, value in enumerate(self.to_row(row), 1):
219-
cell = worksheet.cell(row=row_idx, column=col_idx)
220-
if isinstance(value, str):
221-
value = re.sub(ILLEGAL_CHARACTERS_RE, '', value)
222-
if isinstance(value, datetime.datetime):
223-
eastern = pytz.timezone(TIME_ZONE)
224-
c = datetime.timezone(eastern._utcoffset)
225-
value = value.astimezone(c)
226-
cell.value = value
227-
211+
worksheet.append(headers)
212+
for data_list in native_page_handler(page_size, self.get_query_set(data.get('select_ids')),
213+
primary_key='application_chat_record_temp.id',
214+
primary_queryset='default_queryset',
215+
get_primary_value=lambda item: item.get('id'),
216+
select_string=get_file_content(
217+
os.path.join(PROJECT_DIR, "apps", "application", 'sql',
218+
('export_application_chat_ee.sql' if ['PE',
219+
'EE'].__contains__(
220+
edition) else 'export_application_chat.sql'))),
221+
with_table_name=False):
222+
223+
for item in data_list:
224+
row = [self.reset_value(v) for v in self.to_row(item)]
225+
worksheet.append(row)
226+
current_page = current_page + 1
228227
output = BytesIO()
229228
workbook.save(output)
230229
output.seek(0)

apps/application/sql/export_application_chat.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
SELECT
2+
application_chat_record_temp.id AS id,
23
application_chat."id" as chat_id,
34
application_chat.abstract as abstract,
45
application_chat_record_temp.problem_text as problem_text,

apps/application/sql/export_application_chat_ee.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
SELECT
2+
application_chat_record_temp.id AS id,
23
application_chat."id" as chat_id,
34
application_chat.abstract as abstract,
45
application_chat_record_temp.problem_text as problem_text,

apps/common/db/search.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
from common.db.sql_execute import select_one, select_list, update_execute
1717
from common.result import Page
1818

19-
2019
# 添加模型缓存
2120
_model_cache = {}
21+
22+
2223
def get_dynamics_model(attr: dict, table_name='dynamics'):
2324
"""
2425
获取一个动态的django模型
@@ -29,24 +30,24 @@ def get_dynamics_model(attr: dict, table_name='dynamics'):
2930
# 创建缓存键,基于属性和表名
3031
cache_key = hashlib.md5(f"{table_name}_{str(sorted(attr.items()))}".encode()).hexdigest()
3132
# print(f'cache_key: {cache_key}')
32-
33+
3334
# 如果模型已存在,直接返回缓存的模型
3435
if cache_key in _model_cache:
3536
return _model_cache[cache_key]
36-
37+
3738
attributes = {
3839
"__module__": "knowledge.models",
3940
"Meta": type("Meta", (), {'db_table': table_name}),
4041
**attr
4142
}
42-
43+
4344
# 使用唯一的类名避免冲突
4445
class_name = f'Dynamics_{cache_key[:8]}'
4546
model_class = type(class_name, (models.Model,), attributes)
46-
47+
4748
# 缓存模型
4849
_model_cache[cache_key] = model_class
49-
50+
5051
return model_class
5152

5253

@@ -189,6 +190,51 @@ def native_page_search(current_page: int, page_size: int, queryset: QuerySet | D
189190
return Page(total.get("count"), list(map(post_records_handler, result)), current_page, page_size)
190191

191192

193+
def native_page_handler(page_size: int,
                        queryset: QuerySet | Dict[str, QuerySet],
                        select_string: str,
                        field_replace_dict=None,
                        with_table_name=False,
                        primary_key=None,
                        get_primary_value=None,
                        primary_queryset: str = None,
                        ):
    """
    Yield the rows of a native SQL query one page at a time.

    Pages are fetched with keyset pagination: every query is ordered by
    ``primary_key`` and, after the first page, filtered with
    ``primary_key > <last value seen>``, so no OFFSET is used.

    :param page_size: maximum number of rows per yielded page
    :param queryset: a queryset, or a dict of named querysets
    :param select_string: SQL text the generated conditions are combined with
    :param field_replace_dict: forwarded unchanged to the SQL generator
    :param with_table_name: forwarded unchanged to the SQL generator
    :param primary_key: field used for ordering and for the ``__gt`` filter
    :param get_primary_value: callable that extracts the ``primary_key`` value
                              from a result row; it is called on the last row
                              of every page
    :param primary_queryset: when ``queryset`` is a dict, the key of the
                             queryset that receives the ordering and filter
    :return: generator yielding one list of rows per page
    """

    def build_sql(after_id):
        """Generate (sql, params) for the page following ``after_id``; None means the first page."""

        def paged(qs):
            if after_id is not None:
                qs = qs.filter(**{f"{primary_key}__gt": after_id})
            return qs.order_by(primary_key)

        if isinstance(queryset, dict):
            return generate_sql_by_query_dict(
                {**queryset, primary_queryset: paged(queryset[primary_queryset])},
                select_string, field_replace_dict, with_table_name)
        return generate_sql_by_query(paged(queryset), select_string, field_replace_dict, with_table_name)

    exec_sql, exec_params = build_sql(None)
    total_sql = "SELECT \"count\"(*) FROM (%s) temp" % exec_sql
    total = select_one(total_sql, exec_params)
    total_count = total.get("count")
    # The LIMIT clause does not change between pages, so build it once.
    limit_sql = connections[DEFAULT_DB_ALIAS].ops.limit_offset_sql(0, page_size)
    processed_count = 0
    last_id = None
    while processed_count < total_count:
        if last_id is not None:
            exec_sql, exec_params = build_sql(last_id)
        page_sql = exec_sql + " " + limit_sql
        result = select_list(page_sql, exec_params)
        if not result:
            # The count above is only a snapshot: rows may have disappeared
            # since it was taken. Stop instead of failing on ``result[-1]``.
            break
        yield result
        processed_count += len(result)
        last_id = get_primary_value(result[-1])
236+
237+
192238
def get_field_replace_dict(queryset: QuerySet):
193239
"""
194240
获取需要替换的字段 默认 “xxx.xxx”需要被替换成 “xxx”."xxx"

0 commit comments

Comments
 (0)