Skip to content

Commit 7a15f6d

Browse files
Merge pull request #21 from basedosdados/feat/stream-responses
feat: stream responses
2 parents 034ed42 + ccefd07 commit 7a15f6d

File tree

11 files changed

+1795
-1383
lines changed

11 files changed

+1795
-1383
lines changed

frontend/api/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
from .api_client import APIClient
2+
3+
__all__ = ["APIClient"]

frontend/api/api_client.py

Lines changed: 72 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import json
2+
from typing import Iterator
13
from uuid import UUID
24

3-
import requests
5+
import httpx
46
from loguru import logger
57

6-
from frontend.datatypes import MessagePair, Thread, UserMessage
8+
from frontend.datatypes import MessagePair, Step, Thread, UserMessage
79

810

911
class APIClient:
@@ -26,7 +28,7 @@ def authenticate(self, email: str, password: str) -> tuple[str|None, str]:
2628
message = "Ops! Ocorreu um erro durante o login. Por favor, tente novamente."
2729

2830
try:
29-
response = requests.post(
31+
response = httpx.post(
3032
url=f"{self.base_url}/chatbot/token/",
3133
data={
3234
"email": email,
@@ -42,13 +44,13 @@ def authenticate(self, email: str, password: str) -> tuple[str|None, str]:
4244
message = "Conectado com sucesso!"
4345
else:
4446
self.logger.error(f"[LOGIN] No access token returned")
45-
except requests.exceptions.HTTPError:
46-
if response.status_code == requests.codes.unauthorized:
47+
except httpx.HTTPStatusError:
48+
if response.status_code == httpx.codes.UNAUTHORIZED:
4749
self.logger.warning(f"[LOGIN] Invalid credentials")
4850
message = "Usuário ou senha incorretos."
4951
else:
5052
self.logger.exception(f"[LOGIN] HTTP error:")
51-
except requests.exceptions.RequestException:
53+
except Exception:
5254
self.logger.exception(f"[LOGIN] Login error:")
5355

5456
return access_token, message
@@ -66,7 +68,7 @@ def create_thread(self, access_token: str, title: str) -> Thread|None:
6668
self.logger.info("[THREAD] Creating thread")
6769

6870
try:
69-
response = requests.post(
71+
response = httpx.post(
7072
url=f"{self.base_url}/chatbot/threads/",
7173
json={"title": title},
7274
headers={"Authorization": f"Bearer {access_token}"},
@@ -75,7 +77,7 @@ def create_thread(self, access_token: str, title: str) -> Thread|None:
7577
thread = Thread(**response.json())
7678
self.logger.success(f"[THREAD] Thread created successfully for user {thread.account}")
7779
return thread
78-
except requests.RequestException:
80+
except Exception:
7981
self.logger.exception(f"[THREAD] Error on thread creation:")
8082
return None
8183

@@ -90,7 +92,7 @@ def get_threads(self, access_token: str) -> list[Thread]|None:
9092
"""
9193
self.logger.info("[THREAD] Retrieving threads")
9294
try:
93-
response = requests.get(
95+
response = httpx.get(
9496
url=f"{self.base_url}/chatbot/threads/",
9597
params={"order_by": "created_at"},
9698
headers={"Authorization": f"Bearer {access_token}"}
@@ -99,14 +101,14 @@ def get_threads(self, access_token: str) -> list[Thread]|None:
99101
threads = [Thread(**thread) for thread in response.json()]
100102
self.logger.success(f"[THREAD] Threads retrieved successfully")
101103
return threads
102-
except requests.RequestException:
104+
except Exception:
103105
self.logger.exception(f"[THREAD] Error on threads retrieval:")
104106
return None
105107

106108
def get_message_pairs(self, access_token: str, thread_id: UUID) -> list[MessagePair]|None:
107109
self.logger.info(f"[MESSAGE] Retrieving message pairs for thread {thread_id}")
108110
try:
109-
response = requests.get(
111+
response = httpx.get(
110112
url=f"{self.base_url}/chatbot/threads/{thread_id}/messages/",
111113
params={"order_by": "created_at"},
112114
headers={"Authorization": f"Bearer {access_token}"}
@@ -115,48 +117,78 @@ def get_message_pairs(self, access_token: str, thread_id: UUID) -> list[MessageP
115117
message_pairs = [MessagePair(**pair) for pair in response.json()]
116118
self.logger.success(f"[MESSAGE] Message pairs retrieved successfully for thread {thread_id}")
117119
return message_pairs
118-
except requests.RequestException:
120+
except Exception:
119121
self.logger.exception(f"[MESSAGE] Error on message pairs retrieval for thread {thread_id}:")
120122
return None
121123

122-
def send_message(self, access_token: str, message: str, thread_id: UUID) -> MessagePair:
123-
"""Send a user message.
124+
def send_message(self, access_token: str, message: str, thread_id: UUID) -> Iterator[tuple[str, Step|MessagePair]]:
125+
"""Send a user message and stream the assistant's response.
124126
125127
Args:
126-
access_token (str): User access token.
127-
message (str): User message.
128-
thread_id (UUID): Thread unique identifier.
128+
access_token (str): The user's access token.
129+
message (str): The message sent by the user.
130+
thread_id (UUID): The unique identifier of the thread.
129131
130-
Returns:
131-
MessagePair:
132-
A MessagePair object containing:
133-
- id: unique identifier
134-
- user_message: user message
135-
- assistant_message: assistant message
136-
- generated_queries: generated sql queries
132+
Yields:
133+
Iterator[tuple[str, Step|MessagePair]]: Tuples containing a status message and either a `Step` or `MessagePair` object.
134+
While streaming, `Step` objects are yielded. Once streaming is complete, a final `MessagePair` is yielded.
137135
"""
138136
user_message = UserMessage(content=message)
139137

140138
self.logger.info(f"[MESSAGE] Sending message {user_message.id} in thread {thread_id}")
141139

140+
steps = []
141+
142142
try:
143-
response = requests.post(
143+
with httpx.stream(
144+
method="POST",
144145
url=f"{self.base_url}/chatbot/threads/{thread_id}/messages/",
146+
headers={"Authorization": f"Bearer {access_token}"},
145147
json=user_message.model_dump(mode="json"),
146-
headers={"Authorization": f"Bearer {access_token}"}
148+
timeout=httpx.Timeout(5.0, read=300.0),
149+
) as response:
150+
response.raise_for_status()
151+
152+
self.logger.success(f"[MESSAGE] User message sent successfully")
153+
154+
for line in response.iter_lines():
155+
if not line:
156+
continue
157+
158+
payload = json.loads(line)
159+
streaming_status = payload["status"]
160+
data = payload["data"]
161+
162+
if streaming_status == "running":
163+
message = Step.model_validate_json(data)
164+
steps.append(message)
165+
elif streaming_status == "complete":
166+
data["steps"] = steps
167+
message = MessagePair(**data)
168+
169+
yield streaming_status, message
170+
except httpx.ReadTimeout:
171+
self.logger.exception(f"[MESSAGE] Timeout error on sending user message:")
172+
message = MessagePair(
173+
user_message=user_message.content,
174+
error_message=(
175+
"Ops, parece que a solicitação expirou! Por favor, tente novamente. "
176+
"Se o problema persistir, avise-nos. Obrigado pela paciência!"
177+
),
178+
steps=steps or [],
147179
)
148-
response.raise_for_status()
149-
self.logger.success(f"[MESSAGE] User message sent successfully")
150-
message_pair = response.json()
151-
except requests.RequestException:
180+
yield "complete", message
181+
except Exception:
152182
self.logger.exception(f"[MESSAGE] Error on sending user message:")
153-
message_pair = {
154-
"user_message": user_message.content,
155-
"assistant_message": "Ops, algo deu errado! Por favor, tente novamente. "\
183+
message = MessagePair(
184+
user_message=user_message.content,
185+
error_message=(
186+
"Ops, algo deu errado! Por favor, tente novamente. "
156187
"Se o problema persistir, avise-nos. Obrigado pela paciência!"
157-
}
158-
159-
return MessagePair(**message_pair)
188+
),
189+
steps=steps or [],
190+
)
191+
yield "complete", message
160192

161193
def send_feedback(self, access_token: str, message_pair_id: UUID, rating: int, comments: str) -> bool:
162194
"""Send a feedback.
@@ -175,7 +207,7 @@ def send_feedback(self, access_token: str, message_pair_id: UUID, rating: int, c
175207
self.logger.info(f"[FEEDBACK] Sending {feedback_meaning} feedback for message pair {message_pair_id}")
176208

177209
try:
178-
response = requests.put(
210+
response = httpx.put(
179211
url=f"{self.base_url}/chatbot/message-pairs/{message_pair_id}/feedbacks/",
180212
json={
181213
"rating": rating,
@@ -186,7 +218,7 @@ def send_feedback(self, access_token: str, message_pair_id: UUID, rating: int, c
186218
response.raise_for_status()
187219
self.logger.success(f"[FEEDBACK] Feedback sent successfully")
188220
return True
189-
except requests.exceptions.RequestException:
221+
except Exception:
190222
self.logger.exception(f"[FEEDBACK] Error on sending feedback:")
191223
return False
192224

@@ -203,13 +235,13 @@ def delete_thread(self, access_token: str, thread_id: UUID) -> bool:
203235
self.logger.info(f"""[CLEAR] Clearing assistant memory""")
204236

205237
try:
206-
response = requests.delete(
238+
response = httpx.delete(
207239
url=f"{self.base_url}/chatbot/threads/{thread_id}/",
208240
headers={"Authorization": f"Bearer {access_token}"}
209241
)
210242
response.raise_for_status()
211243
self.logger.success(f"[CLEAR] Assistant memory cleared successfully")
212244
return True
213-
except requests.exceptions.RequestException:
245+
except Exception:
214246
self.logger.exception("[CLEAR] Error on clearing assistant memory:")
215247
return False

frontend/components/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,13 @@
33
from .stylable_containers import chart_button_container, code_button_container
44
from .three_dots import three_pulsing_dots, three_waving_dots
55
from .typewriter import typewrite
6+
7+
__all__ = [
8+
"render_card",
9+
"render_disclaimer",
10+
"chart_button_container",
11+
"code_button_container",
12+
"three_pulsing_dots",
13+
"three_waving_dots",
14+
"typewrite"
15+
]

0 commit comments

Comments
 (0)