-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathagent.py
More file actions
160 lines (133 loc) · 6.18 KB
/
agent.py
File metadata and controls
160 lines (133 loc) · 6.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import re
import random
import logging
import aiohttp
from datetime import datetime
from typing import Annotated
from dotenv import load_dotenv
from livekit.agents import (
AutoSubscribe,
JobContext,
JobProcess,
WorkerOptions,
cli,
llm,
metrics,
)
from livekit.agents.pipeline import AgentCallContext, VoicePipelineAgent
from livekit.rtc import ParticipantKind
from livekit.plugins import openai, deepgram, elevenlabs, silero, turn_detector
# Load environment variables from the ".env.local" file at import time,
# i.e. before prewarm/entrypoint run.
load_dotenv(dotenv_path=".env.local")
# Logger shared by the functions defined in this module.
logger = logging.getLogger("voice-agent")
class AssistantFnc(llm.FunctionContext):
    """Tools exposed to the LLM: a weather lookup and the current local time."""

    # the llm.ai_callable decorator marks this function as a tool available to the LLM
    # by default, it'll use the docstring as the function's description
    # (so the method docstrings below are runtime text sent to the model; keep
    # maintainer-facing notes in comments instead of editing them)
    @llm.ai_callable()
    async def get_weather(
        self,
        # by using the Annotated type, arg description and type are available to the LLM
        location: Annotated[
            str, llm.TypeInfo(description="The location to get the weather for")
        ],
    ) -> str:
        """Called when the user asks about the weather. This function will return the weather for the given location."""
        # Returns a sentence of the form "The weather in <location> is <text>."
        # built from the wttr.in response body.
        # Raises RuntimeError when wttr.in answers with a non-200 status.

        # Clean the location string of special characters: every run of
        # characters outside [a-zA-Z0-9] collapses to a single space.
        location = re.sub(r"[^a-zA-Z0-9]+", " ", location).strip()

        # When a function call is running, there are a couple of options to inform the user
        # that it might take awhile:
        # Option 1: you can use .say filler message immediately after the call is triggered
        # Option 2: you can prompt the agent to return a text response when it's making a function call
        agent = AgentCallContext.get_current().agent

        if (
            not agent.chat_ctx.messages
            or agent.chat_ctx.messages[-1].role != "assistant"
        ):
            # skip if assistant already said something
            filler_messages = [
                "Let me check the weather in {location} for you.",
                "Let me see what the weather is like in {location} right now.",
                # LLM will complete this sentence if it is added to the end of the chat context
                "The current weather in {location} is ",
            ]
            message = random.choice(filler_messages).format(location=location)
            logger.info(f"saying filler message: {message}")

            # NOTE: set add_to_chat_ctx=True will add the message to the end
            # of the chat context of the function call for answer synthesis
            speech_handle = await agent.say(message, add_to_chat_ctx=True)  # noqa: F841

        logger.info(f"getting weather for {location}")
        # format=%C+%t is passed to wttr.in as the output format selector.
        url = f"https://wttr.in/{location}?format=%C+%t"
        weather_data = ""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    weather_data = f"The weather in {location} is {await response.text()}."
                    logger.info(f"weather data: {weather_data}")
                else:
                    # A bare string cannot be raised in Python 3 (it would surface
                    # as an unrelated TypeError); raise a real exception so the
                    # status code reaches the caller.
                    raise RuntimeError(
                        f"Failed to get weather data, status code: {response.status}"
                    )

        # (optional) To wait for the speech to finish before giving results of the function call
        # await speech_handle.join()
        return weather_data

    @llm.ai_callable()
    def get_time(self) -> str:
        """called to retrieve the current local time"""
        # Naive local wall-clock time formatted as HH:MM:SS.
        return datetime.now().strftime("%H:%M:%S")
def prewarm(proc: JobProcess):
    """Load the Silero VAD and keep it in ``proc.userdata`` under the ``"vad"`` key.

    ``entrypoint`` later reads the same key from ``ctx.proc.userdata``.
    """
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model
async def entrypoint(ctx: JobContext):
    """Run the voice assistant for the job described by ``ctx``.

    Steps, in order: connect to the room subscribing to audio only, wait for
    the first participant, build a ``VoicePipelineAgent`` (Deepgram STT,
    OpenAI LLM, ElevenLabs TTS, with ``AssistantFnc`` as its tools), register
    usage-metrics collection plus a shutdown summary, start the agent and
    speak a greeting.
    """
    system_prompt = (
        "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
        "You should use short and concise responses, and avoiding usage of unpronouncable punctuation. "
        "You were created as a demo to showcase the capabilities of LiveKit's agents framework."
    )
    system_ctx = llm.ChatContext().append(role="system", text=system_prompt)

    logger.info(f"connecting to room {ctx.room.name}")
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # Block until the first participant has joined the room
    participant = await ctx.wait_for_participant()
    logger.info(f"starting voice assistant for participant {participant.identity}")
    logger.info(f"participant.name: {participant.name}")
    logger.info(f"participant.attributes: {participant.attributes}")

    # SIP participants get the telephony-optimized Deepgram model;
    # everyone else gets the general-purpose one.
    is_sip_caller = participant.kind == ParticipantKind.PARTICIPANT_KIND_SIP
    stt_model = "nova-2-phonecall" if is_sip_caller else "nova-2-general"

    # This project is configured to use Deepgram STT, OpenAI LLM and ElevenLabs TTS plugins
    # Other providers exist like Cerebras, Cartesia, Groq, Play.ht, Rime, and more
    # Learn more and pick the best one for your app:
    # https://docs.livekit.io/agents/plugins
    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(model=stt_model),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=elevenlabs.TTS(),
        turn_detector=turn_detector.EOUModel(),
        # shortest endpointing wait: applies when the turn detector thinks the user has finished
        min_endpointing_delay=0.5,
        # longest endpointing wait: applies when the turn detector thinks the user is still talking
        max_endpointing_delay=5.0,
        chat_ctx=system_ctx,
        fnc_ctx=AssistantFnc(),
    )

    usage = metrics.UsageCollector()

    @agent.on("metrics_collected")
    def on_metrics_collected(agent_metrics: metrics.AgentMetrics):
        # Log each metrics event and fold it into the running usage totals
        metrics.log_metrics(agent_metrics)
        usage.collect(agent_metrics)

    async def log_usage():
        # Emit the accumulated usage summary
        logger.info(f"Summary Usage: {usage.get_summary()}")

    # The summary is produced and logged when the job shuts down
    ctx.add_shutdown_callback(log_usage)

    agent.start(ctx.room, participant)

    # Greet the user as soon as the agent is running
    await agent.say("Hey, how can I help you today?", allow_interruptions=True)
if __name__ == "__main__":
    # Script entry: hand the worker configuration (session entrypoint,
    # prewarm hook and agent name) to the LiveKit CLI runner.
    worker_options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        prewarm_fnc=prewarm,
        agent_name="inbound-agent",
    )
    cli.run_app(worker_options)