Skip to content

Commit 4f14330

Browse files
paultranvanclaude
andcommitted
feat(phase-2): centralized exception handling for API layer
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 23a4897 commit 4f14330

File tree

8 files changed

+162
-249
lines changed

8 files changed

+162
-249
lines changed

openrag/api.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,16 @@ async def openrag_exception_handler(request: Request, exc: OpenRAGError):
167167
return JSONResponse(status_code=exc.status_code, content=exc.to_dict())
168168

169169

170+
@app.exception_handler(Exception)
171+
async def unhandled_exception_handler(request: Request, exc: Exception):
172+
logger = get_logger()
173+
logger.exception("Unhandled exception", error=str(exc))
174+
return JSONResponse(
175+
status_code=500,
176+
content={"detail": "[UNEXPECTED_ERROR]: An unexpected error occurred", "extra": {}},
177+
)
178+
179+
170180
# Add CORS middleware
171181
allow_origins = [
172182
"http://localhost:3042",

openrag/components/pipeline.py

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -189,25 +189,17 @@ async def _prepare_for_completions(self, partition: list[str], payload: dict):
189189
return payload, docs[:n_docs]
190190

191191
async def completions(self, partition: list[str], payload: dict):
192-
try:
193-
if partition is None:
194-
docs = []
195-
else:
196-
payload, docs = await self._prepare_for_completions(partition=partition, payload=payload)
197-
llm_output = self.llm_client.completions(request=payload)
198-
return llm_output, docs
199-
except Exception as e:
200-
logger.error(f"Error during chat completion: {e!s}")
201-
raise e
192+
if partition is None:
193+
docs = []
194+
else:
195+
payload, docs = await self._prepare_for_completions(partition=partition, payload=payload)
196+
llm_output = self.llm_client.completions(request=payload)
197+
return llm_output, docs
202198

203199
async def chat_completion(self, partition: list[str] | None, payload: dict):
204-
try:
205-
if partition is None:
206-
docs = []
207-
else:
208-
payload, docs = await self._prepare_for_chat_completion(partition=partition, payload=payload)
209-
llm_output = self.llm_client.chat_completion(request=payload)
210-
return llm_output, docs
211-
except Exception as e:
212-
logger.error(f"Error during chat completion: {e!s}")
213-
raise e
200+
if partition is None:
201+
docs = []
202+
else:
203+
payload, docs = await self._prepare_for_chat_completion(partition=partition, payload=payload)
204+
llm_output = self.llm_client.chat_completion(request=payload)
205+
return llm_output, docs

openrag/routers/actors.py

Lines changed: 23 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -51,24 +51,17 @@
5151
)
5252
async def list_ray_actors():
5353
"""List all known Ray actors and their status."""
54-
try:
55-
actors = [
56-
{
57-
"actor_id": a.actor_id,
58-
"name": a.name,
59-
"class_name": a.class_name,
60-
"state": a.state,
61-
"namespace": a.ray_namespace,
62-
}
63-
for a in list_actors()
64-
]
65-
return JSONResponse(status_code=status.HTTP_200_OK, content={"actors": actors})
66-
except Exception:
67-
logger.exception("Error getting actor summaries")
68-
raise HTTPException(
69-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
70-
detail="Failed to retrieve actor summaries.",
71-
)
54+
actors = [
55+
{
56+
"actor_id": a.actor_id,
57+
"name": a.name,
58+
"class_name": a.class_name,
59+
"state": a.state,
60+
"namespace": a.ray_namespace,
61+
}
62+
for a in list_actors()
63+
]
64+
return JSONResponse(status_code=status.HTTP_200_OK, content={"actors": actors})
7265

7366

