Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 72 additions & 26 deletions inference/core/interfaces/http/error_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@
OperationTypeNotRecognisedError,
)
from inference.core.workflows.errors import (
BlockTraceback,
ClientCausedStepExecutionError,
DynamicBlockError,
ExecutionGraphStructureError,
InvalidReferenceTargetError,
NotSupportedExecutionEngineError,
DynamicBlockCodeError,
ReferenceTypeError,
RuntimeInputError,
StepExecutionError,
Expand Down Expand Up @@ -80,6 +82,66 @@
)


def _build_dynamic_block_code_error_response(
error: DynamicBlockCodeError,
) -> "WorkflowErrorResponse":
"""Build a WorkflowErrorResponse for DynamicBlockCodeError."""
return WorkflowErrorResponse(
message=error.public_message,
error_type=error.__class__.__name__,
context=error.context,
inner_error_type=error.inner_error_type,
inner_error_message=str(error.inner_error) if error.inner_error else None,
blocks_errors=[
WorkflowBlockError(
block_id=error.block_type_name or "Dynamic Block",
block_type=error.block_type_name,
property_name="Python code",
property_details=error.public_message,
block_traceback=(
BlockTraceback(
error_line=error.error_line,
code_snippet=error.code_snippet,
traceback=error.traceback_str,
stdout=error.stdout,
stderr=error.stderr,
)
if any(
[
error.error_line,
error.traceback_str,
error.stdout,
error.stderr,
]
)
else None
),
),
],
)


def _build_step_execution_error_response(
error: StepExecutionError,
) -> "WorkflowErrorResponse":
"""Build a WorkflowErrorResponse for StepExecutionError (runtime errors)."""
return WorkflowErrorResponse(
message=error.public_message,
error_type=error.__class__.__name__,
context=error.context,
inner_error_type=error.inner_error_type,
inner_error_message=str(error.inner_error),
blocks_errors=[
WorkflowBlockError(
block_id=error.block_id,
block_type=error.block_type,
property_details=str(error.inner_error),
block_traceback=error.block_traceback,
),
],
)


