Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion langgraph.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"chat_with_tools_agent": "template_langgraph.agents.chat_with_tools_agent.agent:graph",
"kabuto_helpdesk_agent": "template_langgraph.agents.kabuto_helpdesk_agent.agent:graph",
"issue_formatter_agent": "template_langgraph.agents.issue_formatter_agent.agent:graph",
"task_decomposer_agent": "template_langgraph.agents.task_decomposer_agent.agent:graph"
"task_decomposer_agent": "template_langgraph.agents.task_decomposer_agent.agent:graph",
"news_summarizer_agent": "template_langgraph.agents.news_summarizer_agent.agent:graph"
},
"env": ".env"
}
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,6 @@ environment = { python-version = "3.10" }
unknown-argument = "ignore"
invalid-parameter-default = "ignore"
non-subscriptable = "ignore"
possibly-unbound-attribute = "ignore"
unresolved-attribute = "ignore"
invalid-argument-type = "ignore"
66 changes: 66 additions & 0 deletions scripts/agent_operator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import logging
from uuid import uuid4

import typer
from dotenv import load_dotenv

from template_langgraph.agents.chat_with_tools_agent.agent import graph as chat_with_tools_agent_graph
from template_langgraph.agents.issue_formatter_agent.agent import graph as issue_formatter_agent_graph
from template_langgraph.agents.kabuto_helpdesk_agent.agent import graph as kabuto_helpdesk_agent_graph
from template_langgraph.agents.news_summarizer_agent.agent import (
graph as news_summarizer_agent_graph,
)
from template_langgraph.agents.task_decomposer_agent.agent import graph as task_decomposer_agent_graph
from template_langgraph.loggers import get_logger

Expand All @@ -28,6 +32,8 @@ def get_agent_graph(name: str):
return task_decomposer_agent_graph
elif name == "kabuto_helpdesk_agent":
return kabuto_helpdesk_agent_graph
elif name == "news_summarizer_agent":
return news_summarizer_agent_graph
else:
raise ValueError(f"Unknown agent name: {name}")

