@@ -12,11 +12,17 @@
 import aiohttp
 import ray
 from celery import Task, chain, states
+from celery.exceptions import Retry
 
 from consts.const import ELASTICSEARCH_SERVICE
 from utils.file_management_utils import get_file_size
 from .app import app
 from .ray_actors import DataProcessorRayActor
+from consts.const import (
+    REDIS_BACKEND_URL,
+    FORWARD_REDIS_RETRY_DELAY_S,
+    FORWARD_REDIS_RETRY_MAX,
+)
 
 
 logger = logging.getLogger("data_process.tasks")
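
Note: the second consts.const import pulls in three constants that this diff does not define. A minimal sketch of what they might look like in consts/const.py, assuming env-var-driven configuration (the default values here are assumptions, not taken from the PR):

    import os

    # Connection string for the Redis instance shared by Celery and Ray.
    REDIS_BACKEND_URL = os.getenv("REDIS_BACKEND_URL", "")
    # How long forward() waits between polls for chunks, and how many polls.
    FORWARD_REDIS_RETRY_DELAY_S = int(os.getenv("FORWARD_REDIS_RETRY_DELAY_S", "5"))
    FORWARD_REDIS_RETRY_MAX = int(os.getenv("FORWARD_REDIS_RETRY_MAX", "60"))
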
@@ -195,15 +201,21 @@ def process(
             f"[{self.request.id}] PROCESS TASK: File size: {file_size_mb:.2f} MB")
 
         # The unified actor call, mapping 'file' source_type to 'local' destination
+        # Submit Ray work and do not block here
+        logger.debug(
+            f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for source='{source}', strategy='{chunking_strategy}', destination='{source_type}'")
         chunks_ref = actor.process_file.remote(
             source,
             chunking_strategy,
             destination=source_type,
             task_id=task_id,
             **params
         )
-
-        chunks = ray.get(chunks_ref)
+        # Persist chunks into Redis via Ray to decouple Celery
+        redis_key = f"dp:{task_id}:chunks"
+        actor.store_chunks_in_redis.remote(redis_key, chunks_ref)
+        logger.debug(
+            f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'")
 
         end_time = time.time()
         elapsed_time = end_time - start_time
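
The call to actor.store_chunks_in_redis.remote(redis_key, chunks_ref) assumes a method on DataProcessorRayActor that this diff does not show. A minimal sketch of such a method, assuming chunks are JSON-serializable and Ray workers can reach the same Redis instance (the one-hour TTL is an assumption):

    import json
    import ray
    import redis

    from consts.const import REDIS_BACKEND_URL

    @ray.remote
    class DataProcessorRayActor:
        def store_chunks_in_redis(self, redis_key, chunks):
            # Ray resolves the top-level ObjectRef argument before the method
            # runs, so 'chunks' arrives here as the materialized chunk list.
            client = redis.Redis.from_url(REDIS_BACKEND_URL, decode_responses=True)
            # An expiry bounds the key's lifetime so orphaned results do not
            # accumulate if forward() never consumes them.
            client.set(redis_key, json.dumps(chunks, ensure_ascii=False), ex=3600)
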
@@ -217,14 +229,20 @@ def process(
             f"[{self.request.id}] PROCESS TASK: Processing from URL: {source}")
 
         # For URL source, core.py expects a non-local destination to trigger URL fetching
+        logger.debug(
+            f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for URL='{source}', strategy='{chunking_strategy}', destination='{source_type}'")
         chunks_ref = actor.process_file.remote(
             source,
             chunking_strategy,
             destination=source_type,
             task_id=task_id,
             **params
         )
-        chunks = ray.get(chunks_ref)
+        # Persist chunks into Redis via Ray to decouple Celery
+        redis_key = f"dp:{task_id}:chunks"
+        actor.store_chunks_in_redis.remote(redis_key, chunks_ref)
+        logger.debug(
+            f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'")
         end_time = time.time()
         elapsed_time = end_time - start_time
         logger.info(
@@ -235,11 +253,11 @@ def process(
         raise NotImplementedError(
             f"Source type '{source_type}' not yet supported")
 
-    # Update task state to SUCCESS with metadata
+    # Update task state to SUCCESS with metadata (without materializing chunks here)
     self.update_state(
         state=states.SUCCESS,
         meta={
-            'chunks_count': len(chunks),
+            'chunks_count': None,
            'processing_time': elapsed_time,
            'source': source,
            'index_name': index_name,
@@ -252,11 +270,12 @@ def process(
     )
 
     logger.info(
-        f"[{self.request.id}] PROCESS TASK: Successfully processed {len(chunks)} chunks in {elapsed_time:.2f}s")
+        f"[{self.request.id}] PROCESS TASK: Submitted for Ray processing; result will be fetched by forward")
 
-    # Prepare data for the next task in the chain
+    # Prepare data for the next task in the chain; pass redis_key
     returned_data = {
-        'chunks': chunks,
+        'redis_key': f"dp:{task_id}:chunks",
+        'chunks': None,
         'source': source,
         'index_name': index_name,
         'original_filename': original_filename,
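
returned_data is what the Celery chain hands to the next task, so forward now receives redis_key instead of the full chunk payload. A hypothetical composition of the two tasks (process's exact signature is not shown in this diff; the keyword names below are inferred from the fields it logs):

    from celery import chain

    pipeline = chain(
        process.s(
            source="/data/report.pdf",  # hypothetical input
            source_type="file",
            chunking_strategy="basic",
            index_name="docs",
        ),
        forward.s(),  # receives returned_data (with redis_key) as its first argument
    )
    async_result = pipeline.apply_async()
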
@@ -329,6 +348,60 @@ def forward(
 
     try:
         chunks = processed_data.get('chunks')
+        # If chunks are not in payload, try loading from Redis via the redis_key
+        if (not chunks) and processed_data.get('redis_key'):
+            redis_key = processed_data.get('redis_key')
+            if not REDIS_BACKEND_URL:
+                raise Exception(json.dumps({
+                    "message": "REDIS_BACKEND_URL not configured to retrieve chunks",
+                    "index_name": original_index_name,
+                    "task_name": "forward",
+                    "source": original_source,
+                    "original_filename": filename
+                }, ensure_ascii=False))
+            try:
+                import redis
+                client = redis.Redis.from_url(
+                    REDIS_BACKEND_URL, decode_responses=True)
+                cached = client.get(redis_key)
+                if cached:
+                    try:
+                        logger.debug(
+                            f"[{self.request.id}] FORWARD TASK: Retrieved Redis key '{redis_key}', payload_length={len(cached)}")
+                        chunks = json.loads(cached)
+                    except json.JSONDecodeError as jde:
+                        # Log raw prefix to help diagnose incorrect writes
+                        raw_preview = cached[:120] if isinstance(
+                            cached, str) else str(type(cached))
+                        logger.error(
+                            f"[{self.request.id}] FORWARD TASK: JSON decode error for key '{redis_key}': {str(jde)}; raw_prefix={raw_preview!r}")
+                        raise
+                else:
+                    # No busy-wait: release the worker slot and retry later
+                    retry_num = getattr(self.request, 'retries', 0)
+                    logger.info(
+                        f"[{self.request.id}] FORWARD TASK: Chunks not yet available for key {redis_key}. Retry {retry_num + 1}/{FORWARD_REDIS_RETRY_MAX} in {FORWARD_REDIS_RETRY_DELAY_S}s")
+                    raise self.retry(
+                        countdown=FORWARD_REDIS_RETRY_DELAY_S,
+                        max_retries=FORWARD_REDIS_RETRY_MAX,
+                        exc=Exception(json.dumps({
+                            "message": "Chunks not ready in Redis; will retry",
+                            "index_name": original_index_name,
+                            "task_name": "forward",
+                            "source": original_source,
+                            "original_filename": filename
+                        }, ensure_ascii=False))
+                    )
+            except Retry:
+                raise
+            except Exception as exc:
+                raise Exception(json.dumps({
+                    "message": f"Failed to retrieve chunks from Redis: {str(exc)}",
+                    "index_name": original_index_name,
+                    "task_name": "forward",
+                    "source": original_source,
+                    "original_filename": filename
+                }, ensure_ascii=False))
         if processed_data.get('source'):
             original_source = processed_data.get('source')
         if processed_data.get('index_name'):
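
The except Retry: raise guard above matters because Task.retry() signals a reschedule by raising celery.exceptions.Retry; without the guard, the broad except Exception below it would convert every scheduled retry into a hard failure. A minimal standalone sketch of the same pattern (lookup and the task name are hypothetical; app is assumed to be the same Celery app this module imports):

    from celery.exceptions import Retry

    @app.task(bind=True)
    def poll_result(self, key):
        try:
            value = lookup(key)  # hypothetical helper that returns None until ready
            if value is None:
                # Releases the worker slot and reschedules this task in 5 seconds.
                raise self.retry(countdown=5, max_retries=12)
            return value
        except Retry:
            raise  # let Celery handle the reschedule
        except Exception as exc:
            raise RuntimeError(f"poll_result failed for {key}: {exc}")
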
@@ -357,7 +430,7 @@ def forward(
                 "index_name": original_index_name,
                 "task_name": "forward",
                 "source": original_source,
-                "original_filename": filename
+                "original_filename": original_filename
             }, ensure_ascii=False))
         if len(chunks) == 0:
             logger.warning(