def with_route_exceptions(route):
"""
A decorator that wraps a FastAPI route to handle specific exceptions. If an exception
Expand Down Expand Up @@ -167,6 +229,10 @@ def wrapped_route(*args, **kwargs):
blocks_errors=error.blocks_errors,
)
resp = JSONResponse(status_code=400, content=content.model_dump())
except DynamicBlockCodeError as error:
logger.exception("%s: %s", type(error).__name__, error)
content = _build_dynamic_block_code_error_response(error)
resp = JSONResponse(status_code=400, content=content.model_dump())
except (
WorkflowDefinitionError,
ReferenceTypeError,
Expand Down Expand Up @@ -384,19 +450,7 @@ def wrapped_route(*args, **kwargs):
)
except StepExecutionError as error:
logger.exception("%s: %s", type(error).__name__, error)
content = WorkflowErrorResponse(
message=str(error.public_message),
error_type=error.__class__.__name__,
context=str(error.context),
inner_error_type=str(error.inner_error_type),
inner_error_message=str(error.inner_error),
blocks_errors=[
WorkflowBlockError(
block_id=error.block_id,
block_type=error.block_type,
),
],
)
content = _build_step_execution_error_response(error)
resp = JSONResponse(
status_code=500,
content=content.model_dump(),
Expand Down Expand Up @@ -539,6 +593,10 @@ async def wrapped_route(*args, **kwargs):
blocks_errors=error.blocks_errors,
)
resp = JSONResponse(status_code=400, content=content.model_dump())
except DynamicBlockCodeError as error:
logger.exception("%s: %s", type(error).__name__, error)
content = _build_dynamic_block_code_error_response(error)
resp = JSONResponse(status_code=400, content=content.model_dump())
except (
WorkflowDefinitionError,
ReferenceTypeError,
Expand Down Expand Up @@ -756,19 +814,7 @@ async def wrapped_route(*args, **kwargs):
)
except StepExecutionError as error:
logger.exception("%s: %s", type(error).__name__, error)
content = WorkflowErrorResponse(
message=str(error.public_message),
error_type=error.__class__.__name__,
context=str(error.context),
inner_error_type=str(error.inner_error_type),
inner_error_message=str(error.inner_error),
blocks_errors=[
WorkflowBlockError(
block_id=error.block_id,
block_type=error.block_type,
),
],
)
content = _build_step_execution_error_response(error)
resp = JSONResponse(
status_code=500,
content=content.model_dump(),
Expand Down
28 changes: 26 additions & 2 deletions inference/core/interfaces/http/http_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,11 @@
from inference.core.utils.container import is_docker_socket_mounted
from inference.core.utils.notebooks import start_notebook
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.core.workflows.errors import WorkflowError, WorkflowSyntaxError
from inference.core.workflows.errors import (
WorkflowBlockError,
WorkflowError,
WorkflowSyntaxError,
)
from inference.core.workflows.execution_engine.core import (
ExecutionEngine,
get_available_versions,
Expand Down Expand Up @@ -1741,10 +1745,30 @@ async def initialise_webrtc_worker(
)
if worker_result.exception_type is not None:
if worker_result.exception_type == "WorkflowSyntaxError":
# Reconstruct exception from serialized worker result.
# We dynamically create an exception class to preserve
# the original type name (e.g., "ValidationError") for
# the inner_error_type property, since exceptions can't
# be pickled across the worker process boundary.
inner_error = None
if worker_result.inner_error and worker_result.inner_error_type:
inner_error = type(
worker_result.inner_error_type,
(Exception,),
{},
)(worker_result.inner_error)

blocks_errors = None
if worker_result.blocks_errors:
blocks_errors = [
WorkflowBlockError(**be)
for be in worker_result.blocks_errors
]
raise WorkflowSyntaxError(
public_message=worker_result.error_message,
context=worker_result.error_context,
inner_error=worker_result.inner_error,
inner_error=inner_error,
blocks_errors=blocks_errors,
)
if worker_result.exception_type == "WorkflowError":
raise WorkflowError(
Expand Down
2 changes: 2 additions & 0 deletions inference/core/interfaces/webrtc_worker/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class WebRTCWorkerResult(BaseModel):
error_message: Optional[str] = None
error_context: Optional[str] = None
inner_error: Optional[str] = None
inner_error_type: Optional[str] = None
blocks_errors: Optional[List[Dict[str, Any]]] = None


class StreamOutputMode(str, Enum):
Expand Down
30 changes: 22 additions & 8 deletions inference/core/interfaces/webrtc_worker/webrtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,8 @@ async def init_rtc_peer_connection_with_loop(
NotImplementedError,
) as error:
# heartbeat to indicate caller error
heartbeat_callback()
if heartbeat_callback:
heartbeat_callback()
send_answer(
WebRTCWorkerResult(
exception_type=error.__class__.__name__,
Expand All @@ -940,7 +941,8 @@ async def init_rtc_peer_connection_with_loop(
return
except WebRTCConfigurationError as error:
# heartbeat to indicate caller error
heartbeat_callback()
if heartbeat_callback:
heartbeat_callback()
send_answer(
WebRTCWorkerResult(
exception_type=error.__class__.__name__,
Expand All @@ -950,7 +952,8 @@ async def init_rtc_peer_connection_with_loop(
return
except RoboflowAPINotAuthorizedError:
# heartbeat to indicate caller error
heartbeat_callback()
if heartbeat_callback:
heartbeat_callback()
send_answer(
WebRTCWorkerResult(
exception_type=RoboflowAPINotAuthorizedError.__name__,
Expand All @@ -960,7 +963,8 @@ async def init_rtc_peer_connection_with_loop(
return
except RoboflowAPINotNotFoundError:
# heartbeat to indicate caller error
heartbeat_callback()
if heartbeat_callback:
heartbeat_callback()
send_answer(
WebRTCWorkerResult(
exception_type=RoboflowAPINotNotFoundError.__name__,
Expand All @@ -970,19 +974,29 @@ async def init_rtc_peer_connection_with_loop(
return
except WorkflowSyntaxError as error:
# heartbeat to indicate caller error
heartbeat_callback()
if heartbeat_callback:
heartbeat_callback()
blocks_errors_serialized = None
if error.blocks_errors:
blocks_errors_serialized = [
block_error.model_dump() for block_error in error.blocks_errors
]

send_answer(
WebRTCWorkerResult(
exception_type=WorkflowSyntaxError.__name__,
error_message=str(error),
error_message=error.public_message,
error_context=str(error.context),
inner_error=str(error.inner_error),
inner_error=str(error.inner_error) if error.inner_error else None,
inner_error_type=error.inner_error_type,
blocks_errors=blocks_errors_serialized,
)
)
return
except WorkflowError as error:
# heartbeat to indicate caller error
heartbeat_callback()
if heartbeat_callback:
heartbeat_callback()
send_answer(
WebRTCWorkerResult(
exception_type=WorkflowError.__name__,
Expand Down
37 changes: 37 additions & 0 deletions inference/core/workflows/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@
from pydantic import BaseModel, Field


class BlockTraceback(BaseModel):
traceback: Optional[str] = None
error_line: Optional[int] = None
code_snippet: Optional[str] = None
stdout: Optional[str] = None
stderr: Optional[str] = None


class WorkflowBlockError(BaseModel):
block_id: Optional[str] = None
block_type: Optional[str] = None
block_details: Optional[str] = None
property_name: Optional[str] = None
property_details: Optional[str] = None
block_traceback: Optional[BlockTraceback] = None


class WorkflowError(Exception):
Expand Down Expand Up @@ -152,6 +161,32 @@ class WorkflowExecutionEngineError(WorkflowError):
pass


class DynamicBlockCodeError(WorkflowExecutionEngineError):
"""Exception for dynamic block code execution errors (errors provoked by user's code)."""

def __init__(
self,
public_message: str,
context: str = "dynamic_block_code_execution",
inner_error: Optional[Exception] = None,
block_type_name: Optional[str] = None,
error_line: Optional[int] = None,
code_snippet: Optional[str] = None,
traceback_str: Optional[str] = None,
stdout: Optional[str] = None,
stderr: Optional[str] = None,
):
super().__init__(
public_message=public_message, context=context, inner_error=inner_error
)
self.block_type_name = block_type_name
self.error_line = error_line
self.code_snippet = code_snippet
self.traceback_str = traceback_str
self.stdout = stdout
self.stderr = stderr


class NotSupportedExecutionEngineError(WorkflowExecutionEngineError):
pass

Expand All @@ -165,12 +200,14 @@ def __init__(
self,
block_id: str,
block_type: str,
block_traceback: Optional[BlockTraceback] = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.block_id = block_id
self.block_type = block_type
self.block_traceback = block_traceback


class ClientCausedStepExecutionError(WorkflowExecutionEngineError):
Expand Down
Loading
Loading