Testing: Add integration tests for streaming endpoints #12

@Evrard-Nil

Description

Problem

The integration test suite in tests/integration.rs has good coverage for JSON endpoints but lacks tests for streaming (SSE) responses.

The streaming logic has significant complexity:

  • Background task spawning
  • SSE parsing across chunk boundaries
  • ID extraction from first chunk
  • [DONE] marker detection
  • Signature generation after stream completion

These paths are currently exercised only by unit tests on SseParser, not end-to-end.
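
To make the chunk-boundary concern concrete, here is a minimal sketch of a line-buffered SSE reader. It is not the project's SseParser: the names LineBufferedSse, SseEvent, and feed are hypothetical, and the real parser may work differently. The sketch buffers raw bytes and only decodes complete lines, so a payload or a [DONE] marker split across two chunks is still recognized.

```rust
/// Events produced by the sketch parser (hypothetical type, not from the codebase).
#[derive(Debug, PartialEq)]
pub enum SseEvent {
    /// Payload of a `data:` line other than the terminator.
    Data(String),
    /// The `[DONE]` terminator.
    Done,
}

/// Hypothetical stand-in for SseParser: accumulates bytes until a full line arrives.
#[derive(Default)]
pub struct LineBufferedSse {
    buf: Vec<u8>,
}

impl LineBufferedSse {
    /// Feed one network chunk and return the events completed by it.
    pub fn feed(&mut self, chunk: &[u8]) -> Vec<SseEvent> {
        self.buf.extend_from_slice(chunk);
        let mut events = Vec::new();

        // Consume only complete lines; a trailing partial line stays buffered.
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let raw: Vec<u8> = self.buf.drain(..=pos).collect();
            let text = String::from_utf8_lossy(&raw);
            let line = text.trim_end_matches(|c: char| c == '\n' || c == '\r');

            // Blank lines (event separators) and non-data fields are skipped.
            if let Some(rest) = line.strip_prefix("data:") {
                // The SSE format allows one optional space after the colon.
                let payload = rest.strip_prefix(' ').unwrap_or(rest);
                if payload == "[DONE]" {
                    events.push(SseEvent::Done);
                } else {
                    events.push(SseEvent::Data(payload.to_string()));
                }
            }
        }
        events
    }
}
```

Feeding `data: {"id":"cha` and then `t-123"}` followed by a newline yields nothing for the first call and a single Data event for the second, which is the behavior the end-to-end test should confirm through the full proxy path.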

Impact

Medium. Streaming regressions could go undetected, because the interaction between axum, tokio tasks, and reqwest streaming is not covered by unit tests.

Solution

Add streaming integration tests to tests/integration.rs:

#[tokio::test]
async fn test_chat_completions_streaming() {
    let mock_server = MockServer::start().await;
    
    // Mock backend returns SSE stream
    Mock::given(method("POST"))
        .and(path("/v1/chat/completions"))
        .respond_with(ResponseTemplate::new(200)
            .set_body_string(
                // Use real newlines (\n); "\\n" would put a literal backslash-n in the body
                "data: {\"id\":\"chatcmpl-123\",\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}\n\n\
                 data: {\"id\":\"chatcmpl-123\",\"choices\":[{\"delta\":{\"content\":\" world\"}}]}\n\n\
                 data: [DONE]\n\n"
            )
            .insert_header("content-type", "text/event-stream"))
        .mount(&mock_server)
        .await;
    
    let app = build_test_app(&mock_server.uri());
    
    let request = Request::builder()
        .uri("/v1/chat/completions")
        .method("POST")
        .header("authorization", "Bearer test-token")
        .header("content-type", "application/json")
        .body(Body::from(r#"{"messages":[{"role":"user","content":"hi"}],"stream":true}"#))
        .unwrap();
    
    let response = app.oneshot(request).await.unwrap();
    
    assert_eq!(response.status(), StatusCode::OK);
    assert_eq!(
        response.headers().get("content-type").unwrap(),
        "text/event-stream"
    );
    
    // Collect stream body
    let body = response.into_body();
    let bytes = body_to_bytes(body).await;
    let body_str = String::from_utf8(bytes.to_vec()).unwrap();
    
    // Verify stream contains expected chunks
    assert!(body_str.contains("Hello"));
    assert!(body_str.contains("world"));
    assert!(body_str.contains("[DONE]"));
    
    // Verify signature was cached (give background task time to complete)
    tokio::time::sleep(Duration::from_millis(100)).await;
    // ... check cache for chatcmpl-123
}

#[tokio::test]
async fn test_streaming_chunk_boundaries() {
    // Test that SSE parsing works when data is split across chunks
    // (e.g., "data: {\"id\":\"cha" in one chunk, "t-123\"}" in next)
}

#[tokio::test]
async fn test_streaming_error_mid_stream() {
    // Test behavior when upstream connection fails mid-stream
}

#[tokio::test]
async fn test_streaming_client_disconnect() {
    // Test that background task cleans up when client disconnects
    // (dropping the response body early simulates the disconnect;
    // tokio::time::timeout can bound the wait for cleanup)
}

#[tokio::test]
async fn test_streaming_signature_cache_on_completion() {
    // Verify signature is only cached when stream completes with [DONE]
}
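
For test_streaming_chunk_boundaries, one option is to exercise every possible split point rather than a single hand-picked one. The helper below is a sketch under that assumption; two_way_splits is a hypothetical name, not an existing function in tests/integration.rs.

```rust
/// Return every way of cutting `body` into a non-empty head and a non-empty tail.
/// Hypothetical test helper: each pair can be delivered as two separate chunks.
pub fn two_way_splits(body: &[u8]) -> Vec<(&[u8], &[u8])> {
    // Index 0 and body.len() are excluded because they produce an empty side.
    (1..body.len()).map(|i| body.split_at(i)).collect()
}
```

Each (head, tail) pair would then be served as two body frames by the fake upstream. A string body set on the mock response may well arrive as a single chunk, so a hand-rolled upstream that writes the two parts separately is likely needed to actually control where the boundary falls.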

Test Coverage Goals

  • Normal streaming response
  • Chunk boundary edge cases
  • Missing [DONE] marker
  • Malformed JSON in stream
  • Client disconnect mid-stream
  • Backend error mid-stream
  • Signature caching on completion
  • ID extraction from first chunk
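
Several of these goals reduce to one decision: which ID, if any, should have a cached signature once the stream ends. The sketch below models that decision on its own so the expected outcomes are explicit. StreamSummary, observe, cacheable_id, and naive_extract_id are all hypothetical names, and the string-search ID extraction is a deliberately naive placeholder for real JSON parsing; the assumption that only the first chunk is inspected for the ID follows the wording above and may not match the implementation.

```rust
/// Tracks what a stream has shown so far (hypothetical, for illustrating expectations).
#[derive(Default)]
pub struct StreamSummary {
    saw_first: bool,
    saw_done: bool,
    id: Option<String>,
}

impl StreamSummary {
    /// Record one `data:` payload in arrival order.
    pub fn observe(&mut self, payload: &str) {
        if payload == "[DONE]" {
            self.saw_done = true;
            return;
        }
        // Assumption: the ID is taken from the first chunk only.
        if !self.saw_first {
            self.saw_first = true;
            self.id = naive_extract_id(payload);
        }
    }

    /// ID whose signature should be cached: present only after a clean [DONE].
    pub fn cacheable_id(&self) -> Option<&str> {
        if self.saw_done {
            self.id.as_deref()
        } else {
            None
        }
    }
}

/// Naive placeholder: finds `"id":"..."` by string search instead of parsing JSON.
fn naive_extract_id(payload: &str) -> Option<String> {
    let marker = "\"id\":\"";
    let start = payload.find(marker)? + marker.len();
    let len = payload[start..].find('"')?;
    Some(payload[start..start + len].to_string())
}
```

Under these assumptions, a normal stream yields the first chunk's ID, while a stream with no [DONE] marker or with a malformed first chunk yields nothing to cache. The integration tests would assert the same outcomes against the real cache.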

File Location

tests/integration.rs


Labels: P2: Medium (medium priority, fix when possible); testing (related to testing and test coverage)
