
Commit 03e0b24

🐛 Bugfix: knowledge base creation fails when uploading a large file (more than 100 chunks)
1 parent ec141cd commit 03e0b24

File tree: 6 files changed (+1865 / -1091 lines)


backend/data_process/tasks.py: 35 additions & 14 deletions
@@ -201,8 +201,8 @@ def process(
             f"[{self.request.id}] PROCESS TASK: File size: {file_size_mb:.2f}MB")
 
         # The unified actor call, mapping 'file' source_type to 'local' destination
-        # Submit Ray work and do not block here
-        logger.debug(
+        # Submit Ray work and WAIT for processing to complete
+        logger.info(
             f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for source='{source}', strategy='{chunking_strategy}', destination='{source_type}'")
         chunks_ref = actor.process_file.remote(
             source,
@@ -211,10 +211,17 @@ def process(
             task_id=task_id,
             **params
         )
-        # Persist chunks into Redis via Ray to decouple Celery
+        # Wait for Ray processing to complete (this keeps task in STARTED/"PROCESSING" state)
+        logger.info(
+            f"[{self.request.id}] PROCESS TASK: Waiting for Ray processing to complete...")
+        chunks = ray.get(chunks_ref)
+        logger.info(
+            f"[{self.request.id}] PROCESS TASK: Ray processing completed, got {len(chunks) if chunks else 0} chunks")
+
+        # Persist chunks into Redis via Ray (fire-and-forget, don't block)
         redis_key = f"dp:{task_id}:chunks"
-        actor.store_chunks_in_redis.remote(redis_key, chunks_ref)
-        logger.debug(
+        actor.store_chunks_in_redis.remote(redis_key, chunks)
+        logger.info(
             f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'")
 
         end_time = time.time()
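The change above is the heart of the fix: the Celery task now blocks on `ray.get()` until the actor has actually produced the chunks, instead of handing an unresolved ObjectRef downstream and reporting success early. A minimal sketch of the submit-then-wait pattern, with a toy actor standing in for the project's real processing actor (the actor name and method body here are illustrative, not the repository's actual code):

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class ChunkerActor:
    """Toy stand-in for the project's processing actor (hypothetical)."""

    def process_file(self, source: str) -> list:
        # Pretend the file splits into 150 chunks.
        return [f"chunk-{i}" for i in range(150)]

# Submit work: .remote() returns immediately with an ObjectRef.
actor = ChunkerActor.remote()
chunks_ref = actor.process_file.remote("big_file.pdf")

# Block until the actor finishes; during this wait the Celery task
# stays in its STARTED/"PROCESSING" state.
chunks = ray.get(chunks_ref)
print(f"got {len(chunks)} chunks")  # -> got 150 chunks

Because `.remote()` returns immediately, the only blocking point is the `ray.get()` call; everything up to it remains asynchronous.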
@@ -229,7 +236,7 @@ def process(
             f"[{self.request.id}] PROCESS TASK: Processing from URL: {source}")
 
         # For URL source, core.py expects a non-local destination to trigger URL fetching
-        logger.debug(
+        logger.info(
             f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for URL='{source}', strategy='{chunking_strategy}', destination='{source_type}'")
         chunks_ref = actor.process_file.remote(
             source,
@@ -238,11 +245,19 @@ def process(
             task_id=task_id,
             **params
         )
-        # Persist chunks into Redis via Ray to decouple Celery
+        # Wait for Ray processing to complete (this keeps task in STARTED/"PROCESSING" state)
+        logger.info(
+            f"[{self.request.id}] PROCESS TASK: Waiting for Ray processing to complete...")
+        chunks = ray.get(chunks_ref)
+        logger.info(
+            f"[{self.request.id}] PROCESS TASK: Ray processing completed, got {len(chunks) if chunks else 0} chunks")
+
+        # Persist chunks into Redis via Ray (fire-and-forget, don't block)
         redis_key = f"dp:{task_id}:chunks"
-        actor.store_chunks_in_redis.remote(redis_key, chunks_ref)
-        logger.debug(
+        actor.store_chunks_in_redis.remote(redis_key, chunks)
+        logger.info(
             f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'")
+
         end_time = time.time()
         elapsed_time = end_time - start_time
         logger.info(
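Persisting the materialized chunks stays fire-and-forget: the task schedules `store_chunks_in_redis` on the actor and deliberately ignores the returned ObjectRef rather than blocking a second time. The real method's implementation is not shown in this diff; the sketch below assumes a JSON-in-Redis layout and a hypothetical actor:

import json
import ray
import redis

@ray.remote
class StoreActor:
    """Hypothetical actor mirroring store_chunks_in_redis (assumption)."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self._redis = redis.Redis.from_url(redis_url)

    def store_chunks_in_redis(self, redis_key: str, chunks: list) -> int:
        # Serialize once and write with a TTL so abandoned tasks don't
        # leak keys in Redis (the TTL value is illustrative).
        self._redis.set(redis_key, json.dumps(chunks, ensure_ascii=False), ex=3600)
        return len(chunks)

# Caller side: schedule the write and intentionally drop the ObjectRef.
actor = StoreActor.remote()
actor.store_chunks_in_redis.remote("dp:task-123:chunks", ["chunk-0", "chunk-1"])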
@@ -253,24 +268,25 @@ def process(
         raise NotImplementedError(
             f"Source type '{source_type}' not yet supported")
 
-    # Update task state to SUCCESS with metadata (without materializing chunks here)
+    # Update task state to SUCCESS after Ray processing completes
+    # This transitions from STARTED (PROCESSING) to SUCCESS (WAIT_FOR_FORWARDING)
     self.update_state(
         state=states.SUCCESS,
         meta={
-            'chunks_count': None,
+            'chunks_count': len(chunks) if chunks else 0,
             'processing_time': elapsed_time,
             'source': source,
             'index_name': index_name,
             'original_filename': original_filename,
             'task_name': 'process',
             'stage': 'text_extracted',
             'file_size_mb': file_size_mb,
-            'processing_speed_mb_s': file_size_mb / elapsed_time if elapsed_time > 0 else 0
+            'processing_speed_mb_s': file_size_mb / elapsed_time if file_size_mb > 0 and elapsed_time > 0 else 0
         }
     )
 
     logger.info(
-        f"[{self.request.id}] PROCESS TASK: Submitted for Ray processing; result will be fetched by forward")
+        f"[{self.request.id}] PROCESS TASK: Processing complete, waiting for forward task")
 
     # Prepare data for the next task in the chain; pass redis_key
     returned_data = {
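The `self.update_state(...)` call is Celery's standard mechanism for a bound task to publish metadata alongside a state; the caller reads it back through `AsyncResult`. A self-contained sketch (broker/backend URLs and the task body are placeholders, not this project's configuration):

from celery import Celery, states

app = Celery("demo",
             broker="redis://localhost:6379/1",
             backend="redis://localhost:6379/2")

@app.task(bind=True)
def process(self, file_size_mb: float) -> dict:
    chunks = ["..."] * 120  # stand-in for real chunking work
    self.update_state(
        state=states.SUCCESS,
        meta={
            "chunks_count": len(chunks),
            "stage": "text_extracted",
        },
    )
    return {"chunks_count": len(chunks)}

# Caller side: the meta dict comes back as AsyncResult.info / .result.
# result = process.delay(4.2)
# print(result.state, result.info)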
@@ -563,6 +579,9 @@ async def index_documents():
             "source": original_source,
             "original_filename": original_filename
         }, ensure_ascii=False))
+
+        logger.info(
+            f"[{self.request.id}] FORWARD TASK: Starting ES indexing for {len(formatted_chunks)} chunks to index '{original_index_name}'...")
         es_result = run_async(index_documents())
         logger.debug(
             f"[{self.request.id}] FORWARD TASK: API response from main_server for source '{original_source}': {es_result}")
@@ -605,6 +624,8 @@ async def index_documents():
             "original_filename": original_filename
         }, ensure_ascii=False))
         end_time = time.time()
+        logger.info(
+            f"[{self.request.id}] FORWARD TASK: Updating task state to SUCCESS after ES indexing completion")
         self.update_state(
             state=states.SUCCESS,
             meta={
@@ -620,7 +641,7 @@ async def index_documents():
         )
 
         logger.info(
-            f"Stored {len(chunks)} chunks to index {original_index_name} in {end_time - start_time:.2f}s")
+            f"[{self.request.id}] FORWARD TASK: Successfully stored {len(chunks)} chunks to index {original_index_name} in {end_time - start_time:.2f}s")
         return {
             'task_id': task_id,
             'source': original_source,
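For completeness, a downstream consumer would rehydrate the chunks from the `dp:{task_id}:chunks` key that `process` passes along. The diff does not show how the real forward task deserializes; this sketch assumes the JSON encoding used in the store sketch above:

import json
import redis

def load_chunks(task_id: str, redis_url: str = "redis://localhost:6379/0") -> list:
    """Fetch chunks persisted by the process task (assumed JSON encoding)."""
    r = redis.Redis.from_url(redis_url)
    raw = r.get(f"dp:{task_id}:chunks")
    if raw is None:
        raise RuntimeError(f"no chunks found for task {task_id}")
    return json.loads(raw)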