Expand Down Expand Up @@ -90,6 +96,10 @@ def run(
if verbose:
logger.setLevel(logging.DEBUG)

assert name not in [
"news_summarizer_agent",
], f"{name} is not supported. Please use another agent."

graph = get_agent_graph(name)
for event in graph.stream(
input={
Expand All @@ -105,6 +115,62 @@ def run(
logger.info(f"Event: {event}")


@app.command()
def news_summarizer_agent(
    request: str = typer.Option(
        "Please summarize the latest news articles in Japanese briefly in 3 sentences.",
        "--request",
        "-r",
        help="Request to the agent",
    ),
    urls: str = typer.Option(
        "https://example.com/article1,https://example.com/article2",
        "--urls",
        "-u",
        help="Comma-separated list of URLs to summarize",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose output",
    ),
):
    """Run the news summarizer agent over a comma-separated list of URLs and log each summarized article."""
    # Imported lazily so the other CLI commands don't pay for this agent's imports.
    from template_langgraph.agents.news_summarizer_agent.models import (
        AgentInputState,
        AgentOutputState,
        AgentState,
    )

    # Set up logging
    if verbose:
        logger.setLevel(logging.DEBUG)

    graph = news_summarizer_agent_graph
    for event in graph.stream(
        input=AgentState(
            input=AgentInputState(
                request=request,
                request_id=str(uuid4()),
                urls=urls.split(",") if urls else [],
            ),
            output=AgentOutputState(
                # NOTE(review): AgentOutputState declares no `result` field, so
                # pydantic silently ignores this kwarg under its default
                # extra="ignore" config — confirm whether `result` was meant to
                # be a declared field.
                result="N/A",
                articles=[],
            ),
            target_url_index=None,
        )
    ):
        logger.info("-" * 20)
        logger.info(f"Event: {event}")

    # NOTE(review): relies on the final streamed event being the update from
    # the "notify" node; a KeyError here means the graph finished elsewhere.
    output: AgentOutputState = event["notify"]["output"]
    for article in output.articles:
        logger.info(article.url)
        logger.info(f"is_valid_url: {article.is_valid_url}, is_valid_content: {article.is_valid_content}")
        logger.info(article.structured_article.model_dump_json(indent=2))


if __name__ == "__main__":
load_dotenv(
override=True,
Expand Down
1 change: 1 addition & 0 deletions scripts/test_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ AGENT_NAMES=(
"issue_formatter_agent"
"kabuto_helpdesk_agent"
"task_decomposer_agent"
"news_summarizer_agent"
)
for AGENT_NAME in "${AGENT_NAMES[@]}"; do
uv run python scripts/agent_operator.py png --name "$AGENT_NAME" --verbose --output "generated/${AGENT_NAME}.png"
Expand Down
Empty file.
181 changes: 181 additions & 0 deletions template_langgraph/agents/news_summarizer_agent/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import httpx
from langgraph.graph import StateGraph
from langgraph.types import Send

from template_langgraph.agents.news_summarizer_agent.models import AgentState, Article, StructuredArticle
from template_langgraph.llms.azure_openais import AzureOpenAiWrapper
from template_langgraph.loggers import get_logger

logger = get_logger(__name__)


class MockNotifier:
    """No-op notifier used for testing: logs the payload instead of delivering it anywhere."""

    def notify(self, request_id: str, body: dict) -> None:
        """Simulate sending a notification to the user."""
        logger.info(f"Notification sent for request {request_id}: {body}")


class MockScraper:
    """Offline stand-in for a web scraper; never touches the network."""

    def scrape(self, url: str) -> str:
        """Return a canned HTML snippet regardless of *url*."""
        canned_html = "<html><body><h1>Mocked web content</h1></body></html>"
        return canned_html


class HttpxScraper:
    """Scraper that downloads a page over HTTP with httpx.

    Raises:
        httpx.HTTPStatusError: for 4xx/5xx responses (via raise_for_status).
        httpx.RequestError: for connection/timeout failures.
    """

    def scrape(self, url: str) -> str:
        """Retrieve the HTML content of a web page."""
        # NOTE(review): httpx does not follow redirects by default and uses its
        # library-default timeout here — confirm both are acceptable for the
        # news sites this agent targets.
        with httpx.Client() as client:
            response = client.get(url)
            response.raise_for_status()
            return response.text


class MockSummarizer:
    """Offline stand-in for a summarizer; echoes its inputs into a canned article."""

    def summarize(
        self,
        prompt: str,
        content: str,
    ) -> StructuredArticle:
        """Return a fixed StructuredArticle whose summary embeds *content* and *prompt*."""
        mocked_fields = {
            "title": "Mocked Title",
            "date": "2023-01-01",
            "summary": f"Mocked summary of the content: {content}, prompt: {prompt}",
            "keywords": ["mock", "summary"],
            "score": 75,
        }
        return StructuredArticle(**mocked_fields)


class LlmSummarizer:
def __init__(self, llm=AzureOpenAiWrapper().chat_model):
self.llm = llm

def summarize(
self,
prompt: str,
content: str,
) -> StructuredArticle:
"""Use the LLM to summarize the input."""
logger.info(f"Summarizing input with LLM: {prompt}")
return self.llm.with_structured_output(StructuredArticle).invoke(
input=[
{"role": "system", "content": prompt},
{"role": "user", "content": content},
]
)


class NewsSummarizerAgent:
def __init__(
self,
llm=AzureOpenAiWrapper().chat_model,
notifier=MockNotifier(),
scraper=MockScraper(),
summarizer=MockSummarizer(),
):
self.llm = llm
self.notifier = notifier
self.scraper = scraper
self.summarizer = summarizer

def create_graph(self):
"""Create the main graph for the agent."""
# Create the workflow state graph
workflow = StateGraph(AgentState)

# Create nodes
workflow.add_node("initialize", self.initialize)
workflow.add_node("fetch_web_content", self.fetch_web_content)
workflow.add_node("notify", self.notify)

# Create edges
workflow.set_entry_point("initialize")
workflow.add_conditional_edges(
source="initialize",
path=self.run_subtasks,
)
workflow.add_edge("fetch_web_content", "notify")
workflow.set_finish_point("notify")
return workflow.compile(
name=NewsSummarizerAgent.__name__,
)

def initialize(self, state: AgentState) -> AgentState:
"""Initialize the agent state."""
logger.info(f"Initializing state: {state}")
# FIXME: retrieve urls from user request
return state

def run_subtasks(self, state: AgentState) -> list[Send]:
"""Run the subtasks for the agent."""
logger.info(f"Running subtasks with state: {state}")
return [
Send(
node="fetch_web_content",
arg=AgentState(
input=state.input,
output=state.output,
target_url_index=idx,
),
)
for idx, _ in enumerate(state.input.urls)
]

def fetch_web_content(self, state: AgentState):
url: str = state.input.urls[state.target_url_index]
is_valid_url = url.startswith("http")
is_valid_content = False
content = ""

# Check if the URL is valid
if not is_valid_url:
logger.error(f"Invalid URL: {url}")
is_valid_content = False
else:
# Scrape the web content
try:
logger.info(f"Scraping URL: {url}")
content = self.scraper.scrape(url)
is_valid_content = True
except httpx.RequestError as e:
logger.error(f"Error fetching web content: {e}")

if is_valid_content:
logger.info(f"Summarizing content with LLM @ {state.target_url_index}: {url}")
structured_article: StructuredArticle = self.summarizer.summarize(
prompt=state.input.request,
content=content,
)
state.output.articles.append(
Article(
is_valid_url=is_valid_url,
is_valid_content=is_valid_content,
content=content,
url=url,
structured_article=structured_article,
),
)

def notify(self, state: AgentState) -> AgentState:
"""Send notifications to the user."""
logger.info(f"Sending notifications with state: {state}")
# Simulate sending notifications
# convert list of articles to a dictionary for notification
summary = {}
for i, article in enumerate(state.output.articles):
summary[i] = article.model_dump()
self.notifier.notify(
request_id=state.input.request_id,
body=summary,
)
return state


# Compiled graph exported for langgraph.json ("news_summarizer_agent").
# For fully offline testing, swap in the all-mock variant:
# graph = NewsSummarizerAgent().create_graph()

graph = NewsSummarizerAgent(
    notifier=MockNotifier(),
    scraper=HttpxScraper(),
    summarizer=LlmSummarizer(),
).create_graph()
33 changes: 33 additions & 0 deletions template_langgraph/agents/news_summarizer_agent/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pydantic import BaseModel, Field


class StructuredArticle(BaseModel):
    """LLM-extracted representation of a single news article.

    Used as the structured-output schema in LlmSummarizer, so the Field
    descriptions below are part of the runtime extraction contract — do not
    edit them casually.
    """

    title: str = Field(..., description="Title of the article")
    date: str = Field(..., description="Publication date of the article")
    summary: str = Field(..., description="Summary of the article")
    keywords: list[str] = Field(..., description="Keywords extracted from the article")
    score: int = Field(..., description="Score of the article based on user request from 0 to 100")


class Article(BaseModel):
    """One fetched article: validity flags, raw content, and its LLM summary."""

    is_valid_url: bool = Field(..., description="Indicates if the article URL is valid")
    is_valid_content: bool = Field(..., description="Indicates if the article content is valid")
    content: str = Field(..., description="Original content of the article")
    url: str = Field(..., description="URL of the article")
    structured_article: StructuredArticle = Field(..., description="Structured representation of the article")


class AgentInputState(BaseModel):
    """User-supplied request plus the article URLs to process."""

    request: str = Field(..., description="Request from the user")
    request_id: str = Field(..., description="Unique identifier for the request")
    urls: list[str] = Field(..., description="List of article URLs")


class AgentOutputState(BaseModel):
    """Results accumulated by the agent's nodes."""

    # Appended to by fetch_web_content; only successfully fetched articles land here.
    articles: list[Article] = Field(..., description="List of articles processed by the agent")


class AgentState(BaseModel):
    """Overall graph state passed between the agent's nodes."""

    input: AgentInputState = Field(..., description="Input state for the agent")
    output: AgentOutputState = Field(..., description="Output state for the agent")
    # None until the fan-out step assigns each Send branch its own URL index.
    target_url_index: int | None = Field(..., description="Index of the target URL being processed")
Loading