1- import asyncio
2- import json
31import logging
42import os
53from http import HTTPStatus
6- from io import BytesIO
74from typing import List , Optional
85
9- import httpx
10- import requests
116from fastapi import APIRouter , Body , File , Form , Header , HTTPException , Path as PathParam , Query , Request , UploadFile
127from fastapi .responses import JSONResponse , RedirectResponse , StreamingResponse
138
14- from agents .preprocess_manager import preprocess_manager
15- from consts .const import DATA_PROCESS_SERVICE
169from consts .model import ProcessParams
1710from services .file_management_service import upload_to_minio , upload_files_impl , \
18- get_file_url_impl , get_file_stream_impl , delete_file_impl , list_files_impl
19- from utils . attachment_utils import convert_image_to_text , convert_long_text_to_text
11+ get_file_url_impl , get_file_stream_impl , delete_file_impl , list_files_impl , \
12+ preprocess_files_generator
2013from utils .auth_utils import get_current_user_info
2114from utils .file_management_utils import trigger_data_process
2215
@@ -314,82 +307,16 @@ async def agent_preprocess_api(
314307 else :
315308 conversation_id = - 1 # Default for cases without conversation_id
316309
317- async def generate ():
318- file_descriptions = []
319- total_files = len (file_cache )
320-
321- # Create and register the preprocess task
322- task = asyncio .current_task ()
323- if task :
324- preprocess_manager .register_preprocess_task (
325- task_id , conversation_id , task )
326-
327- try :
328- for index , file_data in enumerate (file_cache ):
329- # Check if task should stop
330- if task and task .done ():
331- logger .info (f"Preprocess task { task_id } was cancelled" )
332- break
333-
334- progress = int ((index / total_files ) * 100 )
335-
336- progress_message = json .dumps ({
337- "type" : "progress" ,
338- "progress" : progress ,
339- "message" : f"Parsing file { index + 1 } /{ total_files } : { file_data ['filename' ]} "
340- }, ensure_ascii = False )
341- yield f"data: { progress_message } \n \n "
342- await asyncio .sleep (0.1 )
343-
344- try :
345- # Check if file already has an error
346- if "error" in file_data :
347- raise Exception (file_data ["error" ])
348-
349- if file_data ["ext" ] in ['.jpg' , '.jpeg' , '.png' , '.gif' , '.bmp' , '.webp' ]:
350- description = await process_image_file (
351- query , file_data ["filename" ], file_data ["content" ], tenant_id , language
352- )
353- else :
354- description = await process_text_file (
355- query , file_data ["filename" ], file_data ["content" ], tenant_id , language
356- )
357- file_descriptions .append (description )
358-
359- # Send processing result for each file
360- file_message = json .dumps ({
361- "type" : "file_processed" ,
362- "filename" : file_data ["filename" ],
363- "description" : description
364- }, ensure_ascii = False )
365- yield f"data: { file_message } \n \n "
366- await asyncio .sleep (0.1 )
367- except Exception as e :
368- logger .exception (
369- f"Error parsing file { file_data ['filename' ]} : { str (e )} " )
370- error_description = f"Error parsing file { file_data ['filename' ]} : { str (e )} "
371- file_descriptions .append (error_description )
372- error_message = json .dumps ({
373- "type" : "error" ,
374- "filename" : file_data ["filename" ],
375- "message" : error_description
376- }, ensure_ascii = False )
377- yield f"data: { error_message } \n \n "
378- await asyncio .sleep (0.1 )
379-
380- # Send completion message
381- complete_message = json .dumps ({
382- "type" : "complete" ,
383- "progress" : 100 ,
384- "final_query" : query
385- }, ensure_ascii = False )
386- yield f"data: { complete_message } \n \n "
387- finally :
388- # Always unregister the task
389- preprocess_manager .unregister_preprocess_task (task_id )
390-
310+ # Call service layer to generate streaming response
391311 return StreamingResponse (
392- generate (),
312+ preprocess_files_generator (
313+ query = query ,
314+ file_cache = file_cache ,
315+ tenant_id = tenant_id ,
316+ language = language ,
317+ task_id = task_id ,
318+ conversation_id = conversation_id
319+ ),
393320 media_type = "text/event-stream" ,
394321 headers = {
395322 "Cache-Control" : "no-cache" ,
@@ -401,71 +328,3 @@ async def generate():
401328 status_code = 500 , detail = f"File preprocessing error: { str (e )} " )
402329
403330
404- async def process_image_file (query , filename , file_content , tenant_id : str , language : str = 'zh' ) -> str :
405- """
406- Process image file, convert to text using external API
407- """
408- image_stream = BytesIO (file_content )
409- text = convert_image_to_text (query , image_stream , tenant_id , language )
410-
411- return f"Image file { filename } content: { text } "
412-
413-
414- async def process_text_file (query , filename , file_content , tenant_id : str , language : str = 'zh' ) -> str :
415- """
416- Process text file, convert to text using external API
417- """
418- # file_content is byte data, need to send to API through file upload
419- data_process_service_url = DATA_PROCESS_SERVICE
420- api_url = f"{ data_process_service_url } /tasks/process_text_file"
421- logger .info (f"Processing text file { filename } with API: { api_url } " )
422-
423- try :
424- # Upload byte data as a file
425- files = {
426- 'file' : (filename , file_content , 'application/octet-stream' )
427- }
428- data = {
429- 'chunking_strategy' : 'basic' ,
430- 'timeout' : 60
431- }
432- async with httpx .AsyncClient () as client :
433- response = await client .post (api_url , files = files , data = data , timeout = 60 )
434-
435- if response .status_code == 200 :
436- result = response .json ()
437- raw_text = result .get ("text" , "" )
438- logger .info (
439- f"File processed successfully: { raw_text [:200 ]} ...{ raw_text [- 200 :]} ..., length: { len (raw_text )} " )
440- else :
441- error_detail = response .json ().get ('detail' , '未知错误' ) if response .headers .get (
442- 'content-type' , '' ).startswith ('application/json' ) else response .text
443- logger .error (
444- f"File processing failed (status code: { response .status_code } ): { error_detail } " )
445- raise Exception (
446- f"File processing failed (status code: { response .status_code } ): { error_detail } " )
447-
448- except requests .exceptions .Timeout :
449- raise Exception ("API call timeout" )
450- except requests .exceptions .ConnectionError :
451- raise Exception (
452- f"Cannot connect to data processing service: { api_url } " )
453- except Exception as e :
454- raise Exception (f"Error processing file: { str (e )} " )
455-
456- text = convert_long_text_to_text (query , raw_text , tenant_id , language )
457- return f"File { filename } content: { text } "
458-
459-
460- def get_file_description (files : List [UploadFile ]) -> str :
461- """
462- Generate file description text
463- """
464- description = "User provided some reference files:\n "
465- for file in files :
466- ext = os .path .splitext (file .filename or "" )[1 ].lower ()
467- if ext in ['.jpg' , '.jpeg' , '.png' , '.gif' , '.bmp' ]:
468- description += f"- Image file { file .filename or '' } \n "
469- else :
470- description += f"- File { file .filename or '' } \n "
471- return description
0 commit comments