-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_client_example.py
More file actions
239 lines (195 loc) · 9.42 KB
/
api_client_example.py
File metadata and controls
239 lines (195 loc) · 9.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
"""
Client example for the BI Assistant API.

This example demonstrates how to:
1. Create a session
2. Send chat messages (both streaming and non-streaming)
3. Retrieve session info, capabilities, and conversation history
4. Handle errors

Usage:
    python api_client_example.py

Make sure the API server is running:
    python server.py
"""
import asyncio
import aiohttp
import json
import time
from typing import Optional
class BIAssistantError(Exception):
    """Error raised by BIAssistantClient for a missing session or a failed request.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep working.
    """


class BIAssistantClient:
    """Client for interacting with the BI Assistant API.

    Each request opens its own short-lived ``aiohttp.ClientSession``. The ID
    returned by :meth:`create_session` is stored on ``session_id`` and used by
    every later call until :meth:`delete_session` succeeds.
    """

    # Prefix marking a payload line in the streaming chat response.
    _SSE_DATA_PREFIX = "data: "

    def __init__(self, base_url: str = "http://localhost:8888"):
        """Store the API root URL; no network traffic happens here.

        Args:
            base_url: Root URL of the API server. Trailing slashes are removed
                so that joining with "/sessions" never yields "//sessions".
        """
        self.base_url = base_url.rstrip("/")
        self.session_id: Optional[str] = None

    def _require_session(self) -> str:
        """Return the active session ID or raise if no session was created."""
        if not self.session_id:
            raise BIAssistantError("No active session. Create a session first.")
        return self.session_id

    async def _request_json(
        self,
        method: str,
        path: str,
        failure_label: str,
        payload: Optional[dict] = None,
    ) -> dict:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. "GET" or "POST".
            path: URL path appended to ``base_url`` (must start with "/").
            failure_label: Text placed at the start of the error message.
            payload: Optional JSON body.

        Raises:
            BIAssistantError: If the response status is not 200.
        """
        url = f"{self.base_url}{path}"
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, json=payload) as response:
                if response.status == 200:
                    return await response.json()
                error_text = await response.text()
                raise BIAssistantError(
                    f"{failure_label}: {response.status} - {error_text}"
                )

    @classmethod
    def _parse_sse_line(cls, line: bytes) -> Optional[dict]:
        """Decode one raw stream line into an event dict.

        Returns None for blank lines, lines without the "data: " prefix,
        payloads that are not valid JSON, and JSON that is not an object.
        """
        # Undecodable bytes are replaced rather than aborting the whole stream.
        text = line.decode("utf-8", errors="replace").strip()
        if not text.startswith(cls._SSE_DATA_PREFIX):
            return None
        try:
            event = json.loads(text[len(cls._SSE_DATA_PREFIX):])
        except json.JSONDecodeError:
            return None
        return event if isinstance(event, dict) else None

    @staticmethod
    def _print_stream_event(event: dict) -> bool:
        """Print one stream event; return True when the stream should stop.

        Fields are read with ``.get`` so an event that lacks an expected key
        prints ``None`` instead of raising ``KeyError`` mid-stream. Events with
        an unrecognized (or missing) "type" are ignored.
        """
        kind = event.get("type")
        if kind == "start":
            print(f"📝 Message ID: {event.get('message_id')}")
        elif kind == "text":
            print(event.get("text", ""), end="", flush=True)
        elif kind == "complete":
            print(
                f"\n\n✅ Complete - Success: {event.get('success')}, "
                f"Iterations: {event.get('iterations')}"
            )
            if event.get("error"):
                print(f"❌ Error: {event['error']}")
        elif kind == "error":
            print(f"\n❌ Stream Error: {event.get('error')}")
            return True
        return False

    async def create_session(
        self,
        username: str = "Test User",
        user_role: str = "BI",
        preferred_language: str = "en",
        streaming: bool = True
    ) -> dict:
        """Create a new session and remember its ID on ``session_id``.

        Returns:
            The JSON body returned by ``POST /sessions``.

        Raises:
            BIAssistantError: If the server does not answer with status 200.
        """
        payload = {
            "username": username,
            "user_role": user_role,
            "preferred_language": preferred_language,
            "streaming": streaming,
        }
        session_info = await self._request_json(
            "POST", "/sessions", "Failed to create session", payload
        )
        self.session_id = session_info["session_id"]
        print(f"✅ Session created: {self.session_id}")
        return session_info

    async def chat_non_streaming(self, message: str) -> dict:
        """Send a chat message and return the complete JSON response.

        Raises:
            BIAssistantError: If there is no active session or the server does
                not answer with status 200.
        """
        payload = {
            "message": message,
            "session_id": self._require_session(),
            "stream": False,
        }
        return await self._request_json("POST", "/chat", "Chat failed", payload)

    async def chat_streaming(self, message: str):
        """Send a chat message and print the streamed response as it arrives.

        Reads the response line by line, printing each recognized event, and
        stops early when an event of type "error" is received.

        Raises:
            BIAssistantError: If there is no active session or the server does
                not answer with status 200.
        """
        payload = {
            "message": message,
            "session_id": self._require_session(),
            "stream": True,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.base_url}/chat/stream", json=payload
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise BIAssistantError(
                        f"Streaming chat failed: {response.status} - {error_text}"
                    )

                print(f"🎯 Streaming response for: '{message}'")
                print("-" * 50)

                async for line in response.content:
                    event = self._parse_sse_line(line)
                    if event is None:
                        continue
                    if self._print_stream_event(event):
                        break

                print("\n" + "=" * 50)

    async def get_session_info(self) -> dict:
        """Return the server's JSON description of the current session.

        Raises:
            BIAssistantError: If there is no active session or the server does
                not answer with status 200.
        """
        session_id = self._require_session()
        return await self._request_json(
            "GET", f"/sessions/{session_id}", "Failed to get session info"
        )

    async def get_conversation_history(self) -> dict:
        """Return the conversation history JSON for the current session.

        Raises:
            BIAssistantError: If there is no active session or the server does
                not answer with status 200.
        """
        session_id = self._require_session()
        return await self._request_json(
            "GET", f"/sessions/{session_id}/history", "Failed to get history"
        )

    async def get_capabilities(self) -> dict:
        """Return the capabilities JSON for the current session.

        Raises:
            BIAssistantError: If there is no active session or the server does
                not answer with status 200.
        """
        session_id = self._require_session()
        return await self._request_json(
            "GET",
            f"/sessions/{session_id}/capabilities",
            "Failed to get capabilities",
        )

    async def delete_session(self) -> bool:
        """Delete the current session on the server.

        Returns:
            True if the server answered 200 (``session_id`` is then cleared);
            False if there was no active session or the server answered with
            any other status. A non-200 answer is deliberately not raised so
            this can be used as best-effort cleanup.
        """
        if not self.session_id:
            return False

        async with aiohttp.ClientSession() as session:
            async with session.delete(
                f"{self.base_url}/sessions/{self.session_id}"
            ) as response:
                if response.status != 200:
                    return False
                print(f"🗑️ Session deleted: {self.session_id}")
                self.session_id = None
                return True
async def main():
    """Run the demo: create a session, chat, inspect it, then clean up.

    Any error raised by the demo steps is printed rather than propagated.
    Session cleanup always runs, and a failure during cleanup is reported
    instead of escaping as a traceback.
    """
    print("🚀 BI Assistant API Client Demo")
    print("=" * 40)

    client = BIAssistantClient()
    separator = "=" * 60

    try:
        # Create a session
        session_info = await client.create_session(
            username="Demo User",
            user_role="Data Analyst",
            preferred_language="en",
            streaming=True
        )
        print(f"📊 Session Info: {json.dumps(session_info, indent=2, default=str)}")

        # Get capabilities
        print("\n🔧 Getting capabilities...")
        capabilities = await client.get_capabilities()
        print(f"Capabilities: {json.dumps(capabilities, indent=2)}")

        # Example queries for testing
        test_queries = [
            "Hello! What can you help me with?",
            "Can you generate a SQL query to show sales trends?",
            "Create a chart showing monthly revenue data",
        ]

        # Test streaming chat
        for query in test_queries:
            print(f"\n{separator}")
            print(f"🗣️ Testing query: '{query}'")
            await client.chat_streaming(query)
            await asyncio.sleep(1)  # Small delay between requests

        # Test non-streaming chat
        print(f"\n{separator}")
        print("🔄 Testing non-streaming response...")
        response = await client.chat_non_streaming("What's the status of our conversation?")
        print(f"Response: {json.dumps(response, indent=2, default=str)}")

        # Get conversation history
        print("\n📜 Getting conversation history...")
        history = await client.get_conversation_history()
        print(f"History: {history['message_count']} messages")

        # Get final session info
        print("\n📊 Final session info...")
        final_session_info = await client.get_session_info()
        print(f"Session: {json.dumps(final_session_info, indent=2, default=str)}")

    except Exception as e:
        # Top-level boundary of the demo: report the failure and fall through to cleanup.
        print(f"❌ Error: {e}")

    finally:
        # Clean up. The delete request can itself fail (e.g. the server went
        # away mid-demo); report that instead of letting it escape.
        try:
            await client.delete_session()
        except Exception as cleanup_error:
            print(f"⚠️ Failed to delete session: {cleanup_error}")
        print("\n👋 Demo completed!")
# Start the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    asyncio.run(main())