1
+ import asyncio
2
+ import os
3
+ from dotenv import load_dotenv
4
+ from typing import List , Dict , Any
5
+ from openai import OpenAI
6
+ from llama_index .core .workflow import (
7
+ StartEvent ,
8
+ StopEvent ,
9
+ Workflow ,
10
+ step ,
11
+ Event ,
12
+ Context ,
13
+ )
14
+
15
+ # Load environment variables
16
+ load_dotenv ()
17
+ API_KEY = os .getenv ("SONAR_API_KEY" )
18
+
19
+ # Custom events for our workflow
20
+ class QueryEvent (Event ):
21
+ query : str
22
+
23
+ class ProcessQueryEvent (Event ):
24
+ query : str
25
+ conversation_history : List [Dict [str , str ]]
26
+
27
+ class GenerateResponseEvent (Event ):
28
+ query : str
29
+ conversation_history : List [Dict [str , str ]]
30
+ context : str
31
+
32
+ # Function to call Sonar API
33
+ def call_sonar_api (messages ):
34
+ client = OpenAI (api_key = API_KEY , base_url = "https://api.perplexity.ai" )
35
+ response = client .chat .completions .create (
36
+ model = "sonar-pro" ,
37
+ messages = messages ,
38
+ )
39
+ return response .choices [0 ].message .content
40
+
41
+ # Main workflow class
42
+ class SonarConversationWorkflow (Workflow ):
43
+ @step
44
+ async def start (self , ctx : Context , ev : StartEvent ) -> QueryEvent :
45
+ # Entry point - extract the query
46
+ return QueryEvent (query = ev .query )
47
+
48
+ @step
49
+ async def process_query (self , ctx : Context , ev : QueryEvent ) -> ProcessQueryEvent :
50
+ # Get conversation history from state or initialize new
51
+ conversation_history = await ctx .get ("conversation_history" , default = [])
52
+
53
+ # Add user query to history
54
+ conversation_history .append ({"role" : "user" , "content" : ev .query })
55
+
56
+ # Store updated history
57
+ await ctx .set ("conversation_history" , conversation_history )
58
+
59
+ return ProcessQueryEvent (
60
+ query = ev .query ,
61
+ conversation_history = conversation_history
62
+ )
63
+
64
+ @step
65
+ async def retrieve_context (self , ctx : Context , ev : ProcessQueryEvent ) -> GenerateResponseEvent :
66
+ # Get or initialize entities state
67
+ entities = await ctx .get ("entities" , default = {})
68
+
69
+ # Context generation from entities
70
+ context_parts = []
71
+ for entity_name , attributes in entities .items ():
72
+ attrs_str = ", " .join ([f"{ k } : { v } " for k , v in attributes .items ()])
73
+ context_parts .append (f"{ entity_name } : { attrs_str } " )
74
+
75
+ context = "\n " .join (context_parts ) if context_parts else "No prior context available."
76
+
77
+ return GenerateResponseEvent (
78
+ query = ev .query ,
79
+ conversation_history = ev .conversation_history ,
80
+ context = context
81
+ )
82
+
83
+ @step
84
+ async def generate_response (self , ctx : Context , ev : GenerateResponseEvent ) -> StopEvent :
85
+ # Prepare messages for API call
86
+ system_message = {
87
+ "role" : "system" ,
88
+ "content" : (
89
+ "You are a helpful AI assistant that maintains context across multiple questions. "
90
+ f"Current conversation context: { ev .context } "
91
+ )
92
+ }
93
+
94
+ # Create full message history for the API call
95
+ messages = [system_message ] + ev .conversation_history
96
+
97
+ # Call the Sonar API
98
+ response = call_sonar_api (messages )
99
+
100
+ # Extract entities from response (simplified version)
101
+ if "president" in ev .query .lower ():
102
+ entities = await ctx .get ("entities" , default = {})
103
+ if "Donald Trump" in response :
104
+ entities ["US President" ] = {
105
+ "name" : "Donald Trump" ,
106
+ "position" : "President of the United States"
107
+ }
108
+ await ctx .set ("entities" , entities )
109
+
110
+ # Check for age-related info
111
+ if "age" in ev .query .lower () and "US President" in await ctx .get ("entities" , {}):
112
+ entities = await ctx .get ("entities" )
113
+ if "78" in response or "seventy-eight" in response .lower ():
114
+ entities ["US President" ]["age" ] = "78"
115
+ await ctx .set ("entities" , entities )
116
+
117
+ # Update conversation history
118
+ conversation_history = ev .conversation_history .copy ()
119
+ conversation_history .append ({"role" : "assistant" , "content" : response })
120
+ await ctx .set ("conversation_history" , conversation_history )
121
+
122
+ return StopEvent (result = {
123
+ "response" : response ,
124
+ "conversation_history" : conversation_history ,
125
+ "updated_context" : await ctx .get ("entities" , default = {})
126
+ })
127
+
128
+ # Function to process a single question
129
+ async def ask_question (workflow , query ):
130
+ result = await workflow .run (query = query )
131
+ return result
132
+
133
+ # Interactive CLI chatbot function
134
+ async def run_chatbot ():
135
+ print ("=" * 50 )
136
+ print ("Welcome to the LlamaIndex Context-Aware Chatbot" )
137
+ print ("Type 'quit', 'exit', or 'bye' to end the conversation" )
138
+ print ("=" * 50 )
139
+
140
+ # Initialize workflow
141
+ conversation_flow = SonarConversationWorkflow (timeout = 30 , verbose = False )
142
+
143
+ while True :
144
+ # Get user input
145
+ user_input = input ("\n You: " )
146
+
147
+ # Check for exit commands
148
+ if user_input .lower () in ['quit' , 'exit' , 'bye' ]:
149
+ print ("\n Goodbye! Thanks for chatting." )
150
+ break
151
+
152
+ # Process user query
153
+ try :
154
+ print ("\n Bot: " , end = "" , flush = True )
155
+ result = await ask_question (conversation_flow , user_input )
156
+
157
+ # Print the response
158
+ print (result ["response" ])
159
+
160
+ # Optionally, uncomment to show the current context state
161
+ # print("\nCurrent context:", result["updated_context"])
162
+ except Exception as e :
163
+ print (f"Error: { str (e )} " )
164
+
165
+ # Run the async main function
166
+ if __name__ == "__main__" :
167
+ try :
168
+ asyncio .run (run_chatbot ())
169
+ except KeyboardInterrupt :
170
+ print ("\n Chatbot terminated by user." )
0 commit comments