@@ -201,8 +201,8 @@ def process(
            f"[{self.request.id}] PROCESS TASK: File size: {file_size_mb:.2f} MB")

        # The unified actor call, mapping 'file' source_type to 'local' destination
-       # Submit Ray work and do not block here
-       logger.debug(
+       # Submit Ray work and WAIT for processing to complete
+       logger.info(
            f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for source='{source}', strategy='{chunking_strategy}', destination='{source_type}'")
        chunks_ref = actor.process_file.remote(
            source,
@@ -211,10 +211,17 @@ def process(
            task_id=task_id,
            **params
        )
-       # Persist chunks into Redis via Ray to decouple Celery
+       # Wait for Ray processing to complete (this keeps task in STARTED/"PROCESSING" state)
+       logger.info(
+           f"[{self.request.id}] PROCESS TASK: Waiting for Ray processing to complete...")
+       chunks = ray.get(chunks_ref)
+       logger.info(
+           f"[{self.request.id}] PROCESS TASK: Ray processing completed, got {len(chunks) if chunks else 0} chunks")
+
+       # Persist chunks into Redis via Ray (fire-and-forget, don't block)
        redis_key = f"dp:{task_id}:chunks"
-       actor.store_chunks_in_redis.remote(redis_key, chunks_ref)
-       logger.debug(
+       actor.store_chunks_in_redis.remote(redis_key, chunks)
+       logger.info(
            f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'")

        end_time = time.time()
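The behavioral core of this hunk is the switch from fire-and-forget submission to a blocking `ray.get`. A minimal, self-contained sketch of the two Ray patterns (the `Processor` actor is hypothetical, standing in for the real processing actor):

```python
import ray

ray.init()

@ray.remote
class Processor:
    def process_file(self, source: str) -> list[str]:
        # Stand-in for the real chunking work.
        return [f"chunk-{i}:{source}" for i in range(3)]

actor = Processor.remote()

# Old behavior: .remote() returns an ObjectRef immediately, so the
# Celery task would move on before processing finished.
chunks_ref = actor.process_file.remote("doc.pdf")

# New behavior: ray.get() blocks until the actor method completes,
# keeping the Celery task in its STARTED/"PROCESSING" state meanwhile.
chunks = ray.get(chunks_ref)
print(len(chunks))  # -> 3
```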
@@ -229,7 +236,7 @@ def process(
            f"[{self.request.id}] PROCESS TASK: Processing from URL: {source}")

        # For URL source, core.py expects a non-local destination to trigger URL fetching
-       logger.debug(
+       logger.info(
            f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for URL='{source}', strategy='{chunking_strategy}', destination='{source_type}'")
        chunks_ref = actor.process_file.remote(
            source,
@@ -238,11 +245,19 @@ def process(
            task_id=task_id,
            **params
        )
-       # Persist chunks into Redis via Ray to decouple Celery
+       # Wait for Ray processing to complete (this keeps task in STARTED/"PROCESSING" state)
+       logger.info(
+           f"[{self.request.id}] PROCESS TASK: Waiting for Ray processing to complete...")
+       chunks = ray.get(chunks_ref)
+       logger.info(
+           f"[{self.request.id}] PROCESS TASK: Ray processing completed, got {len(chunks) if chunks else 0} chunks")
+
+       # Persist chunks into Redis via Ray (fire-and-forget, don't block)
        redis_key = f"dp:{task_id}:chunks"
-       actor.store_chunks_in_redis.remote(redis_key, chunks_ref)
-       logger.debug(
+       actor.store_chunks_in_redis.remote(redis_key, chunks)
+       logger.info(
            f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'")
+
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(
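One subtlety of this change is worth flagging: Ray resolves top-level `ObjectRef` arguments to their values before a remote method runs, so `store_chunks_in_redis` saw the same chunk list whether it received `chunks_ref` (old) or `chunks` (new); the new version simply fetches the list into the worker first and re-ships it through the object store. A small sketch of that standard Ray semantic (illustrative functions, not project code):

```python
import ray

ray.init()

@ray.remote
def produce() -> list[int]:
    return [1, 2, 3]

@ray.remote
def consume(values: list[int]) -> int:
    # Whether the caller passed an ObjectRef or a plain list, Ray
    # delivers the resolved value here.
    return sum(values)

ref = produce.remote()
print(ray.get(consume.remote(ref)))           # 6: Ray resolves the ref (old pattern)
print(ray.get(consume.remote(ray.get(ref))))  # 6: value fetched, then re-shipped (new pattern)
```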
@@ -253,24 +268,25 @@ def process(
        raise NotImplementedError(
            f"Source type '{source_type}' not yet supported")

-   # Update task state to SUCCESS with metadata (without materializing chunks here)
+   # Update task state to SUCCESS after Ray processing completes
+   # This transitions from STARTED (PROCESSING) to SUCCESS (WAIT_FOR_FORWARDING)
    self.update_state(
        state=states.SUCCESS,
        meta={
-           'chunks_count': None,
+           'chunks_count': len(chunks) if chunks else 0,
            'processing_time': elapsed_time,
            'source': source,
            'index_name': index_name,
            'original_filename': original_filename,
            'task_name': 'process',
            'stage': 'text_extracted',
            'file_size_mb': file_size_mb,
-           'processing_speed_mb_s': file_size_mb / elapsed_time if elapsed_time > 0 else 0
+           'processing_speed_mb_s': file_size_mb / elapsed_time if file_size_mb > 0 and elapsed_time > 0 else 0
        }
    )

    logger.info(
-       f"[{self.request.id}] PROCESS TASK: Submitted for Ray processing; result will be fetched by forward")
+       f"[{self.request.id}] PROCESS TASK: Processing complete, waiting for forward task")

    # Prepare data for the next task in the chain; pass redis_key
    returned_data = {
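These `update_state` calls are what a status endpoint observes between chain stages. A minimal sketch of reading the custom metadata with Celery's standard `AsyncResult` API (the broker/backend URLs are placeholders, not the project's config):

```python
from celery import Celery
from celery.result import AsyncResult

# Placeholder URLs; substitute the project's real broker/backend.
celery_app = Celery(broker="redis://localhost:6379/0",
                    backend="redis://localhost:6379/1")

def get_processing_status(task_id: str) -> dict:
    result = AsyncResult(task_id, app=celery_app)
    payload = {"state": result.state}
    # While a state set via update_state is current, .info holds the
    # meta dict (e.g. chunks_count, stage, processing_speed_mb_s).
    if isinstance(result.info, dict):
        payload.update(result.info)
    return payload
```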
@@ -563,6 +579,9 @@ async def index_documents():
                "source": original_source,
                "original_filename": original_filename
            }, ensure_ascii=False))
+
+   logger.info(
+       f"[{self.request.id}] FORWARD TASK: Starting ES indexing for {len(formatted_chunks)} chunks to index '{original_index_name}'...")
    es_result = run_async(index_documents())
    logger.debug(
        f"[{self.request.id}] FORWARD TASK: API response from main_server for source '{original_source}': {es_result}")
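`run_async` is the project's sync-to-async bridge and its definition isn't in this diff; a common shape for such a helper, offered purely as an assumption, is:

```python
import asyncio
from typing import Any, Coroutine

def run_async(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run a coroutine to completion from synchronous code.

    Celery prefork workers have no running event loop, so creating a
    fresh loop per call is a safe default; asyncio.run() would also
    work when no loop is already running in the thread.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```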
@@ -605,6 +624,8 @@ async def index_documents():
                "original_filename": original_filename
            }, ensure_ascii=False))
    end_time = time.time()
+   logger.info(
+       f"[{self.request.id}] FORWARD TASK: Updating task state to SUCCESS after ES indexing completion")
    self.update_state(
        state=states.SUCCESS,
        meta={
@@ -620,7 +641,7 @@ async def index_documents():
    )

    logger.info(
-       f"Stored {len(chunks)} chunks to index {original_index_name} in {end_time - start_time:.2f}s")
+       f"[{self.request.id}] FORWARD TASK: Successfully stored {len(chunks)} chunks to index {original_index_name} in {end_time - start_time:.2f}s")
    return {
        'task_id': task_id,
        'source': original_source,
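Since `process` hands `returned_data` (including `redis_key`) straight to the next task in the chain, the two stages presumably run as a standard Celery chain; a hedged sketch of the caller side (argument names inferred from the diff, not verified against the real task signatures):

```python
from celery import chain

# `process` returns a dict (source, redis_key, index_name, ...) which
# Celery passes as the first positional argument of `forward`.
workflow = chain(
    process.s(source="doc.pdf", source_type="file", index_name="docs"),
    forward.s(),
)
async_result = workflow.apply_async()
print(async_result.id)  # id of the final task in the chain (forward)
```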