3 changes: 2 additions & 1 deletion README.md
@@ -9,6 +9,7 @@ Templates included:
- `Crew/` — A Crew AI example that creates a small crew of agents to research news on a topic+date and produce interesting trivia snippets.
- `WebSearch/` - A simple LangGraph agent that uses a web-search tool.
- `KnowledgeBaseRAG/` - A simple LangGraph agent that queries your DigitalOcean built Knowledge Base.
- `Streaming_and_Tools/` - A LangGraph agent that uses a web-search tool and streams back responses.

Each template directory includes a `main.py` (the ADK entrypoint) and a `requirements.txt`. See the README in each template for details and quickstart instructions:

@@ -17,5 +18,5 @@ Each template directory includes a `main.py` (the ADK entrypoint) and a `require
- `Crew/README.md` — details for the Crew AI trivia generator and its search tool.
- `WebSearch/README.md` — details for the WebSearch agent.
- `KnowledgeBaseRAG/README.md` — details for the Agent that queries your DigitalOcean Knowledge Base.

- `Streaming_and_Tools/README.md` - details for the agent that uses a search tool and streams responses.

3 changes: 3 additions & 0 deletions Streaming_and_Tools/.env.example
@@ -0,0 +1,3 @@
DIGITALOCEAN_INFERENCE_KEY=
TAVILY_API_KEY=
DIGITALOCEAN_API_TOKEN=
5 changes: 5 additions & 0 deletions Streaming_and_Tools/.gitignore
@@ -0,0 +1,5 @@
# Byte-compiled / optimized / DLL files
__pycache__/

# Environments
.env
3 changes: 3 additions & 0 deletions Streaming_and_Tools/.gradient/agent.yml
@@ -0,0 +1,3 @@
agent_environment: main
agent_name: streaming-agent-with-tool
entrypoint_file: main.py
130 changes: 130 additions & 0 deletions Streaming_and_Tools/README.md
@@ -0,0 +1,130 @@
# Streaming Agents and Using Python Functions as Tools

This example demonstrates an ADK agent that streams its responses and uses a plain Python function as a tool. With LangChain's `tool` decorator, we can turn Python functions into tools the agent can call; here, we use the Tavily search library so the agent can search the web.
The runtime builds a LangGraph `StateGraph` whose first node decides whether a tool is needed. If so, the agent calls the tool and passes the results to a final answer node; if not, the answer node responds directly.
```
          +-----------+
          | __start__ |
          +-----------+
                 *
                 *
                 *
      +---------------------+
      | decision_model_call |
      +---------------------+
           ..         ..
         ..             ..
        .                 ..
+-------+                   .
| tools |                  ..
+-------+                 ..
        **              ..
          **          ..
            *        .
        +-------------------+
        | answer_model_call |
        +-------------------+
                 *
                 *
                 *
            +---------+
            | __end__ |
            +---------+
```
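
The diagram above reflects the graph's topology, and you can regenerate it from the compiled graph itself. This is a minimal sketch, assuming you run it from the `Streaming_and_Tools/` directory with your `.env` populated and the optional `grandalf` package installed (used by LangGraph's `draw_ascii` renderer):

```python
# Regenerate the ASCII diagram from the compiled graph in main.py.
# Assumes `.env` is populated and grandalf is installed (pip install grandalf).
from main import build_graph

print(build_graph().get_graph().draw_ascii())
```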

Additionally, this agent yields streaming output instead of returning the entire response at once. Streaming improves the user experience by letting users start reading a response immediately rather than waiting for it to complete.
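
To see the streaming mechanism in isolation, here is a minimal, self-contained sketch with no LLM calls (`demo_node` and the token list are purely illustrative): a node pushes chunks through LangGraph's `get_stream_writer`, and the caller reads them back with `stream_mode="custom"`.

```python
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, MessagesState, START, END


def demo_node(state: MessagesState):
    # Pretend these are LLM token chunks; push each one into the "custom" stream.
    writer = get_stream_writer()
    for token in ["Hello", ", ", "world", "!"]:
        writer({"custom_llm_chunk": token})
    return {"messages": []}


builder = StateGraph(MessagesState)
builder.add_node("demo_node", demo_node)
builder.add_edge(START, "demo_node")
builder.add_edge("demo_node", END)
demo_graph = builder.compile()

# With stream_mode="custom", each yielded item is whatever a node passed to the writer.
for chunk in demo_graph.stream({"messages": []}, stream_mode="custom"):
    print(chunk["custom_llm_chunk"], end="", flush=True)
print()
```

`main.py` applies the same idea with `stream_mode=["custom"]` (a list), in which case each yielded item is a `(mode, chunk)` tuple and the chunks carry the LLM's token deltas.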


## Quickstart

1. Create and activate a virtual environment.
2. Install dependencies:

pip install -r Streaming_and_Tools/requirements.txt

3. Set the required environment variables in the .env file (`TAVILY_API_KEY` and `DIGITALOCEAN_INFERENCE_KEY`). You can obtain a Tavily API key for free at [this link](https://app.tavily.com/home).

4. Set your DIGITALOCEAN_API_TOKEN via

```
export DIGITALOCEAN_API_TOKEN=<Your DigitalOcean API Token> # On macOS/Linux

set DIGITALOCEAN_API_TOKEN=<Your DigitalOcean API Token> # On Windows
```

5. Run your agent locally via

`gradient agent run`

Since this agent produces streaming responses, you can invoke it from code to see the streamed output:

```python
import requests
import json

def stream_endpoint(url: str, body: dict, headers: dict = {}, chunk_size: int = 1024):
    payload = json.dumps(body)
    with requests.post(url, data=payload, headers=headers, stream=True) as resp:
        resp.raise_for_status()
        for chunk in resp.iter_content(chunk_size=chunk_size):
            if chunk:  # filter keep-alive chunks
                yield chunk

if __name__ == "__main__":
    body = {
        "prompt": {
            "messages": "Who were winners and runners up of the women's 2025 cricket world cup?"
        }
    }

    buffer = ""
    for chunk in stream_endpoint("http://localhost:8080/run", body=body):
        buffer += chunk.decode("utf-8")
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if not line.strip():
                continue
            response = json.loads(line)
            print(response["response"], end="", flush=True)

```

6. If needed, change the agent name in `.gradient/agent.yml`, then deploy with

```
gradient agent deploy
```

You can then invoke the agent with the same script, using your deployed agent's URL instead. Make sure you include your DigitalOcean API token for authorization:

```python
import requests
import json

def stream_endpoint(url: str, body: dict, headers: dict = {}, chunk_size: int = 1024):
    payload = json.dumps(body)
    with requests.post(url, data=payload, headers=headers, stream=True) as resp:
        resp.raise_for_status()
        for chunk in resp.iter_content(chunk_size=chunk_size):
            if chunk:  # filter keep-alive chunks
                yield chunk

if __name__ == "__main__":
    body = {
        "prompt": {
            "messages": "Who were winners and runners up of the women's 2025 cricket world cup?"
        }
    }

    headers = {'Authorization': 'Bearer <YOUR DIGITALOCEAN API KEY>'}

    buffer = ""
    for chunk in stream_endpoint('https://agents.do-ai.run/<DEPLOYED_AGENT_ID>/main/run', body=body, headers=headers):
        buffer += chunk.decode("utf-8")
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if not line.strip():
                continue
            response = json.loads(line)
            print(response["response"], end="", flush=True)
```
145 changes: 145 additions & 0 deletions Streaming_and_Tools/main.py
@@ -0,0 +1,145 @@
import os
import json
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langchain.messages import SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from typing import Dict, Literal
from gradient_adk import entrypoint
from tavily import TavilyClient
from langgraph.config import get_stream_writer

load_dotenv()

TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)


@tool
def search_tool(query: str) -> dict:
"""
A tool to search a query on the web using Tavily for detailed and up to date information.
If you need to look up recent information beyond your knowledge cutoff date, use this tool.
"""
response = tavily_client.search(query)
return response


# This function determines whether to branch to the tools node or the answer model node
def tools_branch_condition(
    state,
    messages_key: str = "messages",
) -> Literal["tools", "answer_model_call"]:
    if isinstance(state, list):
        ai_message = state[-1]
    elif isinstance(state, dict) and (messages := state.get(messages_key, [])):
        ai_message = messages[-1]
    elif messages := getattr(state, messages_key, []):
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tools"
    return "answer_model_call"


def build_graph():

    # We have two models - one for the initial decision making (non-streaming), and one for the final answer (streaming)
    decision_model = ChatOpenAI(
        model="openai-gpt-4.1",
        base_url="https://inference.do-ai.run/v1",
        api_key=os.getenv("DIGITALOCEAN_INFERENCE_KEY"),
    )
    answer_model = ChatOpenAI(
        model="openai-gpt-4.1",
        base_url="https://inference.do-ai.run/v1",
        api_key=os.getenv("DIGITALOCEAN_INFERENCE_KEY"),
        streaming=True,
    )

    tools = [search_tool]
    decision_model = decision_model.bind_tools(tools)

    async def decision_model_call(state: MessagesState):
        messages = state["messages"]
        system_prompt = SystemMessage(
            "Determine if the user query is out of your scope of knowledge and requires a web search. If so, invoke the tool. Otherwise, simply repeat the original query so that the next model can answer it."
        )
        response = await decision_model.ainvoke([system_prompt] + messages)
        return {"messages": response}

    # In order to stream from the answer model, we use a stream writer to yield chunks as they arrive.
    # We store the streamed chunks in a custom key in the state dict, which we can then access in the entrypoint.
    # For more information, see https://docs.langchain.com/oss/python/langgraph/streaming#llm-tokens
    async def answer_model_call(state: MessagesState):
        messages = state["messages"]
        writer = get_stream_writer()
        async for response in answer_model.astream(messages):
            writer({"custom_llm_chunk": response})
        return {"result": "completed"}

    builder = StateGraph(MessagesState)
    builder.add_node("decision_model_call", decision_model_call)
    builder.add_node("tools", ToolNode(tools))
    builder.add_node("answer_model_call", answer_model_call)
    builder.add_edge(START, "decision_model_call")
    builder.add_conditional_edges(
        "decision_model_call",
        tools_branch_condition,
    )
    builder.add_edge("tools", "answer_model_call")
    builder.add_edge("answer_model_call", END)
    graph = builder.compile()
    return graph


AGENT_GRAPH = build_graph()


@entrypoint
async def main(input: Dict, context: Dict):
"""Entrypoint"""

input_request = input.get("prompt")

# As mentioned earlier, we stream the response from the answer model by accessing the custom key in the state dict
async for _, chunk in AGENT_GRAPH.astream(input_request, stream_mode=["custom"]):
chunk_data = chunk.get("custom_llm_chunk", None)
if chunk_data and chunk_data.content:
response_text = chunk_data.content
yield json.dumps({"response": response_text}) + "\n"


# # You can stream responses from this agent by sending a streaming request to the endpoint.
# import requests
# import json

# def stream_endpoint(url: str, body: dict, headers: dict = {}, chunk_size: int = 1024):
#     payload = json.dumps(body)
#     with requests.post(url, data=payload, headers=headers, stream=True) as resp:
#         resp.raise_for_status()
#         for chunk in resp.iter_content(chunk_size=chunk_size):
#             if chunk:  # filter keep-alive chunks
#                 yield chunk

# if __name__ == "__main__":
#     body = {
#         "prompt": {
#             "messages": "Who were winners and runners up of the women's 2025 cricket world cup?"
#         }
#     }
#
#     headers = {"Authorization": f"Bearer {os.getenv('DIGITALOCEAN_API_TOKEN')}"}
#     url = "https://agents.do-ai.run/<DEPLOYED_AGENT_ID>/main/run"  # or "http://localhost:8080/run" when running locally

#     buffer = ""
#     for chunk in stream_endpoint(url, body=body, headers=headers):
#         buffer += chunk.decode("utf-8")
#         while "\n" in buffer:
#             line, buffer = buffer.split("\n", 1)
#             if not line.strip():
#                 continue
#             response = json.loads(line)
#             print(response["response"], end="", flush=True)
8 changes: 8 additions & 0 deletions Streaming_and_Tools/requirements.txt
@@ -0,0 +1,8 @@
gradient-adk
langgraph
langchain
langchain-core
gradient
langchain-community
langchain-openai
tavily-python