forked from nicknochnack/ACPWalkthrough
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path2. CrewAI via Server.py
More file actions
63 lines (53 loc) · 2.03 KB
/
2. CrewAI via Server.py
File metadata and controls
63 lines (53 loc) · 2.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from crewai import Crew, Task, Agent, LLM
from crewai_tools import RagTool
from collections.abc import AsyncGenerator
from acp_sdk.models import Message, MessagePart
from acp_sdk.server import Context, RunYield, RunYieldResume, Server
llm = LLM(model="ollama_chat/qwen2.5:14b", base_url="http://localhost:11434", max_tokens=8192)
config = {
"llm": {
"provider": "ollama",
"config": {
"model": "qwen2.5:14b",
}
},
"embedding_model": {
"provider": "ollama",
"config": {
"model": "all-minilm:latest"
}
}
}
rag_tool = RagTool(config=config)
rag_tool.add("./data/gold-hospital-and-premium-extras.pdf", data_type="pdf_file")
insurance_agent = Agent(
role="Senior Insurance Coverage Assistant",
goal="Determine whether something is covered or not",
backstory="""You are an expert insurance agent designed to assist with coverage queries. Whenever you call the knowledge base tool the tool input must follow the pattern
{query: 'your query', kwargs:{'}}"
""",
verbose=True,
allow_delegation=False,
llm=llm,
tools=[rag_tool],
max_retry_limit=5
)
rag_tool
import logging
logger = logging.getLogger(__name__)
server = Server()
@server.agent()
async def policy_agent(input: list[Message], context: Context) -> AsyncGenerator[RunYield, RunYieldResume]:
"This is an agent for questions around policy coverage, it uses a RAG pattern to find answers based on policy documentation. Use it to help answer questions on coverage and waiting periods."
task1 = Task(
description=input[0].parts[0].content,
expected_output = "A comprehensive response as to the users question",
agent=insurance_agent
)
crew = Crew(agents=[insurance_agent], tasks=[task1], verbose=True)
task_output = await crew.kickoff_async()
logger.info("Task completed successfully")
logger.info(task_output)
yield Message(parts=[MessagePart(content=str(task_output))])
if __name__ == "__main__":
server.run(port=8001)