Skip to content

Commit fe5046f

Browse files
committed
✨ Real-time file processing integrated with the Ray data processing module
1 parent 5607c4e commit fe5046f

File tree

11 files changed

+265
-222
lines changed

11 files changed

+265
-222
lines changed

backend/apps/data_process_app.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import logging
22
from contextlib import asynccontextmanager
3-
from fastapi import HTTPException, APIRouter, Form
3+
from fastapi import HTTPException, APIRouter, Form, File, UploadFile
44
import base64
55
import io
6+
import tempfile
7+
import os
68

79
from consts.model import TaskResponse, TaskRequest, BatchTaskResponse, BatchTaskRequest, SimpleTaskStatusResponse, \
810
SimpleTasksListResponse
@@ -292,3 +294,66 @@ async def filter_important_image(
292294
logger.error(f"Error processing image: {str(e)}")
293295
raise HTTPException(
294296
status_code=500, detail=f"Error processing image: {str(e)}")
297+
298+
299+
@router.post("/process_text_file", response_model=dict, status_code=200)
async def process_text_file(
    file: UploadFile = File(...),
    chunking_strategy: str = Form("basic"),
    timeout: int = Form(60)
):
    """
    Transfer the uploaded file to text content.

    This interface is specifically used for file-to-text conversion,
    supporting multiple file formats including PDF, Word, Excel, etc.
    Uses the high-priority processing queue for fast response.

    Parameters:
        file: Uploaded file object
        chunking_strategy: Chunking strategy, default is "basic"
        timeout: Processing timeout (seconds), default is 60 seconds

    Returns:
        JSON object containing the extracted full text content and
        processing metadata

    Raises:
        HTTPException: 500 if reading, saving, or processing the file fails.
    """
    temp_file_path = None
    try:
        logger.info(f"Processing uploaded file: {file.filename}")

        # Persist the upload to a temporary file, keeping the original
        # extension so the downstream parser can detect the format.
        # delete=False because process_sync reads the path after this
        # handle is closed; cleanup happens in the finally block.
        suffix = os.path.splitext(file.filename or "")[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            content = await file.read()
            temp_file.write(content)
            temp_file_path = temp_file.name

        logger.info(f"Saved uploaded file to temporary path: {temp_file_path}")

        result = process_sync(
            source=temp_file_path,
            source_type='file',
            chunking_strategy=chunking_strategy,
            timeout=timeout,
        )
        logger.info(
            f"Successfully processed uploaded file: {file.filename}, "
            f"extracted {result.get('text_length', 0)} characters"
        )

        return {
            "success": True,
            "task_id": result.get("task_id"),
            "filename": file.filename,
            "text": result.get("text", ""),
            "chunks_count": result.get("chunks_count", 0),
            "text_length": result.get("text_length", 0),
            "processing_time": result.get("processing_time", 0),
            "chunking_strategy": chunking_strategy
        }

    except Exception as e:
        logger.exception(f"Error processing uploaded file {file.filename}: {str(e)}")
        # English detail for consistency with the other handlers in this
        # module (e.g. "Error processing image: ...").
        raise HTTPException(
            status_code=500,
            detail=f"Error processing file: {str(e)}"
        )
    finally:
        # Always clean up the temporary file, even on failure.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.unlink(temp_file_path)
                logger.debug(f"Cleaned up temporary file: {temp_file_path}")
            except OSError as cleanup_err:
                # Best-effort cleanup: log and continue rather than masking
                # the original response/exception.
                logger.warning(
                    f"Failed to clean up temporary file {temp_file_path}: {str(cleanup_err)}"
                )

backend/apps/file_management_app.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from pathlib import Path
55
from typing import List, Optional
66
from io import BytesIO
7+
import requests
8+
import logging
79

810
from utils.auth_utils import get_current_user_id
911
from fastapi import UploadFile, File, HTTPException, Form, APIRouter, Query, Path as PathParam, Body, Header
@@ -16,7 +18,7 @@
1618
from database.attachment_db import (
1719
upload_fileobj, delete_file, get_file_url, list_files
1820
)
19-
from services.file_management_service import file_management_service
21+
logger = logging.getLogger("file_management_app")
2022

2123
# Create upload directory
2224
upload_dir = Path(UPLOAD_FOLDER)
async def process_text_file(query, filename, file_content, tenant_id: str) -> str:
    """
    Process text file, convert to text using external API.

    Uploads the raw bytes to the data-process service's
    /tasks/process_text_file endpoint, then condenses the extracted text
    with convert_long_text_to_text.

    Args:
        query: The user question; used to focus the downstream summary.
        filename: Original name of the uploaded file.
        file_content: Raw file bytes to convert.
        tenant_id: Tenant identifier forwarded to the summarizer.

    Returns:
        A string embedding the summarized file content.

    Raises:
        Exception: If the service URL is unconfigured, the call times out,
            the service is unreachable, or it returns a non-200 response.
    """
    # file_content is byte data, so it must be sent to the API as a
    # multipart file upload.
    data_process_service_url = os.environ.get('DATA_PROCESS_SERVICE')
    if not data_process_service_url:
        # Fail fast with a clear message instead of posting to "None/tasks/..."
        raise Exception("DATA_PROCESS_SERVICE environment variable is not set")
    api_url = f"{data_process_service_url}/tasks/process_text_file"
    logger.info(f"Processing text file {filename} with API: {api_url}")

    # Keep the try block to the network call only, so the Exception raised
    # below for a non-200 response is not caught and double-wrapped by our
    # own generic handler.
    try:
        files = {
            'file': (filename, file_content, 'application/octet-stream')
        }
        data = {
            'chunking_strategy': 'basic',
            'timeout': 60
        }

        response = requests.post(
            api_url,
            files=files,
            data=data,
            timeout=60
        )
    except requests.exceptions.Timeout:
        raise Exception("API call timeout")
    except requests.exceptions.ConnectionError:
        raise Exception(f"Cannot connect to data processing service: {api_url}")
    except Exception as e:
        raise Exception(f"Error processing file: {str(e)}")

    if response.status_code != 200:
        # Prefer the service's JSON "detail" field; fall back to the raw body.
        if response.headers.get('content-type', '').startswith('application/json'):
            error_detail = response.json().get('detail', 'Unknown error')
        else:
            error_detail = response.text
        logger.error(f"File processing failed (status code: {response.status_code}): {error_detail}")
        raise Exception(f"File processing failed (status code: {response.status_code}): {error_detail}")

    result = response.json()
    raw_text = result.get("text", "")
    logger.info(f"File processed successfully: {raw_text[:100]}...")

    text = convert_long_text_to_text(query, raw_text, tenant_id)
    # Include the real filename (the original hard-coded "(unknown)"
    # placeholder looked like an unfinished f-string).
    return f"File ({filename}) content: {text}"
450490

backend/data_process/tasks.py

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Celery tasks for data processing and vector storage
33
"""
44
import logging
5+
import uuid
56
import os
67
import json
78
import time
@@ -611,19 +612,23 @@ def process_sync(self, source: str, source_type: str = "file",
611612
Dict containing the extracted text and metadata
612613
"""
613614
start_time = time.time()
614-
task_id = self.request.id
615+
task_id = self.request.id or str(uuid.uuid4())
615616

616-
# Update task state to PROCESSING
617-
self.update_state(
618-
state=states.STARTED,
619-
meta={
620-
'source': source,
621-
'source_type': source_type,
622-
'task_name': '',
623-
'start_time': start_time,
624-
'sync_mode': True
625-
}
626-
)
617+
# Check if we're in a valid Celery context before updating state
618+
is_celery_context = hasattr(self, 'request') and self.request.id is not None
619+
620+
# Update task state to PROCESSING only if in Celery context
621+
if is_celery_context:
622+
self.update_state(
623+
state=states.STARTED,
624+
meta={
625+
'source': source,
626+
'source_type': source_type,
627+
'task_name': 'process_sync',
628+
'start_time': start_time,
629+
'sync_mode': True
630+
}
631+
)
627632

628633
logger.info(f"Synchronous processing file: {source} with strategy: {chunking_strategy}")
629634

@@ -651,18 +656,19 @@ def process_sync(self, source: str, source_type: str = "file",
651656
# Extract text from chunks
652657
text_content = "\n\n".join([chunk.get("content", "") for chunk in chunks])
653658

654-
# Update task state to COMPLETE
655-
self.update_state(
656-
state=states.SUCCESS,
657-
meta={
658-
'chunks_count': len(chunks),
659-
'processing_time': elapsed_time,
660-
'source': source,
661-
'task_name': '',
662-
'text_length': len(text_content),
663-
'sync_mode': True
664-
}
665-
)
659+
# Update task state to COMPLETE only if in Celery context
660+
if is_celery_context:
661+
self.update_state(
662+
state=states.SUCCESS,
663+
meta={
664+
'chunks_count': len(chunks),
665+
'processing_time': elapsed_time,
666+
'source': source,
667+
'task_name': 'process_sync',
668+
'text_length': len(text_content),
669+
'sync_mode': True
670+
}
671+
)
666672

667673
logger.info(f"Synchronously processed {len(chunks)} chunks from {source} in {elapsed_time:.2f}s")
668674

@@ -679,17 +685,18 @@ def process_sync(self, source: str, source_type: str = "file",
679685
except Exception as e:
680686
logger.error(f"Error synchronously processing file {source}: {str(e)}")
681687

682-
# Update task state to FAILURE with custom metadata
683-
self.update_state(
684-
state=states.FAILURE,
685-
meta={
686-
'source': source,
687-
'task_name': 'process_sync',
688-
'custom_error': str(e),
689-
'sync_mode': True,
690-
'stage': 'sync_processing_failed'
691-
}
692-
)
688+
# Update task state to FAILURE with custom metadata only if in Celery context
689+
if is_celery_context:
690+
self.update_state(
691+
state=states.FAILURE,
692+
meta={
693+
'source': source,
694+
'task_name': 'process_sync',
695+
'custom_error': str(e),
696+
'sync_mode': True,
697+
'stage': 'sync_processing_failed'
698+
}
699+
)
693700

694701
# Re-raise to let Celery handle exception serialization
695702
raise

backend/services/file_management_service.py

Lines changed: 0 additions & 41 deletions
This file was deleted.

backend/utils/attachment_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def convert_long_text_to_text(query: str, file_context: str, tenant_id: str):
2929
long_text_to_text_model = OpenAILongContextModel(
3030
observer=MessageObserver(),
3131
model_id=get_model_name_from_config(secondary_model_config),
32-
api_base=secondary_model_config.get_config("LLM_SECONDARY_MODEL_URL"),
33-
api_key=secondary_model_config.get_config("LLM_SECONDARY_API_KEY")
32+
api_base=secondary_model_config.get("base_url"),
33+
api_key=secondary_model_config.get("api_key")
3434
)
3535
system_prompt = f"用户提出了一个问题:{query},请从回答这个问题的角度精简、仔细描述一下这段文本,200字以内。"
3636
user_prompt = "请仔细阅读并分析这段文本:"

0 commit comments

Comments
 (0)