7467
@router.post(
@@ -116,29 +109,16 @@ async def restart_actor(
116109
logger.info(f"Killed actor: {actor_name}")
117110
except ValueError:
118111
logger.warning("Actor not found. Creating new instance.", actor=actor_name)
119-
except Exception as e:
120-
logger.exception("Failed to kill actor", actor=actor_name)
121-
raise HTTPException(
122-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
123-
detail=f"Failed to kill actor {actor_name}: {e!s}",
124-
)
125112

126-
try:
127-
new_actor = actor_creation_map[actor_name]()
128-
if "Semaphore" in actor_name:
129-
new_actor = new_actor._actor
130-
logger.info(f"Restarted actor: {actor_name}")
131-
return JSONResponse(
132-
status_code=status.HTTP_200_OK,
133-
content={
134-
"message": f"Actor {actor_name} restarted successfully.",
135-
"actor_name": actor_name,
136-
"actor_id": new_actor._actor_id.hex(),
137-
},
138-
)
139-
except Exception as e:
140-
logger.exception("Failed to restart actor", actor=actor_name)
141-
raise HTTPException(
142-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
143-
detail=f"Failed to restart actor {actor_name}: {e!s}",
144-
)
113+
new_actor = actor_creation_map[actor_name]()
114+
if "Semaphore" in actor_name:
115+
new_actor = new_actor._actor
116+
logger.info(f"Restarted actor: {actor_name}")
117+
return JSONResponse(
118+
status_code=status.HTTP_200_OK,
119+
content={
120+
"message": f"Actor {actor_name} restarted successfully.",
121+
"actor_name": actor_name,
122+
"actor_id": new_actor._actor_id.hex(),
123+
},
124+
)

openrag/routers/extract.py

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,31 +48,23 @@ async def get_extract(
4848
user_partitions=Depends(current_user_or_admin_partitions_list),
4949
):
5050
log = logger.bind(extract_id=extract_id)
51-
try:
52-
chunk = await vectordb.get_chunk_by_id.remote(extract_id)
53-
if chunk is None:
54-
log.warning("Extract not found.")
55-
raise HTTPException(
56-
status_code=status.HTTP_404_NOT_FOUND,
57-
detail=f"Extract '{extract_id}' not found.",
58-
)
59-
chunk_partition = chunk.metadata["partition"]
60-
log.info(f"User partitions: {user_partitions}, Chunk partition: {chunk_partition}")
61-
if chunk_partition not in user_partitions and user_partitions != ["all"]:
62-
log.warning("User does not have access to this extract.")
63-
raise HTTPException(
64-
status_code=status.HTTP_403_FORBIDDEN,
65-
detail=f"User does not have access to extract '{extract_id}'.",
66-
)
67-
log.info("Extract successfully retrieved.")
68-
except HTTPException:
69-
raise
70-
except Exception as e:
71-
log.exception("Failed to retrieve extract.", error=str(e))
51+
52+
chunk = await vectordb.get_chunk_by_id.remote(extract_id)
53+
if chunk is None:
54+
log.warning("Extract not found.")
55+
raise HTTPException(
56+
status_code=status.HTTP_404_NOT_FOUND,
57+
detail=f"Extract '{extract_id}' not found.",
58+
)
59+
chunk_partition = chunk.metadata["partition"]
60+
log.info(f"User partitions: {user_partitions}, Chunk partition: {chunk_partition}")
61+
if chunk_partition not in user_partitions and user_partitions != ["all"]:
62+
log.warning("User does not have access to this extract.")
7263
raise HTTPException(
73-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
74-
detail=f"Failed to retrieve extract: {e!s}",
64+
status_code=status.HTTP_403_FORBIDDEN,
65+
detail=f"User does not have access to extract '{extract_id}'.",
7566
)
67+
log.info("Extract successfully retrieved.")
7668

7769
return JSONResponse(
7870
status_code=status.HTTP_200_OK,

openrag/routers/indexer.py

Lines changed: 38 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -123,30 +123,16 @@ async def add_file(
123123
vectordb=Depends(get_vectordb),
124124
user=Depends(require_partition_editor),
125125
):
126-
log = logger.bind(
127-
file_id=file_id,
128-
partition=partition,
129-
filename=file.filename,
130-
user=user.get("display_name"),
131-
)
132-
133126
if await vectordb.file_exists.remote(file_id, partition):
134127
raise HTTPException(
135128
status_code=status.HTTP_409_CONFLICT,
136129
detail=f"File '{file_id}' already exists in partition {partition}",
137130
)
138131

139132
save_dir = Path(DATA_DIR)
140-
try:
141-
original_filename = file.filename
142-
file.filename = sanitize_filename(file.filename)
143-
file_path = await save_file_to_disk(file, save_dir, with_random_prefix=True)
144-
except Exception as e:
145-
log.exception("Failed to save file to disk.", error=str(e))
146-
raise HTTPException(
147-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
148-
detail=str(e),
149-
)
133+
original_filename = file.filename
134+
file.filename = sanitize_filename(file.filename)
135+
file_path = await save_file_to_disk(file, save_dir, with_random_prefix=True)
150136

151137
metadata.update(
152138
{
@@ -240,8 +226,6 @@ async def put_file(
240226
vectordb=Depends(get_vectordb),
241227
user=Depends(require_partition_editor),
242228
):
243-
log = logger.bind(file_id=file_id, partition=partition, filename=file.filename)
244-
245229
if not await vectordb.file_exists.remote(file_id, partition):
246230
raise HTTPException(
247231
status_code=status.HTTP_404_NOT_FOUND,
@@ -252,16 +236,9 @@ async def put_file(
252236
await indexer.delete_file.remote(file_id, partition)
253237

254238
save_dir = Path(DATA_DIR)
255-
try:
256-
original_filename = file.filename
257-
file.filename = sanitize_filename(file.filename)
258-
file_path = await save_file_to_disk(file, save_dir, with_random_prefix=True)
259-
except Exception:
260-
log.exception("Failed to save file to disk.")
261-
raise HTTPException(
262-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
263-
detail="Failed to save uploaded file.",
264-
)
239+
original_filename = file.filename
240+
file.filename = sanitize_filename(file.filename)
241+
file_path = await save_file_to_disk(file, save_dir, with_random_prefix=True)
265242

266243
metadata.update(
267244
{
@@ -441,21 +418,13 @@ async def get_task_error(
441418
task_state_manager=Depends(get_task_state_manager),
442419
task_details=Depends(require_task_owner),
443420
):
444-
try:
445-
error = await task_state_manager.get_error.remote(task_id)
446-
if error is None:
447-
raise HTTPException(
448-
status_code=status.HTTP_404_NOT_FOUND,
449-
detail=f"No error found for task '{task_id}'.",
450-
)
451-
return {"task_id": task_id, "traceback": error.splitlines()}
452-
except HTTPException:
453-
raise
454-
except Exception:
421+
error = await task_state_manager.get_error.remote(task_id)
422+
if error is None:
455423
raise HTTPException(
456-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
457-
detail="Failed to retrieve task error.",
424+
status_code=status.HTTP_404_NOT_FOUND,
425+
detail=f"No error found for task '{task_id}'.",
458426
)
427+
return {"task_id": task_id, "traceback": error.splitlines()}
459428

460429

461430
@router.get(
@@ -475,32 +444,27 @@ async def get_task_error(
475444
""",
476445
)
477446
async def get_task_logs(task_id: str, max_lines: int = 100, task_details=Depends(require_task_owner)):
478-
try:
479-
if not LOG_FILE.exists():
480-
raise HTTPException(status_code=500, detail="Log file not found.")
481-
482-
logs = []
483-
with open(LOG_FILE, errors="replace") as f:
484-
for line in reversed(list(f)):
485-
try:
486-
record = json.loads(line).get("record", {})
487-
if record.get("extra", {}).get("task_id") == task_id:
488-
logs.append(
489-
f"{record['time']['repr']} - {record['level']['name']} - {record['message']} - {(record['extra'])}"
490-
)
491-
if len(logs) >= max_lines:
492-
break
493-
except json.JSONDecodeError:
494-
continue
495-
496-
if not logs:
497-
raise HTTPException(status_code=404, detail=f"No logs found for task '{task_id}'")
498-
499-
return JSONResponse(content={"task_id": task_id, "logs": logs[::-1]}) # restore order
500-
except HTTPException:
501-
raise
502-
except Exception as e:
503-
raise HTTPException(status_code=500, detail=f"Failed to fetch logs: {e!s}")
447+
if not LOG_FILE.exists():
448+
raise HTTPException(status_code=500, detail="Log file not found.")
449+
450+
logs = []
451+
with open(LOG_FILE, errors="replace") as f:
452+
for line in reversed(list(f)):
453+
try:
454+
record = json.loads(line).get("record", {})
455+
if record.get("extra", {}).get("task_id") == task_id:
456+
logs.append(
457+
f"{record['time']['repr']} - {record['level']['name']} - {record['message']} - {(record['extra'])}"
458+
)
459+
if len(logs) >= max_lines:
460+
break
461+
except json.JSONDecodeError:
462+
continue
463+
464+
if not logs:
465+
raise HTTPException(status_code=404, detail=f"No logs found for task '{task_id}'")
466+
467+
return JSONResponse(content={"task_id": task_id, "logs": logs[::-1]}) # restore order
504468

505469

506470
@router.delete(
@@ -525,13 +489,9 @@ async def cancel_task(
525489
task_state_manager=Depends(get_task_state_manager),
526490
task_details=Depends(require_task_owner),
527491
):
528-
try:
529-
obj_ref = await task_state_manager.get_object_ref.remote(task_id)
530-
if obj_ref is None:
531-
raise HTTPException(404, f"No ObjectRef stored for task {task_id}")
532-
533-
ray.cancel(obj_ref["ref"], recursive=True)
534-
return {"message": f"Cancellation signal sent for task {task_id}"}
535-
except Exception as e:
536-
logger.exception("Failed to cancel task.")
537-
raise HTTPException(status_code=500, detail=str(e))
492+
obj_ref = await task_state_manager.get_object_ref.remote(task_id)
493+
if obj_ref is None:
494+
raise HTTPException(404, f"No ObjectRef stored for task {task_id}")
495+
496+
ray.cancel(obj_ref["ref"], recursive=True)
497+
return {"message": f"Cancellation signal sent for task {task_id}"}

0 commit comments

Comments
 (0)