-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_streaming_simple.py
More file actions
85 lines (67 loc) · 2.51 KB
/
test_streaming_simple.py
File metadata and controls
85 lines (67 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python3
"""
Simple test script to verify streaming functionality.
"""
import asyncio
import sys
import logging
from pathlib import Path
# Configure logging to see debug messages
logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s')
# Add project root to path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from core.brain import AutumnBrain
from core.memory import AutumnMemory
from config.personality import AutumnPersonality
async def simple_streaming_test():
"""Test basic streaming functionality."""
print("Initializing Autumn...")
# Initialize components
memory = AutumnMemory()
personality = AutumnPersonality()
brain = AutumnBrain(memory, personality)
# Initialize brain
await brain.initialize()
print("Testing streaming vs non-streaming...")
test_query = "Hello, how are you today?" # Simple query to trigger local processing
# Test non-streaming first
print("\n" + "="*50)
print("NON-STREAMING TEST")
print("="*50)
import time
start = time.time()
response = await brain.process(test_query)
end = time.time()
print(f"Time: {end-start:.2f}s")
print(f"Response: {response[:100]}...")
# Test streaming
print("\n" + "="*50)
print("STREAMING TEST")
print("="*50)
tokens_received = []
async def stream_callback(data):
print(f"[CALLBACK] Received data: {data}") # Debug info
if data["type"] == "token":
tokens_received.append(data["content"])
print(data["content"], end="", flush=True)
elif data["type"] == "sentence":
print(f"\n[SENTENCE: {len(data['content'])} chars]")
elif data["type"] == "complete":
print(f"\n[COMPLETE: {len(data['content'])} total chars]")
start = time.time()
try:
# Check brain status first
print(f"Brain ollama_available: {brain.ollama_available}")
print(f"Brain gemini_available: {brain.gemini_available}")
stream_response = await brain.process_streaming(test_query, stream_callback)
end = time.time()
print(f"\nTime: {end-start:.2f}s")
print(f"Tokens received: {len(tokens_received)}")
print(f"Response length: {len(stream_response)}")
except Exception as e:
print(f"Streaming error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(simple_streaming_test())