Skip to content

Commit b67341a

Browse files
authored
♻️ Refactor the conversation_management_app&conversation_management_service&data_process_app&es
2 parents 29b8b58 + 1e43188 commit b67341a

12 files changed

+2980
-439
lines changed

backend/apps/conversation_management_app.py

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from http import HTTPStatus
23
from typing import Any, Dict, Optional
34

45
from fastapi import APIRouter, Header, HTTPException, Request
@@ -11,7 +12,6 @@
1112
OpinionRequest,
1213
RenameRequest,
1314
)
14-
from database.conversation_db import get_message_id_by_index
1515
from services.conversation_management_service import (
1616
create_new_conversation,
1717
delete_conversation_service,
@@ -20,7 +20,7 @@
2020
get_conversation_list_service,
2121
get_sources_service,
2222
rename_conversation_service,
23-
update_message_opinion_service,
23+
update_message_opinion_service, get_message_id_by_index_impl,
2424
)
2525
from utils.auth_utils import get_current_user_id, get_current_user_info
2626

@@ -50,7 +50,7 @@ async def create_new_conversation_endpoint(request: ConversationRequest, authori
5050
return ConversationResponse(code=0, message="success", data=conversation_data)
5151
except Exception as e:
5252
logging.error(f"Failed to create conversation: {str(e)}")
53-
raise HTTPException(status_code=500, detail=str(e))
53+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
5454

5555

5656
@router.get("/list", response_model=ConversationResponse)
@@ -67,16 +67,12 @@ async def list_conversations_endpoint(authorization: Optional[str] = Header(None
6767
try:
6868
user_id, tenant_id = get_current_user_id(authorization)
6969
if not user_id:
70-
raise HTTPException(status_code=401, detail="未授权访问,请先登录")
71-
70+
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="Unauthorized access, Please login first")
7271
conversations = get_conversation_list_service(user_id)
7372
return ConversationResponse(code=0, message="success", data=conversations)
74-
except HTTPException as he:
75-
# Throw HTTP Exception Directly
76-
raise he
7773
except Exception as e:
7874
logging.error(f"Failed to get conversation list: {str(e)}")
79-
raise HTTPException(status_code=500, detail=str(e))
75+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
8076

8177

8278
@router.post("/rename", response_model=ConversationResponse)
@@ -100,9 +96,7 @@ async def rename_conversation_endpoint(request: RenameRequest, authorization: Op
10096
return ConversationResponse(code=0, message="success", data=True)
10197
except Exception as e:
10298
logging.error(f"Failed to rename conversation: {str(e)}")
103-
if isinstance(e, HTTPException):
104-
raise e
105-
raise HTTPException(status_code=500, detail=str(e))
99+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
106100

107101

108102
@router.delete("/{conversation_id}", response_model=ConversationResponse)
@@ -123,9 +117,7 @@ async def delete_conversation_endpoint(conversation_id: int, authorization: Opti
123117
return ConversationResponse(code=0, message="success", data=True)
124118
except Exception as e:
125119
logging.error(f"Failed to delete conversation: {str(e)}")
126-
if isinstance(e, HTTPException):
127-
raise e
128-
raise HTTPException(status_code=500, detail=str(e))
120+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
129121

130122

131123
@router.get("/{conversation_id}", response_model=ConversationResponse)
@@ -147,9 +139,7 @@ async def get_conversation_history_endpoint(conversation_id: int, authorization:
147139
return ConversationResponse(code=0, message="success", data=history_data)
148140
except Exception as e:
149141
logging.error(f"Failed to get conversation history: {str(e)}")
150-
if isinstance(e, HTTPException):
151-
raise e
152-
raise HTTPException(status_code=500, detail=str(e))
142+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
153143

154144

155145
@router.post("/sources", response_model=Dict[str, Any])
@@ -175,11 +165,7 @@ async def get_sources_endpoint(request: Dict[str, Any], authorization: Optional[
175165
return get_sources_service(conversation_id, message_id, source_type, user_id)
176166
except Exception as e:
177167
logging.error(f"Failed to get message sources: {str(e)}")
178-
return {
179-
"code": 500,
180-
"message": str(e),
181-
"data": None
182-
}
168+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
183169

184170

185171
@router.post("/generate_title", response_model=ConversationResponse)
@@ -208,9 +194,7 @@ async def generate_conversation_title_endpoint(
208194
return ConversationResponse(code=0, message="success", data=title)
209195
except Exception as e:
210196
logging.error(f"Failed to generate conversation title: {str(e)}")
211-
if isinstance(e, HTTPException):
212-
raise e
213-
raise HTTPException(status_code=500, detail=str(e))
197+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
214198

215199

216200
@router.post("/message/update_opinion", response_model=ConversationResponse)
@@ -230,9 +214,7 @@ async def update_opinion_endpoint(request: OpinionRequest, authorization: Option
230214
return ConversationResponse(code=0, message="success", data=True)
231215
except Exception as e:
232216
logging.error(f"Failed to update message like/dislike: {str(e)}")
233-
if isinstance(e, HTTPException):
234-
raise e
235-
raise HTTPException(status_code=500, detail=str(e))
217+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
236218

237219

238220
@router.post("/message/id", response_model=ConversationResponse)
@@ -249,14 +231,8 @@ async def get_message_id_endpoint(request: MessageIdRequest):
249231
ConversationResponse object containing message_id
250232
"""
251233
try:
252-
message_id = get_message_id_by_index(
253-
request.conversation_id, request.message_index)
254-
if message_id is None:
255-
raise HTTPException(status_code=404, detail="Message not found")
256-
234+
message_id = await get_message_id_by_index_impl(request.conversation_id, request.message_index)
257235
return ConversationResponse(code=0, message="success", data=message_id)
258236
except Exception as e:
259237
logging.error(f"Failed to get message ID: {str(e)}")
260-
if isinstance(e, HTTPException):
261-
raise e
262-
raise HTTPException(status_code=500, detail=str(e))
238+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))

backend/apps/data_process_app.py

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import time
55
from contextlib import asynccontextmanager
6+
from http import HTTPStatus
67
from typing import Optional
78

89
from fastapi import APIRouter, File, Form, Header, HTTPException, UploadFile
@@ -119,7 +120,7 @@ async def process_sync_endpoint(
119120
except Exception as e:
120121
logger.error(f"Error in synchronous processing: {str(e)}")
121122
raise HTTPException(
122-
status_code=500,
123+
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
123124
detail=f"Error processing file: {str(e)}"
124125
)
125126

@@ -133,48 +134,13 @@ async def create_batch_tasks(request: BatchTaskRequest, authorization: Optional[
133134
Processing happens in the background for each file independently.
134135
"""
135136
try:
136-
task_ids = []
137-
138-
# Create individual tasks for each source
139-
for source_config in request.sources:
140-
# Extract parameters
141-
source = source_config.get('source')
142-
source_type = source_config.get('source_type')
143-
chunking_strategy = source_config.get('chunking_strategy')
144-
index_name = source_config.get('index_name')
145-
original_filename = source_config.get('original_filename')
146-
147-
# Validate required fields
148-
if not source:
149-
logger.error(
150-
f"Missing required field 'source' in source config: {source_config}")
151-
continue
152-
if not index_name:
153-
logger.error(
154-
f"Missing required field 'index_name' in source config: {source_config}")
155-
continue
156-
157-
# Create individual task for this source
158-
task_result = process_and_forward.delay(
159-
source=source,
160-
source_type=source_type,
161-
chunking_strategy=chunking_strategy,
162-
index_name=index_name,
163-
original_filename=original_filename,
164-
authorization=authorization
165-
)
166-
167-
task_ids.append(task_result.id)
168-
logger.debug(f"Created task {task_result.id} for source: {source}")
169-
170-
logger.info(
171-
f"Created {len(task_ids)} individual tasks for batch processing")
137+
task_ids = await service.create_batch_tasks_impl(authorization=authorization, request=request)
172138
return BatchTaskResponse(task_ids=task_ids)
173139

174140
except Exception as e:
175141
logger.error(f"Error creating batch tasks: {str(e)}")
176142
raise HTTPException(
177-
status_code=500, detail=f"Failed to create batch tasks: {str(e)}")
143+
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Failed to create batch tasks: {str(e)}")
178144

179145

180146
@router.get("/load_image")
@@ -194,7 +160,7 @@ async def load_image(url: str):
194160

195161
if image is None:
196162
raise HTTPException(
197-
status_code=404, detail="Failed to load image or image format not supported")
163+
status_code=HTTPStatus.NOT_FOUND, detail="Failed to load image or image format not supported")
198164

199165
# Convert PIL image to base64
200166
img_byte_arr = io.BytesIO()
@@ -214,7 +180,7 @@ async def load_image(url: str):
214180
except Exception as e:
215181
logger.error(f"Error loading image: {str(e)}")
216182
raise HTTPException(
217-
status_code=500, detail=f"Error loading image: {str(e)}")
183+
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Error loading image: {str(e)}")
218184

219185

220186
@router.get("/{task_id}", response_model=SimpleTaskStatusResponse)
@@ -224,7 +190,7 @@ async def get_task(task_id: str):
224190

225191
if not task:
226192
raise HTTPException(
227-
status_code=404, detail=f"Task with ID {task_id} not found")
193+
status_code=HTTPStatus.NOT_FOUND, detail=f"Task with ID {task_id} not found")
228194
return SimpleTaskStatusResponse(
229195
id=task["id"],
230196
task_name=task["task_name"],
@@ -272,7 +238,7 @@ async def get_index_tasks(index_name: str):
272238
try:
273239
return await service.get_index_tasks(index_name)
274240
except Exception as e:
275-
raise HTTPException(status_code=500, detail=str(e))
241+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
276242

277243

278244
@router.get("/{task_id}/details")
@@ -281,7 +247,7 @@ async def get_task_details(task_id: str):
281247
from data_process.utils import get_task_details as utils_get_task_details
282248
task = await utils_get_task_details(task_id)
283249
if not task:
284-
raise HTTPException(status_code=404, detail="Task not found")
250+
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Task not found")
285251
return task
286252

287253

@@ -307,7 +273,7 @@ async def filter_important_image(
307273
except Exception as e:
308274
logger.error(f"Error processing image: {str(e)}")
309275
raise HTTPException(
310-
status_code=500, detail=f"Error processing image: {str(e)}")
276+
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Error processing image: {str(e)}")
311277

312278

313279
@router.post("/process_text_file", response_model=dict, status_code=200)
@@ -380,7 +346,7 @@ async def process_text_file(
380346
logger.exception(
381347
f"Error processing uploaded file {file.filename}: {str(e)}")
382348
raise HTTPException(
383-
status_code=500,
349+
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
384350
detail=f"An error occurred while processing the file: {str(e)}"
385351
)
386352

0 commit comments

Comments
 (0)