Skip to content

Commit 59f8319

Browse files
committed
feat(DataSynthesis): refactor data synthesis models and update task handling logic
1 parent b58d561 commit 59f8319

File tree

1 file changed

+13
-11
lines changed

1 file changed

+13
-11
lines changed

runtime/datamate-python/app/module/generation/service/generation_service.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ class GenerationService:
2929
def __init__(self, db: AsyncSession):
3030
self.db = db
3131
# 全局并发信号量:保证任意时刻最多 10 次模型调用
32-
self.question_semaphore = asyncio.Semaphore(10)
33-
self.answer_semaphore = asyncio.Semaphore(10)
32+
self.question_semaphore = asyncio.Semaphore(100)
33+
self.answer_semaphore = asyncio.Semaphore(100)
3434

3535
async def process_task(self, task_id: str):
3636
"""处理数据合成任务入口:根据任务ID加载任务并逐个处理源文件。"""
@@ -147,9 +147,8 @@ async def _process_single_file(
147147
answer_chat = get_chat_client(answer_model)
148148

149149
# 分批次从 DB 读取并处理 chunk
150-
batch_size = 16
150+
batch_size = 100
151151
current_index = 1
152-
processed_chunks = 0
153152

154153
while current_index <= total_chunks:
155154
end_index = min(current_index + batch_size - 1, total_chunks)
@@ -234,7 +233,14 @@ async def _process_single_chunk_qa(
234233
answer_chat=answer_chat,
235234
)
236235

237-
# todo:每次处理完一个chunk,更新已经处理的chunk数量,要避免并发写冲突
236+
# 每次处理完一个chunk,若至少生成一条QA,则安全更新已处理的chunk数量,避免并发冲突
237+
if success_any:
238+
try:
239+
await self._increment_processed_chunks(file_task.id, 1)
240+
except Exception as e:
241+
logger.exception(
242+
f"Failed to increment processed_chunks for file_task={file_task.id}, chunk_index={chunk_index}: {e}"
243+
)
238244

239245
return success_any
240246

@@ -246,6 +252,8 @@ async def _generate_questions_for_one_chunk(
246252
) -> list[str]:
247253
"""针对单个 chunk 文本,调用 question_chat 生成问题列表。"""
248254
number = question_cfg.number or 5
255+
number = number if number is not None else 5
256+
number = int(len(chunk_text) / 1000 * number)
249257
template = getattr(question_cfg, "prompt_template", QUESTION_GENERATOR_PROMPT)
250258
template = template if (template is not None and template.strip() != "") else QUESTION_GENERATOR_PROMPT
251259

@@ -507,12 +515,6 @@ async def _load_chunk_batch(
507515
return list(result.scalars().all())
508516

509517
async def _increment_processed_chunks(self, file_task_id: str, delta: int) -> None:
510-
"""安全地增加文件级 processed_chunks 计数。
511-
512-
本方法在单协程上下文中被顺序调用(每个文件一个逻辑写入者),
513-
避免了并发写冲突;同时采用读取 + 增加 + 提交的方式保证最终一致性。
514-
"""
515-
# 重新加载最新的 file_task 记录,避免使用过期实例
516518
result = await self.db.execute(
517519
select(DataSynthesisFileInstance).where(
518520
DataSynthesisFileInstance.id == file_task_id,

0 commit comments

Comments
 (0)