Commit 37d90e9

backend for streaming works

1 parent 0c5532b
4 files changed: +178 -17 lines changed

backend/app/api/routes/learn.py

Lines changed: 24 additions & 0 deletions
@@ -1,5 +1,6 @@
 from typing import Literal
 from fastapi import APIRouter, Depends, HTTPException
+from fastapi.responses import StreamingResponse
 from pydantic import BaseModel
 from app.api import deps
 from app.core.ai_client import ChatManager, AnthropicClient, OpenAIClient
@@ -66,6 +67,29 @@ async def chat_general(
 
     return ChatResponse(message=response)
 
+@router.post("/learn/chat/stream")
+async def chat_stream(
+    request: ChatRequest,
+    current_user = Depends(deps.get_current_user),
+):
+    """Streaming chat endpoint."""
+    chat_key = f"{current_user.id}_general"
+    if chat_key not in active_chats:
+        active_chats[chat_key] = ChatManager(client=request.model)
+
+    return StreamingResponse(
+        active_chats[chat_key].stream_message(
+            request.message,
+            system=request.system_prompt
+        ),
+        media_type='text/event-stream',
+        headers={
+            'Cache-Control': 'no-cache',
+            'Connection': 'keep-alive',
+            'X-Accel-Buffering': 'no'  # Disable response buffering in nginx
+        }
+    )
+
 @router.post("/learn/{path_id}", response_model=ChatResponse)
 async def chat(
     path_id: str,
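
For reference, the new endpoint can be exercised outside the frontend as well. A minimal sketch using httpx (the host, port, and token below are placeholders, not part of this commit):

import asyncio
import json

import httpx

async def main() -> None:
    # Placeholder host and token; substitute real values
    url = "http://localhost:8000/api/v1/learn/chat/stream"
    headers = {
        "Authorization": "Bearer <access_token>",
        "Accept": "text/event-stream",
    }
    payload = {"message": "Hello!", "model": "anthropic"}

    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload, headers=headers) as response:
            async for line in response.aiter_lines():
                # Each SSE frame arrives as a single "data: <json>" line
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    if event["type"] == "content":
                        print(event["content"], end="", flush=True)

asyncio.run(main())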

backend/app/core/ai_client.py

Lines changed: 58 additions & 3 deletions
@@ -1,20 +1,27 @@
 from typing import AsyncGenerator, Dict, Protocol, Literal
 from typing import Literal, TypedDict
-from anthropic import Anthropic
+from anthropic import AsyncAnthropic
 from openai import AsyncOpenAI
 from app.core.config import settings
+import json
+import logging
+
+logger = logging.getLogger(__name__)
 
 class Message(TypedDict):
     role: Literal["user", "assistant", "system"]
     content: str
 
-class BaseAIClient:
+class BaseAIClient(Protocol):
     async def chat(self, messages: list[Message], system: str | None = None) -> str:
         raise NotImplementedError
+
+    async def chat_stream(self, messages: list[Message], system: str | None = None) -> AsyncGenerator[str, None]:
+        raise NotImplementedError
 
 class AnthropicClient(BaseAIClient):
     def __init__(self):
-        self.client = Anthropic(api_key=settings.ANTHROPIC_API_KEY)
+        self.client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
         self.model = settings.ANTHROPIC_MODEL
 
     async def chat(self, messages: list[Message], system: str | None = None) -> str:
@@ -27,6 +34,22 @@ async def chat(self, messages: list[Message], system: str | None = None) -> str:
         )
         return response.content[0].text
 
+    async def chat_stream(self, messages: list[Message], system: str | None = None) -> AsyncGenerator[str, None]:
+        """Stream plain-text deltas; ChatManager handles SSE framing and history."""
+        request_params = {
+            'messages': [{"role": m["role"], "content": m["content"]} for m in messages],
+            'model': self.model,
+            'max_tokens': settings.MAX_TOKENS,
+            'temperature': settings.TEMPERATURE,
+        }
+        if system:
+            request_params['system'] = system
+
+        # stream() is a context manager; exiting it closes the connection
+        async with self.client.messages.stream(**request_params) as stream:
+            async for text in stream.text_stream:
+                yield text
+
 class OpenAIClient(BaseAIClient):
     def __init__(self):
         self.client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
@@ -44,6 +67,23 @@ async def chat(self, messages: list[Message], system: str | None = None) -> str:
         )
         return response.choices[0].message.content
 
+    async def chat_stream(self, messages: list[Message], system: str | None = None) -> AsyncGenerator[str, None]:
+        """Stream plain-text deltas; ChatManager handles SSE framing and history."""
+        if system:
+            messages = [{"role": "system", "content": system}, *messages]
+
+        response = await self.client.chat.completions.create(
+            model=self.model,
+            temperature=settings.TEMPERATURE,
+            max_tokens=settings.MAX_TOKENS,
+            messages=[{"role": m["role"], "content": m["content"]} for m in messages],
+            stream=True
+        )
+        async for chunk in response:
+            # Skip empty choice lists and None deltas (e.g. role or stop chunks)
+            if chunk.choices and chunk.choices[0].delta.content:
+                yield chunk.choices[0].delta.content
+
 class ChatManager:
     def __init__(self, client: Literal["anthropic", "openai"] = "anthropic"):
         self.history: list[Message] = []
@@ -57,3 +97,14 @@ async def send_message(self, content: str, system: str | None = None) -> str:
         response = await self.client.chat(self.history, system)
         self.add_message("assistant", response)
         return response
+
+    async def stream_message(self, content: str, system: str | None = None) -> AsyncGenerator[str, None]:
+        """Stream a response as SSE events, recording the full reply in history."""
+        self.add_message("user", content)
+        parts: list[str] = []
+        async for text in self.client.chat_stream(self.history, system):
+            parts.append(text)
+            yield f"data: {json.dumps({'type': 'content', 'content': text})}\n\n"
+        # Only after the stream completes do we know the full assistant message
+        self.add_message("assistant", "".join(parts))
+        yield f"data: {json.dumps({'type': 'done', 'content': ''})}\n\n"
frontend/src/routes/learn/_layout.tsx

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+import { Flex, Spinner } from "@chakra-ui/react"
+import { Outlet, createFileRoute, redirect } from "@tanstack/react-router"
+
+import Sidebar from "../../components/Common/Sidebar"
+import UserMenu from "../../components/Common/UserMenu"
+import useAuth, { isLoggedIn } from "../../hooks/useAuth"
+
+export const Route = createFileRoute("/learn/_layout")({
+  component: Layout,
+  beforeLoad: async () => {
+    if (!isLoggedIn()) {
+      throw redirect({
+        to: "/login",
+      })
+    }
+  },
+})
+
+function Layout() {
+  const { isLoading } = useAuth()
+
+  return (
+    <Flex maxW="large" h="auto" position="relative">
+      <Sidebar />
+      {isLoading ? (
+        <Flex justify="center" align="center" height="100vh" width="full">
+          <Spinner size="xl" color="ui.main" />
+        </Flex>
+      ) : (
+        <Outlet />
+      )}
+      <UserMenu />
+    </Flex>
+  )
+}

frontend/src/routes/learn/chat/index.tsx

Lines changed: 61 additions & 14 deletions
@@ -40,7 +40,7 @@ const ChatMessage = ({ message }: { message: ChatMessage }) => {
     )
 }
 
-export const Route = createFileRoute('/learn/chat')({
+export const Route = createFileRoute('/learn/_layout/chat')({
     component: ChatRoute
 })
 
@@ -74,30 +74,80 @@ function ChatRoute() {
     setCurrentMessage("")
     setIsLoading(true)
 
-    // Simulate streaming response
-    const response = "This is a simulated streaming response..."
-    let streamedContent = ""
-
-    const newMessage: ChatMessage = {
+    const assistantMessage: ChatMessage = {
       id: window.crypto.randomUUID(),
       content: "",
       isUser: false,
     }
-    setMessages(prev => [...prev, newMessage])
+    setMessages(prev => [...prev, assistantMessage])
 
-    for (const char of response) {
-      await new Promise(resolve => setTimeout(resolve, 50))
-      streamedContent += char
+    try {
+      const response = await fetch('/api/v1/learn/chat/stream', {
+        method: 'POST',
+        headers: {
+          'Content-Type': 'application/json',
+          'Accept': 'text/event-stream',
+          'Authorization': `Bearer ${localStorage.getItem('access_token')}`,
+        },
+        body: JSON.stringify({
+          message: currentMessage,
+          model: "anthropic"
+        })
+      })
+
+      if (!response.ok) {
+        const errorData = await response.json()
+        throw new Error(`HTTP error! status: ${response.status}, message: ${errorData.detail || 'Unknown error'}`)
+      }
+
+      const reader = response.body?.getReader()
+      if (!reader) throw new Error('No reader available')
+
+      let streamedContent = ""
+      const decoder = new TextDecoder()
+
+      while (true) {
+        const { done, value } = await reader.read()
+        if (done) break
+
+        // stream: true keeps multi-byte characters intact across reads
+        const chunk = decoder.decode(value, { stream: true })
+        // NOTE: assumes each read ends on an event boundary; a production
+        // parser would buffer partial lines between reads
+        const lines = chunk.split('\n')
+
+        for (const line of lines) {
+          if (line.startsWith('data: ')) {
+            try {
+              const data = JSON.parse(line.slice(6))
+              if (data.type === 'content' && data.content) {
+                streamedContent += data.content
+                setMessages(prev =>
+                  prev.map(msg =>
+                    msg.id === assistantMessage.id
+                      ? { ...msg, content: streamedContent }
+                      : msg
+                  )
+                )
+              }
+            } catch (e) {
+              console.error('Error parsing SSE data:', e)
+            }
+          }
+        }
+      }
+    } catch (error) {
+      console.error('Error:', error)
       setMessages(prev =>
         prev.map(msg =>
-          msg.id === newMessage.id
-            ? { ...msg, content: streamedContent }
+          msg.id === assistantMessage.id
+            ? { ...msg, content: "Sorry, there was an error processing your request." }
             : msg
         )
       )
+    } finally {
+      setIsLoading(false)
     }
-
-    setIsLoading(false)
   }
 
   return (
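
Design note: fetch with a ReadableStream reader is used here rather than the browser's built-in EventSource API because EventSource only issues GET requests and cannot attach an Authorization header, while this endpoint requires an authenticated POST.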
