Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ All necessary details are provided in the comments at the top of each script.
| [Multimodal Prompt with PDF Input](/examples/core/prompt/multimodal_with_pdf.py) | [ragbits-core](/packages/ragbits-core) | Example of how to use the `Prompt` class to answer the question using an LLM with both text and PDF inputs. |
| [Multimodal Prompt with Few Shots](/examples/core/prompt/multimodal_with_few_shots.py) | [ragbits-core](/packages/ragbits-core) | Example of how to use the `Prompt` class to generate themed text using an LLM with multimodal inputs and few-shot examples. |
| [Tool Use with LLM](/examples/core/llms/tool_use.py) | [ragbits-core](/packages/ragbits-core) | Example of how to provide tools and return tool calls from LLM. |
| [Reasoning with LLM](/examples/core/llms/reasoning.py) | [ragbits-core](/packages/ragbits-core) | Example of how to use reasoning with an LLM. |
| [OpenTelemetry Audit](/examples/core/audit/otel.py) | [ragbits-core](/packages/ragbits-core) | Example of how to collect traces and metrics using Ragbits audit module with OpenTelemetry. |
| [Logfire Audit](/examples/core/audit/logfire_.py) | [ragbits-core](/packages/ragbits-core) | Example of how to collect traces and metrics using Ragbits audit module with Logfire. |
| [Basic Document Search](/examples/document-search/basic.py) | [ragbits-document-search](/packages/ragbits-document-search) | Example of how to use the `DocumentSearch` class to search for documents with the `InMemoryVectorStore` class to store the embeddings. |
Expand All @@ -38,6 +39,9 @@ All necessary details are provided in the comments at the top of each script.
| [Recontextualize Last Message](/examples/chat/recontextualize_message.py) | [ragbits-chat](/packages/ragbits-chat) | Example of how to use the `StandaloneMessageCompressor` compressor to recontextualize the last message in a conversation history. |
| [Agents Tool Use](/examples/agents/tool_use.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to use agent with tools. |
| [Agents OpenAI Native Tool Use](/examples/agents/openai_native_tool_use.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to use agent with OpenAI native tools. |
| [Agents Post Processors](/examples/agents/post_processors.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to use post-processors with an agent. |
| [Agents CLI](/examples/agents/cli_agent.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to use an agent in the CLI. |
| [MCP Local](/examples/agents/mcp/local.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to use the `Agent` class to connect with a local MCP server. |
| [MCP SSE](/examples/agents/mcp/sse.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to use the `Agent` class to connect with a remote MCP server via SSE. |
| [MCP Streamable HTTP](/examples/agents/mcp/streamable_http.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to use the `Agent` class to connect with a remote MCP server via HTTP. |
| [A2A Orchestration](/examples/agents/a2a/run_orchestrator.py) | [ragbits-agents](/packages/ragbits-agents) | Example of how to setup A2A orchestration. |
108 changes: 108 additions & 0 deletions examples/agents/post_processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Ragbits Agents Example: Post-Processors

This example demonstrates how to use post-processors with Agent.run() and Agent.run_streaming() methods.

To run the script, execute the following command:

```bash
uv run examples/agents/post_processors.py
```
"""

# /// script
# requires-python = ">=3.10"
# dependencies = [
# "ragbits-core",
# "ragbits-agents",
# ]
# ///

import asyncio
from types import SimpleNamespace

from ragbits.agents import Agent, AgentResult, BasePostProcessor, ToolCallResult
from ragbits.core.llms.base import BasePrompt, ToolCall, Usage
from ragbits.core.llms.litellm import LiteLLM


class CustomStreamingProcessor(BasePostProcessor):
    """
    Streaming post-processor that replaces forbidden words with a placeholder.
    """

    def __init__(self, forbidden_words: list[str]) -> None:
        # Normalize to lowercase once: process_streaming lowercases the chunk
        # before the membership test, so the reference list must be lowercase
        # too — otherwise e.g. forbidden_words=["Python"] would never match.
        self.forbidden_words = [word.lower() for word in forbidden_words]

    @property
    def supports_streaming(self) -> bool:
        """
        Whether this post-processor supports streaming mode.

        Returns:
            True, as this processor inspects chunks as they are streamed.
        """
        return True

    async def process_streaming(
        self, chunk: str | ToolCall | ToolCallResult | SimpleNamespace | BasePrompt | Usage, agent: "Agent"
    ) -> str | ToolCall | ToolCallResult | SimpleNamespace | BasePrompt | Usage:
        """
        Process chunks during streaming.

        Args:
            chunk: The streamed item; only string chunks are inspected.
            agent: The agent emitting the stream (unused here).

        Returns:
            "[FORBIDDEN_WORD]" when a text chunk (case-insensitively, ignoring
            surrounding whitespace) equals a forbidden word; otherwise the
            chunk unchanged. Non-string chunks always pass through.
        """
        if isinstance(chunk, str) and chunk.lower().strip() in self.forbidden_words:
            return "[FORBIDDEN_WORD]"
        return chunk


class CustomNonStreamingProcessor(BasePostProcessor):
    """
    Non-streaming post-processor that truncates the content.
    """

    def __init__(self, max_length: int = 200) -> None:
        self.max_length = max_length

    @property
    def supports_streaming(self) -> bool:
        """
        Whether this post-processor supports streaming mode.
        """
        return False

    async def process(self, result: "AgentResult", agent: "Agent") -> "AgentResult":
        """
        Truncate the result content to at most ``max_length`` characters.

        Args:
            result: The finished agent result to post-process.
            agent: The agent that produced the result (unused here).

        Returns:
            A new AgentResult whose content may be shortened and suffixed with
            a truncation marker; all other fields are carried over unchanged.
        """
        original = result.content
        if len(original) > self.max_length:
            new_content = original[: self.max_length] + f"... [TRUNCATED] ({len(original)} > {self.max_length} chars)"
        else:
            new_content = original

        return AgentResult(
            content=new_content,
            metadata=result.metadata,
            tool_calls=result.tool_calls,
            history=result.history,
            usage=result.usage,
        )


async def main() -> None:
    """
    Run the example.
    """
    agent = Agent(llm=LiteLLM("gpt-3.5-turbo"), prompt="You are a helpful assistant.")
    processors = [
        CustomStreamingProcessor(forbidden_words=["python"]),
        CustomNonStreamingProcessor(max_length=200),
    ]
    # Stream the answer, printing only text chunks as they arrive.
    streaming = agent.run_streaming("What is Python?", post_processors=processors, allow_non_streaming=True)
    async for piece in streaming:
        if isinstance(piece, str):
            print(piece, end="")
    print(f"\nFinal answer:\n{streaming.content}")


if __name__ == "__main__":
    # Entry point: run the async example with a fresh event loop.
    asyncio.run(main())
1 change: 1 addition & 0 deletions packages/ragbits-agents/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Support wrapping downstream agents as tools (#818)
- Add post-processors (#821)

## 1.3.0 (2025-09-11)
### Changed
Expand Down
2 changes: 2 additions & 0 deletions packages/ragbits-agents/src/ragbits/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AgentResult,
AgentResultStreaming,
AgentRunContext,
BasePostProcessor,
ToolCallResult,
)
from ragbits.agents.types import QuestionAnswerAgent, QuestionAnswerPromptInput, QuestionAnswerPromptOutput
Expand All @@ -16,6 +17,7 @@
"AgentResult",
"AgentResultStreaming",
"AgentRunContext",
"BasePostProcessor",
"QuestionAnswerAgent",
"QuestionAnswerPromptInput",
"QuestionAnswerPromptOutput",
Expand Down
Loading
Loading