
Autumn AI Streaming Implementation Guide

Overview

This document explains the implementation of streaming responses in Autumn AI Assistant and the potential challenges you might face.

What is Streaming?

When you set "stream": True in the Ollama API request, the model streams tokens (words or word pieces) back in real time as they are generated, instead of making you wait for the complete response.

Benefits of Streaming:

  1. Better User Experience: Users see text appearing character by character (like ChatGPT)
  2. Perceived Speed: Feels faster even though total generation time is the same
  3. Early Interruption: Users can stop generation if they don't like the direction
  4. Real-time Feedback: Can process sentences as they complete for TTS
  5. Better Engagement: More interactive and dynamic conversation flow

Technical Implementation

Current Non-Streaming (brain.py):

response = await self.http_client.post(
    "http://localhost:11434/api/generate",
    json={
        "model": "gemma3:1b-it-q4_K_M",
        "prompt": prompt,
        "stream": False,  # Wait for complete response
        "options": {
            "temperature": 0.7,
            "top_p": 0.9,
            "num_predict": 500
        }
    }
)

New Streaming Implementation:

async with self.http_client.stream(
    "POST", "http://localhost:11434/api/generate",
    json={
        "model": "gemma3:1b-it-q4_K_M",
        "prompt": prompt,
        "stream": True,  # Enable streaming
        "options": { /* same options */ }
    }
) as response:
    async for chunk in response.aiter_lines():
        if not chunk:
            continue  # skip blank keep-alive lines
        chunk_data = json.loads(chunk)
        token = chunk_data.get("response", "")
        # Process each token immediately as it arrives
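Putting these pieces together, here is a minimal end-to-end sketch. It assumes an httpx.AsyncClient and a local Ollama server at the default port; stream_generate, parse_stream_line, and on_token are illustrative names, not part of the existing brain.py API:

```python
import json


def parse_stream_line(line: str):
    """Parse one line of an Ollama streaming response.

    Returns (token, done); blank or malformed lines yield an empty token.
    """
    if not line:
        return "", False
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        return "", False
    return data.get("response", ""), bool(data.get("done"))


async def stream_generate(client, prompt, on_token):
    """Stream tokens from Ollama, invoking on_token for each piece."""
    full_response = []
    async with client.stream(
        "POST", "http://localhost:11434/api/generate",
        json={"model": "gemma3:1b-it-q4_K_M", "prompt": prompt, "stream": True},
    ) as response:
        async for line in response.aiter_lines():
            token, done = parse_stream_line(line)
            if token:
                full_response.append(token)
                await on_token(token)
            if done:
                break
    return "".join(full_response)
```

Keeping the line parsing in its own function makes the error handling testable without a live server.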

Potential Problems & Solutions

1. JSON Parsing Issues

Problem: Each streaming chunk is a separate JSON object, some might be malformed.

{"response": "Hello"}
{"response": " world"}
{"response": "!", "done": true}

Solution: Wrap JSON parsing in try/except and skip invalid chunks:

try:
    chunk_data = json.loads(chunk)
except json.JSONDecodeError:
    logger.warning(f"Invalid JSON chunk: {chunk}")
    continue

2. Connection Interruption

Problem: Network issues can break the stream mid-response.

Solution: Implement retry logic and fallback to non-streaming:

except httpx.ConnectError:
    logger.error("Stream connection lost, falling back to non-streaming")
    return await self._process_local_non_streaming(prompt)

3. Buffer Management

Problem: Tokens may arrive as fragments of words or sentences.

Solution: Implement sentence buffering for TTS:

sentence_buffer += token
if self._is_sentence_complete(sentence_buffer):
    # Send complete sentence to TTS
    await self._speak_sentence(sentence_buffer)
    sentence_buffer = ""

4. Error Recovery

Problem: Errors are harder to detect mid-stream than in a complete response.

Solution: Monitor for error indicators in stream:

if chunk_data.get("error"):
    logger.error(f"Stream error: {chunk_data['error']}")
    break

5. UI Synchronization

Problem: UI needs to update in real-time with streaming tokens.

Solution: Use async callbacks for UI updates:

async def stream_callback(data):
    if data["type"] == "token":
        ui.append_text(data["content"])
    elif data["type"] == "sentence":
        tts.speak(data["content"])

6. Memory Management

Problem: Long responses might consume excessive memory with token buffering.

Solution: Implement sliding window for very long responses:

if len(full_response) > MAX_RESPONSE_LENGTH:
    # Trim older content but keep recent context
    full_response = full_response[-KEEP_LENGTH:]

7. TTS Integration Challenges

Problem: TTS needs complete sentences, but streaming gives individual tokens.

Solution: Buffer tokens until sentence completion:

def _is_sentence_complete(text):
    endings = ['.', '!', '?', ':', ';']
    return (len(text.strip()) > 10 and 
            any(text.strip().endswith(end) for end in endings))
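Combining this predicate with the buffering from problem 3, a small synchronous sketch shows how tokens accumulate into sentences (the token list and returned list stand in for the live stream and the TTS calls; names are illustrative):

```python
def is_sentence_complete(text: str) -> bool:
    """Heuristic sentence boundary: long enough and ends with punctuation."""
    endings = ('.', '!', '?', ':', ';')
    stripped = text.strip()
    return len(stripped) > 10 and stripped.endswith(endings)


def collect_sentences(tokens):
    """Group a stream of tokens into complete sentences."""
    sentences, buffer = [], ""
    for token in tokens:
        buffer += token
        if is_sentence_complete(buffer):
            sentences.append(buffer.strip())
            buffer = ""
    if buffer.strip():
        sentences.append(buffer.strip())  # flush any trailing partial sentence
    return sentences
```

In the real pipeline each completed sentence would be handed to the TTS engine instead of collected in a list.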

8. Timeout Handling

Problem: Streaming can hang if the model stops generating without sending a "done" signal.

Solution: Implement timeout with heartbeat monitoring:

async with asyncio.timeout(60.0):  # 1 minute max (Python 3.11+; use asyncio.wait_for on older versions)
    async for chunk in response.aiter_lines():
        # Process chunks

9. Unicode/Emoji Issues

Problem: Multi-byte characters might be split across tokens.

Solution: Note that httpx's aiter_lines() already yields decoded strings, so this mainly matters when reading raw bytes (e.g. via aiter_bytes()). An incremental decoder buffers incomplete sequences until the next chunk arrives:

import codecs

decoder = codecs.getincrementaldecoder("utf-8")()
for raw_chunk in byte_chunks:
    # Incomplete multi-byte sequences are held back internally
    # and emitted once the next chunk completes them
    text = decoder.decode(raw_chunk)

10. Performance Overhead

Problem: Processing many small chunks adds overhead compared with one large response.

Solution: Batch small tokens for processing:

token_batch = []
for token in tokens:
    token_batch.append(token)
    if len(token_batch) >= BATCH_SIZE:
        process_batch(token_batch)
        token_batch = []
if token_batch:
    process_batch(token_batch)  # flush any leftover tokens

Testing Strategy

1. Functional Tests

  • Test with various query lengths
  • Test with Unicode/emoji content
  • Test connection interruption scenarios
  • Test timeout scenarios

2. Performance Tests

  • Compare streaming vs non-streaming response times
  • Memory usage monitoring during long responses
  • Network usage patterns

3. Error Tests

  • Invalid JSON chunks
  • Network disconnection mid-stream
  • Model timeout scenarios
  • Empty responses
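The invalid-JSON and empty-response cases above can be pinned down with a small pytest sketch; safe_parse is an illustrative helper mirroring the parsing logic from problem 1, not an existing function:

```python
import json


def safe_parse(chunk: str) -> str:
    """Return the token from one streaming chunk, or '' if malformed."""
    try:
        return json.loads(chunk).get("response", "")
    except json.JSONDecodeError:
        return ""


def test_valid_chunk():
    assert safe_parse('{"response": "Hello"}') == "Hello"


def test_truncated_chunk():
    # A chunk cut off mid-string must not crash the stream loop
    assert safe_parse('{"response": "Hel') == ""


def test_empty_chunk():
    assert safe_parse("") == ""
```

Network-disconnection and timeout scenarios are harder to unit-test and are better covered by injecting a mock transport into the httpx client.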

Best Practices

1. Always Have Fallback

if streaming_failed:
    return await self._process_non_streaming(prompt)

2. Implement Circuit Breaker

if streaming_error_count > MAX_ERRORS:
    self.disable_streaming_temporarily()
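One way to sketch such a breaker (the threshold and cooldown are illustrative values, not tuned):

```python
import time


class StreamingCircuitBreaker:
    """Disable streaming after repeated failures, re-enable after a cooldown."""

    def __init__(self, max_errors: int = 3, cooldown: float = 60.0):
        self.max_errors = max_errors
        self.cooldown = cooldown
        self.error_count = 0
        self.disabled_until = 0.0

    def record_error(self):
        self.error_count += 1
        if self.error_count >= self.max_errors:
            # Trip the breaker: force non-streaming for `cooldown` seconds
            self.disabled_until = time.monotonic() + self.cooldown
            self.error_count = 0

    def record_success(self):
        self.error_count = 0

    def streaming_allowed(self) -> bool:
        return time.monotonic() >= self.disabled_until
```

The caller checks streaming_allowed() before each request and falls back to the non-streaming path while the breaker is open.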

3. Monitor Performance

stream_start = time.time()
# ... streaming logic
stream_duration = time.time() - stream_start
if stream_duration > non_stream_duration * 1.5:
    logger.warning("Streaming slower than non-streaming")

4. User Control

# Allow users to toggle streaming
settings.enable_streaming = user_preference

Usage Examples

Basic Streaming:

async def stream_callback(data):
    if data["type"] == "token":
        print(data["content"], end="", flush=True)
    elif data["type"] == "complete":
        print("\n[Done]")

response = await brain.process_streaming(
    "Tell me about AI", 
    stream_callback
)

Advanced with TTS:

async def advanced_callback(data):
    if data["type"] == "token":
        ui.update_text(data["content"])
    elif data["type"] == "sentence":
        await tts_engine.speak(data["content"])
    elif data["type"] == "complete":
        ui.mark_complete()

Conclusion

Streaming makes Autumn feel much more responsive and engaging, but requires careful handling of:

  • Network issues
  • JSON parsing errors
  • Buffer management
  • TTS integration
  • Error recovery

The implementation includes comprehensive error handling and fallback mechanisms to ensure reliability while providing the benefits of real-time streaming.