Skip to content

Commit 61ad1fb

Browse files
feat: add support for llm message interceptor hooks
1 parent 54710a8 commit 61ad1fb

22 files changed

+2421
-35
lines changed

docs/en/concepts/llms.mdx

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,6 +1200,52 @@ Learn how to get the most out of your LLM configuration:
12001200
)
12011201
```
12021202
</Accordion>
1203+
1204+
<Accordion title="Transport Interceptors">
1205+
CrewAI provides message interceptors for several providers, allowing you to hook into request/response cycles at the transport layer.
1206+
1207+
**Supported Providers:**
1208+
- ✅ OpenAI
1209+
- ✅ Anthropic
1210+
1211+
**Basic Usage:**
1212+
```python
1213+
import httpx
1214+
from crewai import LLM
1215+
from crewai.llms.hooks import BaseInterceptor
1216+
1217+
class CustomInterceptor(BaseInterceptor[httpx.Request, httpx.Response]):
1218+
"""Custom interceptor to modify requests and responses."""
1219+
1220+
def on_outbound(self, request: httpx.Request) -> httpx.Request:
1221+
"""Print request before sending to the LLM provider."""
1222+
print(request)
1223+
return request
1224+
1225+
def on_inbound(self, response: httpx.Response) -> httpx.Response:
1226+
"""Process response after receiving from the LLM provider."""
1227+
print(f"Status: {response.status_code}")
1228+
print(f"Response time: {response.elapsed}")
1229+
return response
1230+
1231+
# Use the interceptor with an LLM
1232+
llm = LLM(
1233+
model="openai/gpt-4o",
1234+
interceptor=CustomInterceptor()
1235+
)
1236+
```
1237+
1238+
**Important Notes:**
1239+
- Both methods must return an object of the same type as the one they received.
1240+
- Modifying received objects may result in unexpected behavior or application crashes.
1241+
- Not all providers support interceptors — check the supported providers list above.
1242+
1243+
<Info>
1244+
Interceptors operate at the transport layer. This is particularly useful for:
1245+
- Message transformation and filtering
1246+
- Debugging API interactions
1247+
</Info>
1248+
</Accordion>
12031249
</AccordionGroup>
12041250

12051251
## Common Issues and Solutions

lib/crewai/src/crewai/llm.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121

2222
from dotenv import load_dotenv
23+
import httpx
2324
from pydantic import BaseModel, Field
2425
from typing_extensions import Self
2526

@@ -53,6 +54,7 @@
5354
from litellm.utils import supports_response_schema
5455

5556
from crewai.agent.core import Agent
57+
from crewai.llms.hooks.base import BaseInterceptor
5658
from crewai.task import Task
5759
from crewai.tools.base_tool import BaseTool
5860
from crewai.utilities.types import LLMMessage
@@ -334,6 +336,8 @@ def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
334336
return cast(
335337
Self, native_class(model=model_string, provider=provider, **kwargs)
336338
)
339+
except NotImplementedError:
340+
raise
337341
except Exception as e:
338342
raise ImportError(f"Error importing native provider: {e}") from e
339343

@@ -403,6 +407,7 @@ def __init__(
403407
callbacks: list[Any] | None = None,
404408
reasoning_effort: Literal["none", "low", "medium", "high"] | None = None,
405409
stream: bool = False,
410+
interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None,
406411
**kwargs: Any,
407412
) -> None:
408413
"""Initialize LLM instance.
@@ -442,6 +447,7 @@ def __init__(
442447
self.additional_params = kwargs
443448
self.is_anthropic = self._is_anthropic_model(model)
444449
self.stream = stream
450+
self.interceptor = interceptor
445451

446452
litellm.drop_params = True
447453

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Interceptor contracts for crewai"""
2+
3+
from crewai.llms.hooks.base import BaseInterceptor
4+
5+
6+
__all__ = ["BaseInterceptor"]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Base classes for LLM transport interceptors.
2+
3+
This module provides abstract base classes for intercepting and modifying
4+
outbound and inbound messages at the transport level.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from abc import ABC, abstractmethod
10+
from typing import Generic, TypeVar
11+
12+
13+
T = TypeVar("T")
14+
U = TypeVar("U")
15+
16+
17+
class BaseInterceptor(ABC, Generic[T, U]):
18+
"""Abstract base class for intercepting transport-level messages.
19+
20+
Provides hooks to intercept and modify outbound and inbound messages
21+
at the transport layer.
22+
23+
Type parameters:
24+
T: Outbound message type (e.g., httpx.Request)
25+
U: Inbound message type (e.g., httpx.Response)
26+
27+
Example:
28+
>>> class CustomInterceptor(BaseInterceptor[httpx.Request, httpx.Response]):
29+
... def on_outbound(self, message: httpx.Request) -> httpx.Request:
30+
... message.headers["X-Custom-Header"] = "value"
31+
... return message
32+
...
33+
... def on_inbound(self, message: httpx.Response) -> httpx.Response:
34+
... print(f"Status: {message.status_code}")
35+
... return message
36+
"""
37+
38+
@abstractmethod
39+
def on_outbound(self, message: T) -> T:
40+
"""Intercept outbound message before sending.
41+
42+
Args:
43+
message: Outbound message object.
44+
45+
Returns:
46+
Modified message object.
47+
"""
48+
...
49+
50+
@abstractmethod
51+
def on_inbound(self, message: U) -> U:
52+
"""Intercept inbound message after receiving.
53+
54+
Args:
55+
message: Inbound message object.
56+
57+
Returns:
58+
Modified message object.
59+
"""
60+
...
61+
62+
async def aon_outbound(self, message: T) -> T:
63+
"""Async version of on_outbound.
64+
65+
Args:
66+
message: Outbound message object.
67+
68+
Returns:
69+
Modified message object.
70+
"""
71+
raise NotImplementedError
72+
73+
async def aon_inbound(self, message: U) -> U:
74+
"""Async version of on_inbound.
75+
76+
Args:
77+
message: Inbound message object.
78+
79+
Returns:
80+
Modified message object.
81+
"""
82+
raise NotImplementedError
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""HTTP transport implementations for LLM request/response interception.
2+
3+
This module provides internal transport classes that integrate with BaseInterceptor
4+
to enable request/response modification at the transport level.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from typing import TYPE_CHECKING, Any
10+
11+
import httpx
12+
13+
14+
if TYPE_CHECKING:
15+
from crewai.llms.hooks.base import BaseInterceptor
16+
17+
18+
class HTTPTransport(httpx.HTTPTransport):
19+
"""HTTP transport that uses an interceptor for request/response modification.
20+
21+
This transport is used internally when a user provides a BaseInterceptor.
22+
Users should not instantiate this class directly - instead, pass an interceptor
23+
to the LLM client and this transport will be created automatically.
24+
"""
25+
26+
def __init__(
27+
self,
28+
interceptor: BaseInterceptor[httpx.Request, httpx.Response],
29+
**kwargs: Any,
30+
) -> None:
31+
"""Initialize transport with interceptor.
32+
33+
Args:
34+
interceptor: HTTP interceptor for modifying raw request/response objects.
35+
**kwargs: Additional arguments passed to httpx.HTTPTransport.
36+
"""
37+
super().__init__(**kwargs)
38+
self.interceptor = interceptor
39+
40+
def handle_request(self, request: httpx.Request) -> httpx.Response:
41+
"""Handle request with interception.
42+
43+
Args:
44+
request: The HTTP request to handle.
45+
46+
Returns:
47+
The HTTP response.
48+
"""
49+
request = self.interceptor.on_outbound(request)
50+
response = super().handle_request(request)
51+
return self.interceptor.on_inbound(response)
52+
53+
54+
class AsyncHTTPransport(httpx.AsyncHTTPTransport):
55+
"""Async HTTP transport that uses an interceptor for request/response modification.
56+
57+
This transport is used internally when a user provides a BaseInterceptor.
58+
Users should not instantiate this class directly - instead, pass an interceptor
59+
to the LLM client and this transport will be created automatically.
60+
"""
61+
62+
def __init__(
63+
self,
64+
interceptor: BaseInterceptor[httpx.Request, httpx.Response],
65+
**kwargs: Any,
66+
) -> None:
67+
"""Initialize async transport with interceptor.
68+
69+
Args:
70+
interceptor: HTTP interceptor for modifying raw request/response objects.
71+
**kwargs: Additional arguments passed to httpx.AsyncHTTPTransport.
72+
"""
73+
super().__init__(**kwargs)
74+
self.interceptor = interceptor
75+
76+
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
77+
"""Handle async request with interception.
78+
79+
Args:
80+
request: The HTTP request to handle.
81+
82+
Returns:
83+
The HTTP response.
84+
"""
85+
request = await self.interceptor.aon_outbound(request)
86+
response = await super().handle_async_request(request)
87+
return await self.interceptor.aon_inbound(response)

lib/crewai/src/crewai/llms/providers/anthropic/completion.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,30 @@
11
from __future__ import annotations
22

3-
43
import json
54
import logging
65
import os
7-
from typing import Any, cast
6+
from typing import TYPE_CHECKING, Any, cast
87

98
from pydantic import BaseModel
109

1110
from crewai.events.types.llm_events import LLMCallType
1211
from crewai.llms.base_llm import BaseLLM
12+
from crewai.llms.hooks.transport import HTTPTransport
1313
from crewai.utilities.agent_utils import is_context_length_exceeded
1414
from crewai.utilities.exceptions.context_window_exceeding_exception import (
1515
LLMContextLengthExceededError,
1616
)
1717
from crewai.utilities.types import LLMMessage
1818

1919

20+
if TYPE_CHECKING:
21+
from crewai.llms.hooks.base import BaseInterceptor
22+
2023
try:
2124
from anthropic import Anthropic
2225
from anthropic.types import Message
2326
from anthropic.types.tool_use_block import ToolUseBlock
27+
import httpx
2428
except ImportError:
2529
raise ImportError(
2630
'Anthropic native provider not available, to install: uv add "crewai[anthropic]"'
@@ -47,7 +51,8 @@ def __init__(
4751
stop_sequences: list[str] | None = None,
4852
stream: bool = False,
4953
client_params: dict[str, Any] | None = None,
50-
**kwargs,
54+
interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None,
55+
**kwargs: Any,
5156
):
5257
"""Initialize Anthropic chat completion client.
5358
@@ -63,13 +68,15 @@ def __init__(
6368
stop_sequences: Stop sequences (Anthropic uses stop_sequences, not stop)
6469
stream: Enable streaming responses
6570
client_params: Additional parameters for the Anthropic client
71+
interceptor: HTTP interceptor for modifying requests/responses at transport level.
6672
**kwargs: Additional parameters
6773
"""
6874
super().__init__(
6975
model=model, temperature=temperature, stop=stop_sequences or [], **kwargs
7076
)
7177

7278
# Client params
79+
self.interceptor = interceptor
7380
self.client_params = client_params
7481
self.base_url = base_url
7582
self.timeout = timeout
@@ -102,6 +109,11 @@ def _get_client_params(self) -> dict[str, Any]:
102109
"max_retries": self.max_retries,
103110
}
104111

112+
if self.interceptor:
113+
transport = HTTPTransport(interceptor=self.interceptor)
114+
http_client = httpx.Client(transport=transport)
115+
client_params["http_client"] = http_client # type: ignore[assignment]
116+
105117
if self.client_params:
106118
client_params.update(self.client_params)
107119

@@ -110,7 +122,7 @@ def _get_client_params(self) -> dict[str, Any]:
110122
def call(
111123
self,
112124
messages: str | list[LLMMessage],
113-
tools: list[dict] | None = None,
125+
tools: list[dict[str, Any]] | None = None,
114126
callbacks: list[Any] | None = None,
115127
available_functions: dict[str, Any] | None = None,
116128
from_task: Any | None = None,
@@ -133,7 +145,7 @@ def call(
133145
try:
134146
# Emit call started event
135147
self._emit_call_started_event(
136-
messages=messages, # type: ignore[arg-type]
148+
messages=messages,
137149
tools=tools,
138150
callbacks=callbacks,
139151
available_functions=available_functions,
@@ -143,7 +155,7 @@ def call(
143155

144156
# Format messages for Anthropic
145157
formatted_messages, system_message = self._format_messages_for_anthropic(
146-
messages # type: ignore[arg-type]
158+
messages
147159
)
148160

149161
# Prepare completion parameters
@@ -181,7 +193,7 @@ def _prepare_completion_params(
181193
self,
182194
messages: list[LLMMessage],
183195
system_message: str | None = None,
184-
tools: list[dict] | None = None,
196+
tools: list[dict[str, Any]] | None = None,
185197
) -> dict[str, Any]:
186198
"""Prepare parameters for Anthropic messages API.
187199
@@ -218,7 +230,9 @@ def _prepare_completion_params(
218230

219231
return params
220232

221-
def _convert_tools_for_interference(self, tools: list[dict]) -> list[dict]:
233+
def _convert_tools_for_interference(
234+
self, tools: list[dict[str, Any]]
235+
) -> list[dict[str, Any]]:
222236
"""Convert CrewAI tool format to Anthropic tool use format."""
223237
anthropic_tools = []
224238

0 commit comments

Comments
 (0)