diff --git a/tools/server/tests/conftest.py b/tools/server/tests/conftest.py index 017d1bb841efd..6462290f499a5 100644 --- a/tools/server/tests/conftest.py +++ b/tools/server/tests/conftest.py @@ -2,14 +2,92 @@ from utils import * -# ref: https://stackoverflow.com/questions/22627659/run-code-before-and-after-each-test-in-py-test @pytest.fixture(autouse=True) def stop_server_after_each_test(): - # do nothing before each test yield - # stop all servers after each test instances = set( server_instances - ) # copy the set to prevent 'Set changed size during iteration' + ) for server in instances: server.stop() + + +@pytest.fixture +def pipeline_process(): + """ + Fixture providing a PipelineTestProcess instance for E2E testing. + Automatically cleaned up after test completion. + """ + process = PipelineTestProcess() + yield process + if process.process is not None: + process.stop() + + +@pytest.fixture +def e2e_small_model_config(): + """ + Fixture providing configuration for a small model suitable for E2E testing. + Uses tinyllama for fast execution in CI environments. + """ + return { + "model_hf_repo": "ggml-org/models", + "model_hf_file": "tinyllamas/stories260K.gguf", + "model_alias": "tinyllama-e2e", + "n_ctx": 512, + "n_batch": 32, + "n_slots": 2, + "n_predict": 32, + "seed": 42, + "temperature": 0.8, + } + + +@pytest.fixture +def e2e_embedding_model_config(): + """ + Fixture providing configuration for embedding model E2E testing. + """ + return { + "model_hf_repo": "ggml-org/models", + "model_hf_file": "bert-bge-small/ggml-model-f16.gguf", + "model_alias": "bert-e2e", + "n_ctx": 512, + "n_batch": 128, + "n_ubatch": 128, + "n_slots": 2, + "seed": 42, + "server_embeddings": True, + } + + +@pytest.fixture +def e2e_multimodal_model_config(): + """ + Fixture providing configuration for multimodal model E2E testing. + """ + return { + "model_hf_repo": "ggml-org/tinygemma3-GGUF", + "model_hf_file": "tinygemma3-Q8_0.gguf", + "mmproj_url": "https://huggingface.co/ggml-org/tinygemma3-GGUF/resolve/main/mmproj-tinygemma3.gguf", + "model_alias": "tinygemma3-e2e", + "n_ctx": 1024, + "n_batch": 32, + "n_slots": 2, + "n_predict": 16, + "seed": 42, + } + + +@pytest.fixture +def concurrent_test_prompts(): + """ + Fixture providing a list of prompts for concurrent testing scenarios. + """ + return [ + "Once upon a time", + "In a distant land", + "There was a brave knight", + "The dragon soared", + "Magic filled the air", + ] diff --git a/tools/server/tests/e2e/README.md b/tools/server/tests/e2e/README.md new file mode 100644 index 0000000000000..e46b62a018ca1 --- /dev/null +++ b/tools/server/tests/e2e/README.md @@ -0,0 +1,273 @@ +# End-to-End Test Suite + +This directory contains comprehensive end-to-end (E2E) tests for llama.cpp, extending beyond unit-focused API testing to validate complete user workflows and component integration. + +## Overview + +The E2E test suite provides comprehensive coverage of: + +1. **Pipeline Workflows** - Complete model download, loading, and inference workflows +2. **Tool Integration** - CLI tool testing (llama-cli, llama-bench) +3. **Multimodal Workflows** - Vision + text processing coordination +4. **Concurrent Scenarios** - Multi-user simulation and parallel request handling + +## Test Files + +### test_pipeline_workflows.py + +Tests complete pipeline workflows from model acquisition to inference: + +- **Model Download & Loading**: Validates HuggingFace model download and loading +- **State Transitions**: Tracks server state progression (INITIAL → LOADING_MODEL → READY → GENERATING) +- **Context Management**: Tests extended inference sessions with context preservation +- **KV Cache Behavior**: Validates cache utilization during workflows +- **Streaming Pipeline**: Tests streaming inference through complete pipeline +- **Embedding Models**: Validates embedding model pipelines + +**Example:** +```bash +./tests.sh e2e/test_pipeline_workflows.py::test_basic_pipeline_workflow +``` + +### test_tool_integration.py + +Tests CLI tool integration and coordination: + +- **llama-cli Execution**: Basic and advanced CLI usage patterns +- **llama-bench Testing**: Performance benchmark execution +- **Embedding Generation**: CLI-based embedding workflows +- **Parameter Validation**: Error handling and validation +- **Server/CLI Coordination**: Resource sharing between tools + +**Example:** +```bash +./tests.sh e2e/test_tool_integration.py::test_cli_basic_execution +``` + +### test_multimodal_workflows.py + +Tests multimodal (vision + text) processing: + +- **Model Loading**: Multimodal model initialization with vision projection +- **Image Processing**: Image input handling with text completion +- **Context Preservation**: Cross-modal context management +- **Sequential Requests**: Mixed text-only and multimodal requests +- **Streaming**: Multimodal streaming responses +- **Error Handling**: Invalid input handling + +**Example:** +```bash +./tests.sh e2e/test_multimodal_workflows.py::test_multimodal_chat_with_image +``` + +### test_concurrent_scenarios.py + +Tests concurrent request handling and real-world scenarios: + +- **Concurrent Requests**: Multiple simultaneous completion/chat requests +- **Multi-turn Conversations**: Context preservation across conversation turns +- **Slot Management**: Request queuing and slot allocation under load +- **Streaming Concurrency**: Multiple streaming sessions +- **LoRA Switching**: Adapter loading/switching during active sessions +- **Mixed Workloads**: Different request types running concurrently + +**Example:** +```bash +./tests.sh e2e/test_concurrent_scenarios.py::test_concurrent_completion_requests +``` + +## Framework Extensions + +### PipelineTestProcess Class + +The `PipelineTestProcess` class extends `ServerProcess` with E2E testing capabilities: + +```python +from utils import PipelineTestProcess + +# Create pipeline test instance +pipeline = PipelineTestProcess() + +# Test complete pipeline workflow +results = pipeline.test_full_pipeline({ + "model_hf_repo": "ggml-org/models", + "model_hf_file": "tinyllamas/stories260K.gguf", + "n_ctx": 512, +}) + +# Run CLI commands +result = pipeline.run_cli_command(["-m", model_path, "-p", "Hello", "-n", "16"]) + +# Run benchmarks +bench_results = pipeline.run_bench_command(model_path, ["-p", "8", "-n", "8"]) +``` + +**Key Methods:** + +- `test_full_pipeline(model_config)` - Execute complete pipeline workflow +- `run_cli_command(args, input_text, timeout)` - Execute llama-cli +- `run_bench_command(model_path, args, timeout)` - Execute llama-bench +- `test_context_management(prompts, max_context)` - Test context handling +- `validate_kv_cache_behavior(context_size, tokens)` - Validate cache usage + +### Test Fixtures + +New pytest fixtures in `conftest.py`: + +- **`pipeline_process`** - PipelineTestProcess instance with automatic cleanup +- **`e2e_small_model_config`** - Small model config for fast E2E tests +- **`e2e_embedding_model_config`** - Embedding model configuration +- **`e2e_multimodal_model_config`** - Multimodal model configuration +- **`concurrent_test_prompts`** - Prompts for concurrent testing + +## Running E2E Tests + +### Run All E2E Tests + +```bash +./tests.sh e2e/ +``` + +### Run Specific Test File + +```bash +./tests.sh e2e/test_pipeline_workflows.py +``` + +### Run Single Test + +```bash +./tests.sh e2e/test_pipeline_workflows.py::test_basic_pipeline_workflow +``` + +### Run with Verbose Output + +```bash +DEBUG=1 ./tests.sh e2e/ -s -v +``` + +### Run Slow Tests + +Some tests are marked as slow and require the `SLOW_TESTS` environment variable: + +```bash +SLOW_TESTS=1 ./tests.sh e2e/ +``` + +## Configuration + +### Environment Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `LLAMA_CLI_BIN_PATH` | Path to llama-cli binary | `../../../build/bin/llama-cli` | +| `LLAMA_BENCH_BIN_PATH` | Path to llama-bench binary | `../../../build/bin/llama-bench` | +| `LLAMA_CACHE` | Model cache directory | `tmp` | +| `SLOW_TESTS` | Enable slow tests | `0` | +| `DEBUG` | Enable verbose output | `0` | + +### Model Selection + +E2E tests use smaller models for CI compatibility: + +- **Text Generation**: tinyllama (stories260K.gguf) - Fast, small footprint +- **Embeddings**: bert-bge-small - Efficient embedding generation +- **Multimodal**: tinygemma3 - Compact vision+text model + +For local testing with larger models, modify the fixture configurations in `conftest.py`. + +## Writing New E2E Tests + +### Example Test Structure + +```python +def test_my_e2e_workflow(pipeline_process, e2e_small_model_config): + """ + Test description here. + + Validates: + - Point 1 + - Point 2 + """ + # Configure pipeline + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + # Start server + pipeline_process.start() + + # Test workflow + res = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Test", + "n_predict": 8, + }) + + # Assertions + assert res.status_code == 200 + assert "content" in res.body +``` + +### Best Practices + +1. **Use Fixtures**: Leverage existing fixtures for model configs and test data +2. **Small Models**: Use small models for fast execution in CI +3. **Resource Cleanup**: Fixtures handle cleanup automatically +4. **Test Isolation**: Each test should be independent +5. **Descriptive Names**: Use clear, descriptive test names +6. **Documentation**: Include docstrings explaining what is validated +7. **Slow Tests**: Mark expensive tests with `@pytest.mark.skipif(not is_slow_test_allowed())` + +## CI Integration + +E2E tests are designed to run in CI environments with: + +- 4 vCPU GitHub runners +- Limited memory footprint +- Fast model downloads from HuggingFace +- Reasonable timeout configurations + +Tests automatically skip slow scenarios unless `SLOW_TESTS=1` is set. + +## Troubleshooting + +### Tests Timeout + +- Increase timeout in test: `pipeline_process.start(timeout_seconds=120)` +- Use smaller models in CI +- Check network connectivity for model downloads + +### Model Download Issues + +- Set `LLAMA_CACHE` to a persistent directory +- Pre-download models before running tests +- Check HuggingFace availability + +### CLI Tool Not Found + +- Ensure binaries are built: `cmake --build build --target llama-cli llama-bench` +- Set `LLAMA_CLI_BIN_PATH` and `LLAMA_BENCH_BIN_PATH` +- Check binary permissions + +### Concurrent Test Failures + +- Increase `n_slots` for higher concurrency +- Adjust timing expectations for slower systems +- Enable `server_continuous_batching` for better scheduling + +## Contributing + +When adding new E2E tests: + +1. Place tests in appropriate file based on category +2. Use existing fixtures when possible +3. Add new fixtures to `conftest.py` if needed +4. Update this README with new test descriptions +5. Ensure tests pass in CI environment +6. Document special requirements or configurations + +## Related Documentation + +- [Main Test README](../README.md) - General testing documentation +- [Server Documentation](../../README.md) - llama-server documentation +- [Contributing Guide](../../../../CONTRIBUTING.md) - Project contribution guidelines diff --git a/tools/server/tests/e2e/__init__.py b/tools/server/tests/e2e/__init__.py new file mode 100644 index 0000000000000..3194e40467a89 --- /dev/null +++ b/tools/server/tests/e2e/__init__.py @@ -0,0 +1,9 @@ +""" +End-to-end test suite for llama.cpp server. + +This module provides comprehensive E2E testing covering: +- Complete pipeline workflows (download, conversion, loading, inference) +- Tool integration testing (llama-cli, llama-bench) +- Multimodal workflows (vision + text) +- Concurrent scenario simulation +""" diff --git a/tools/server/tests/e2e/test_concurrent_scenarios.py b/tools/server/tests/e2e/test_concurrent_scenarios.py new file mode 100644 index 0000000000000..c384c8e736739 --- /dev/null +++ b/tools/server/tests/e2e/test_concurrent_scenarios.py @@ -0,0 +1,471 @@ +""" +End-to-end tests for concurrent scenarios. + +Tests cover: +- Multi-turn conversation management with context preservation +- Concurrent user simulation and request queuing validation +- LoRA adapter loading and switching during active sessions +- Batch processing with multiple simultaneous users +- Request slot management under load conditions +""" + +import pytest +from utils import * + + +def test_concurrent_completion_requests(pipeline_process, e2e_small_model_config, concurrent_test_prompts): + """ + Test concurrent completion requests from multiple simulated users. + + Validates: + - Server handles multiple simultaneous requests + - All requests complete successfully + - Responses are independent and correct + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 4 + pipeline_process.server_continuous_batching = True + pipeline_process.start() + + tasks = [ + ( + pipeline_process.make_request, + ("POST", "/completion", { + "prompt": prompt, + "n_predict": 16, + "temperature": 0.8, + }) + ) + for prompt in concurrent_test_prompts + ] + + results = parallel_function_calls(tasks) + + assert len(results) == len(concurrent_test_prompts) + assert all([res.status_code == 200 for res in results]), \ + "All concurrent requests should succeed" + assert all(["content" in res.body for res in results]), \ + "All responses should contain content" + + +def test_concurrent_chat_completions(pipeline_process, e2e_small_model_config): + """ + Test concurrent chat completion requests. + + Validates: + - Multiple chat sessions run simultaneously + - Context is isolated between sessions + - No cross-contamination of conversations + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 3 + pipeline_process.server_continuous_batching = True + pipeline_process.start() + + conversations = [ + [{"role": "user", "content": "Tell me about dogs"}], + [{"role": "user", "content": "Tell me about cats"}], + [{"role": "user", "content": "Tell me about birds"}], + ] + + tasks = [ + ( + pipeline_process.make_request, + ("POST", "/chat/completions", { + "messages": conv, + "max_tokens": 16, + }) + ) + for conv in conversations + ] + + results = parallel_function_calls(tasks) + + assert all([res.status_code == 200 for res in results]) + assert all(["choices" in res.body for res in results]) + + +def test_multi_turn_conversation_with_context(pipeline_process, e2e_small_model_config): + """ + Test multi-turn conversation with context preservation. + + Validates: + - Context is maintained across conversation turns + - Responses build on previous messages + - Server state management is correct + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.cache_prompt = True + pipeline_process.start() + + messages = [] + + user_msg_1 = {"role": "user", "content": "Hello"} + messages.append(user_msg_1) + + res1 = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": messages, + "max_tokens": 16, + }) + assert res1.status_code == 200 + + messages.append({ + "role": "assistant", + "content": res1.body["choices"][0]["message"]["content"] + }) + + messages.append({ + "role": "user", + "content": "Tell me more" + }) + + res2 = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": messages, + "max_tokens": 16, + }) + assert res2.status_code == 200 + + messages.append({ + "role": "assistant", + "content": res2.body["choices"][0]["message"]["content"] + }) + + messages.append({ + "role": "user", + "content": "That's interesting" + }) + + res3 = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": messages, + "max_tokens": 16, + }) + assert res3.status_code == 200 + + +def test_request_slot_management(pipeline_process, e2e_small_model_config): + """ + Test request slot management under load. + + Validates: + - Server properly manages limited slot resources + - Requests queue when all slots are busy + - Slot allocation and deallocation work correctly + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 2 + pipeline_process.server_slots = True + pipeline_process.server_continuous_batching = True + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/slots") + assert res.status_code == 200 + initial_slots = res.body + assert len(initial_slots) == 2 + + tasks = [ + ( + pipeline_process.make_request, + ("POST", "/completion", { + "prompt": f"Request {i}", + "n_predict": 8, + }) + ) + for i in range(4) + ] + + results = parallel_function_calls(tasks) + + assert all([res.status_code == 200 for res in results]), \ + "All requests should eventually complete" + + +def test_concurrent_streaming_requests(pipeline_process, e2e_small_model_config): + """ + Test concurrent streaming requests. + + Validates: + - Multiple streaming sessions can run simultaneously + - Streams remain independent + - All streams complete successfully + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 3 + pipeline_process.server_continuous_batching = True + pipeline_process.start() + + def stream_request(prompt): + chunks = list(pipeline_process.make_stream_request("POST", "/completion", data={ + "prompt": prompt, + "n_predict": 12, + "stream": True, + })) + return len(chunks) + + tasks = [ + (stream_request, (f"Story {i}",)) + for i in range(3) + ] + + results = parallel_function_calls(tasks) + + assert all([count > 0 for count in results]), \ + "All streams should produce chunks" + + +def test_concurrent_embeddings(pipeline_process, e2e_embedding_model_config): + """ + Test concurrent embedding generation requests. + + Validates: + - Multiple embedding requests process concurrently + - Embeddings are generated correctly for each input + - No interference between concurrent embedding requests + """ + for key, value in e2e_embedding_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 3 + pipeline_process.start() + + texts = [ + "The quick brown fox", + "jumps over the lazy", + "dog in the yard", + ] + + tasks = [ + ( + pipeline_process.make_request, + ("POST", "/v1/embeddings", { + "input": text, + }) + ) + for text in texts + ] + + results = parallel_function_calls(tasks) + + assert all([res.status_code == 200 for res in results]) + assert all(["data" in res.body and len(res.body["data"]) > 0 for res in results]) + + embeddings = [res.body["data"][0]["embedding"] for res in results] + assert all([len(emb) > 0 for emb in embeddings]) + + +def test_lora_switching_during_active_session(pipeline_process): + """ + Test LoRA adapter switching during active inference sessions. + + Validates: + - LoRA adapters can be loaded and configured + - Different scales produce different outputs + - Switching works while server is actively processing + """ + LORA_FILE_URL = "https://huggingface.co/ggml-org/stories15M_MOE/resolve/main/moe_shakespeare15M.gguf" + + server = ServerPreset.stories15m_moe() + server.lora_files = [download_file(LORA_FILE_URL)] + server.n_slots = 2 + server.start() + + res1 = server.make_request("POST", "/lora-adapters", data=[ + {"id": 0, "scale": 0.0} + ]) + assert res1.status_code == 200 + + res2 = server.make_request("POST", "/completion", data={ + "prompt": "Look in thy glass", + "n_predict": 16, + }) + assert res2.status_code == 200 + + res3 = server.make_request("POST", "/lora-adapters", data=[ + {"id": 0, "scale": 1.0} + ]) + assert res3.status_code == 200 + + res4 = server.make_request("POST", "/completion", data={ + "prompt": "Look in thy glass", + "n_predict": 16, + }) + assert res4.status_code == 200 + + server.stop() + + +def test_concurrent_lora_requests(pipeline_process): + """ + Test concurrent requests with different LoRA configurations. + + Validates: + - Multiple requests with different LoRA scales run concurrently + - Each request gets the correct LoRA configuration + - No cross-contamination between LoRA configurations + """ + LORA_FILE_URL = "https://huggingface.co/ggml-org/stories15M_MOE/resolve/main/moe_shakespeare15M.gguf" + + server = ServerPreset.stories15m_moe() + server.lora_files = [download_file(LORA_FILE_URL)] + server.n_slots = 3 + server.start() + + lora_configs = [ + [{"id": 0, "scale": 0.0}], + [{"id": 0, "scale": 0.5}], + [{"id": 0, "scale": 1.0}], + ] + + tasks = [ + ( + server.make_request, + ("POST", "/completion", { + "prompt": "Look in thy glass", + "lora": lora, + "n_predict": 12, + }) + ) + for lora in lora_configs + ] + + results = parallel_function_calls(tasks) + + assert all([res.status_code == 200 for res in results]) + assert all(["content" in res.body for res in results]) + + server.stop() + + +def test_high_concurrency_stress(pipeline_process, e2e_small_model_config): + """ + Test server under high concurrency stress. + + Validates: + - Server remains stable under high request load + - All requests eventually complete + - No crashes or hangs + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 4 + pipeline_process.server_continuous_batching = True + pipeline_process.start() + + tasks = [ + ( + pipeline_process.make_request, + ("POST", "/completion", { + "prompt": f"Test {i}", + "n_predict": 8, + }) + ) + for i in range(10) + ] + + results = parallel_function_calls(tasks) + + assert len(results) == 10 + successful = sum(1 for res in results if res.status_code == 200) + assert successful >= 8, f"At least 8/10 requests should succeed, got {successful}" + + +def test_mixed_request_types_concurrent(pipeline_process, e2e_small_model_config): + """ + Test concurrent requests of different types. + + Validates: + - Different endpoint types (completion, chat, health) work concurrently + - No interference between different request types + - Server handles mixed workloads correctly + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 3 + pipeline_process.server_continuous_batching = True + pipeline_process.start() + + tasks = [ + ( + pipeline_process.make_request, + ("POST", "/completion", {"prompt": "Hello", "n_predict": 8}) + ), + ( + pipeline_process.make_request, + ("POST", "/chat/completions", { + "messages": [{"role": "user", "content": "Hi"}], + "max_tokens": 8 + }) + ), + ( + pipeline_process.make_request, + ("GET", "/health", None) + ), + ( + pipeline_process.make_request, + ("GET", "/props", None) + ), + ] + + results = parallel_function_calls(tasks) + + assert all([res.status_code == 200 for res in results]) + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test") +def test_sustained_concurrent_load(pipeline_process, e2e_small_model_config): + """ + Test sustained concurrent load over multiple rounds. + + Slow test that validates: + - Server maintains stability over extended concurrent usage + - Performance doesn't degrade significantly + - Memory is managed correctly under sustained load + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_slots = 4 + pipeline_process.server_continuous_batching = True + pipeline_process.server_metrics = True + pipeline_process.start() + + for round_num in range(3): + tasks = [ + ( + pipeline_process.make_request, + ("POST", "/completion", { + "prompt": f"Round {round_num} request {i}", + "n_predict": 12, + }) + ) + for i in range(6) + ] + + results = parallel_function_calls(tasks) + + assert all([res.status_code == 200 for res in results]), \ + f"All requests in round {round_num} should succeed" + + health = pipeline_process.make_request("GET", "/health") + assert health.status_code == 200, \ + f"Server should be healthy after round {round_num}" diff --git a/tools/server/tests/e2e/test_multimodal_workflows.py b/tools/server/tests/e2e/test_multimodal_workflows.py new file mode 100644 index 0000000000000..f5522593ca908 --- /dev/null +++ b/tools/server/tests/e2e/test_multimodal_workflows.py @@ -0,0 +1,384 @@ +""" +End-to-end tests for multimodal workflows. + +Tests cover: +- Vision model + text processing coordination +- Multi-modal inference pipeline validation +- Image input processing with text completion +- Cross-modal context management +""" + +import pytest +import base64 +from utils import * + + +@pytest.fixture +def sample_image_base64(): + """ + Provide a minimal 1x1 pixel PNG image as base64 for testing. + + This is a valid PNG file that can be used to test image input handling + without requiring external image files. + """ + png_1x1 = ( + b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01' + b'\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\x0cIDATx\x9cc\x00\x01' + b'\x00\x00\x05\x00\x01\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82' + ) + return base64.b64encode(png_1x1).decode('utf-8') + + +def test_multimodal_model_loading(pipeline_process, e2e_multimodal_model_config): + """ + Test loading a multimodal model with vision projection. + + Validates: + - Multimodal model loads successfully + - Vision projection (mmproj) is loaded + - Server is ready for multimodal inference + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + assert ".gguf" in res.body["model_path"] + + res = pipeline_process.make_request("GET", "/health") + assert res.status_code == 200 + + +def test_multimodal_text_only_inference(pipeline_process, e2e_multimodal_model_config): + """ + Test text-only inference with a multimodal model. + + Validates that multimodal models can still perform text-only tasks + when no image is provided. + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + res = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Hello", + "n_predict": 8, + }) + + assert res.status_code == 200 + assert "content" in res.body + assert len(res.body["content"]) > 0 + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires valid test image") +def test_multimodal_chat_with_image(pipeline_process, e2e_multimodal_model_config, sample_image_base64): + """ + Test multimodal chat completion with image input. + + Validates: + - Image data can be included in chat messages + - Model processes both image and text inputs + - Response is generated considering multimodal context + + Note: Skipped in CI as it requires a proper test image that can be decoded + by llama.cpp's multimodal processor. The minimal PNG provided may not be + sufficient for actual image processing. + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + res = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": "What is in this image?" + }, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{sample_image_base64}" + } + } + ] + } + ], + "max_tokens": 16, + }) + + assert res.status_code == 200 + assert "choices" in res.body + assert len(res.body["choices"]) > 0 + assert "message" in res.body["choices"][0] + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires valid test image") +def test_multimodal_sequential_requests(pipeline_process, e2e_multimodal_model_config, sample_image_base64): + """ + Test sequential multimodal requests with different modality combinations. + + Validates: + - Text-only followed by multimodal requests + - Model handles modality switching correctly + - Context is maintained appropriately + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + res1 = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Hello", + "n_predict": 4, + }) + assert res1.status_code == 200 + + res2 = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Describe this"}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{sample_image_base64}" + } + } + ] + } + ], + "max_tokens": 8, + }) + assert res2.status_code == 200 + + res3 = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Another text", + "n_predict": 4, + }) + assert res3.status_code == 200 + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires valid test image") +def test_multimodal_context_preservation(pipeline_process, e2e_multimodal_model_config, sample_image_base64): + """ + Test context preservation in multimodal conversations. + + Validates: + - Multimodal context is maintained across turns + - Follow-up messages reference previous multimodal context + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + res = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "What do you see?"}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{sample_image_base64}" + } + } + ] + }, + { + "role": "assistant", + "content": "I see an image." + }, + { + "role": "user", + "content": "Can you elaborate?" + } + ], + "max_tokens": 16, + }) + + assert res.status_code == 200 + assert "choices" in res.body + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires valid test image") +def test_multimodal_streaming_response(pipeline_process, e2e_multimodal_model_config, sample_image_base64): + """ + Test streaming responses with multimodal input. + + Validates: + - Streaming works with image inputs + - Chunks are delivered correctly + - Complete response is assembled + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + chunks = list(pipeline_process.make_stream_request("POST", "/chat/completions", data={ + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Describe"}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{sample_image_base64}" + } + } + ] + } + ], + "max_tokens": 12, + "stream": True, + })) + + assert len(chunks) > 0, "Should receive streaming chunks" + + +def test_multimodal_error_handling(pipeline_process, e2e_multimodal_model_config): + """ + Test error handling in multimodal workflows. + + Validates: + - Invalid image data is handled gracefully + - Appropriate error messages are returned + - Server remains stable after errors + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + res = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "What is this?"}, + { + "type": "image_url", + "image_url": { + "url": "data:image/png;base64,invalid_base64_data" + } + } + ] + } + ], + "max_tokens": 8, + }) + + res_health = pipeline_process.make_request("GET", "/health") + assert res_health.status_code == 200, "Server should remain healthy after error" + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires valid test image") +def test_multimodal_multiple_images(pipeline_process, e2e_multimodal_model_config, sample_image_base64): + """ + Test handling multiple images in a single request. + + Validates that the model can handle multiple image inputs + in the same conversation context. + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start(timeout_seconds=120) + + res = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Compare these images"}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{sample_image_base64}" + } + }, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{sample_image_base64}" + } + } + ] + } + ], + "max_tokens": 16, + }) + + assert res.status_code == 200 + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test") +def test_multimodal_extended_conversation(pipeline_process, e2e_multimodal_model_config, sample_image_base64): + """ + Test extended multimodal conversation with multiple turns. + + Slow test validating: + - Long conversations with images maintain context + - Performance remains stable + - Memory is managed correctly + """ + for key, value in e2e_multimodal_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.n_ctx = 2048 + pipeline_process.start(timeout_seconds=120) + + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "What is this?"}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{sample_image_base64}" + } + } + ] + } + ] + + for i in range(3): + res = pipeline_process.make_request("POST", "/chat/completions", data={ + "messages": messages, + "max_tokens": 16, + }) + + assert res.status_code == 200 + + messages.append({ + "role": "assistant", + "content": res.body["choices"][0]["message"]["content"] + }) + + messages.append({ + "role": "user", + "content": f"Tell me more about point {i+1}" + }) + + assert len(messages) > 3 diff --git a/tools/server/tests/e2e/test_pipeline_workflows.py b/tools/server/tests/e2e/test_pipeline_workflows.py new file mode 100644 index 0000000000000..87fd6fb1dba25 --- /dev/null +++ b/tools/server/tests/e2e/test_pipeline_workflows.py @@ -0,0 +1,242 @@ +""" +End-to-end tests for complete pipeline workflows. + +Tests cover: +- Model download → conversion → loading → inference workflows +- State transition validation across server lifecycle +- Context management during long inference sessions +- KV cache behavior validation during extended workflows +""" + +from utils import * + + +def test_basic_pipeline_workflow(pipeline_process, e2e_small_model_config): + """ + Test a complete basic pipeline: model download → load → inference. + + Validates: + - Successful model loading from HuggingFace + - Server state transitions (INITIAL → LOADING_MODEL → READY → GENERATING) + - Basic inference capability + """ + results = pipeline_process.test_full_pipeline(e2e_small_model_config) + + assert results["model_loaded"], "Model should be loaded successfully" + assert results["inference_successful"], "Inference should complete successfully" + assert "LOADING_MODEL" in results["states"], "Should transition through LOADING_MODEL state" + assert "READY" in results["states"], "Should reach READY state" + assert "GENERATING" in results["states"], "Should transition to GENERATING state" + + assert len(results["state_transitions"]) >= 3, "Should have at least 3 state transitions" + assert ("INITIAL", "LOADING_MODEL") in results["state_transitions"] + assert ("LOADING_MODEL", "READY") in results["state_transitions"] + assert ("READY", "PROCESSING_PROMPT") in results["state_transitions"] + + +def test_pipeline_state_transitions(pipeline_process, e2e_small_model_config): + """ + Validate server state transitions during pipeline execution. + + Ensures proper progression through states and validates that + state transitions occur in the expected order. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + assert pipeline_process.pipeline_state == "INITIAL" + + pipeline_process.start() + assert pipeline_process.process is not None, "Server process should be running" + + res = pipeline_process.make_request("GET", "/health") + assert res.status_code == 200, "Server should be healthy" + + res = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Hello world", + "n_predict": 8, + }) + assert res.status_code == 200 + assert "content" in res.body + + health_res = pipeline_process.make_request("GET", "/health") + assert health_res.status_code == 200, "Server should remain healthy after inference" + + +def test_model_download_and_loading(pipeline_process, e2e_small_model_config): + """ + Test model download and loading workflow. + + Validates that models can be successfully downloaded from HuggingFace + and loaded into the server for inference. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + assert ".gguf" in res.body["model_path"] + assert res.body["total_slots"] == e2e_small_model_config["n_slots"] + + res = pipeline_process.make_request("GET", "/models") + assert res.status_code == 200 + assert len(res.body["data"]) == 1 + assert res.body["data"][0]["id"] == e2e_small_model_config["model_alias"] + + +def test_extended_context_management(pipeline_process, e2e_small_model_config): + """ + Test context management during extended inference sessions. + + Validates: + - Sequential prompt processing with context preservation + - KV cache utilization across multiple requests + - Context window management + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.cache_prompt = True + pipeline_process.start() + + prompts = [ + "Once upon a time, there was", + "The little girl walked through", + "In the forest, she found", + ] + + results = pipeline_process.test_context_management( + prompts=prompts, + max_context=e2e_small_model_config["n_ctx"] + ) + + assert results["prompts_processed"] == len(prompts), \ + f"Should process all {len(prompts)} prompts" + assert "error" not in results, f"Should not have errors: {results.get('error', '')}" + assert len(results["responses"]) == len(prompts) + + +def test_kv_cache_behavior(pipeline_process, e2e_small_model_config): + """ + Validate KV cache behavior during workflows. + + Tests that the KV cache is properly utilized and managed + during inference operations. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.server_metrics = True + pipeline_process.cache_prompt = True + pipeline_process.start() + + res1 = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "The quick brown fox", + "n_predict": 8, + "cache_prompt": True, + }) + assert res1.status_code == 200 + + res2 = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "The quick brown fox", + "n_predict": 8, + "cache_prompt": True, + }) + assert res2.status_code == 200 + + cache_results = pipeline_process.validate_kv_cache_behavior( + context_size=e2e_small_model_config["n_ctx"], + prompt_tokens=20 + ) + + assert cache_results is not None + + +def test_streaming_pipeline(pipeline_process, e2e_small_model_config): + """ + Test streaming inference in pipeline workflow. + + Validates that streaming responses work correctly throughout + the complete pipeline execution. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + chunks = list(pipeline_process.make_stream_request("POST", "/completion", data={ + "prompt": "Hello", + "n_predict": 16, + "stream": True, + })) + + assert len(chunks) > 0, "Should receive streaming chunks" + + content = "" + for chunk in chunks: + if "content" in chunk: + content += chunk["content"] + + assert len(content) > 0, "Should have generated content" + + +def test_pipeline_with_embedding_model(pipeline_process, e2e_embedding_model_config): + """ + Test pipeline workflow with embedding model. + + Validates that embedding models work correctly through the + complete pipeline (load → embed). + """ + for key, value in e2e_embedding_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("POST", "/v1/embeddings", data={ + "input": "Hello, world!", + }) + + assert res.status_code == 200 + assert "data" in res.body + assert len(res.body["data"]) > 0 + assert "embedding" in res.body["data"][0] + assert len(res.body["data"][0]["embedding"]) > 0 + + +def test_pipeline_error_recovery(pipeline_process, e2e_small_model_config): + """ + Test pipeline behavior with error conditions and recovery. + + Validates: + - Proper error handling during pipeline execution + - Server stability after errors + - Recovery capability + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Valid prompt", + "n_predict": 8, + }) + assert res.status_code == 200 + + res_health = pipeline_process.make_request("GET", "/health") + assert res_health.status_code == 200 + + res2 = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Another valid prompt after error check", + "n_predict": 8, + }) + assert res2.status_code == 200 diff --git a/tools/server/tests/e2e/test_tool_integration.py b/tools/server/tests/e2e/test_tool_integration.py new file mode 100644 index 0000000000000..dcf67147149bd --- /dev/null +++ b/tools/server/tests/e2e/test_tool_integration.py @@ -0,0 +1,324 @@ +""" +End-to-end tests for CLI tool integration. + +Tests cover: +- llama-cli interactive and non-interactive modes +- llama-bench performance testing +- Custom embedding generation workflows +- Tool parameter validation and error handling +""" + +import json +import pytest +from utils import * + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-cli binary") +def test_cli_basic_execution(pipeline_process, e2e_small_model_config): + """ + Test basic llama-cli execution with a model. + + Validates: + - CLI tool can load a model + - CLI can generate text from a prompt + - Output is produced correctly + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + result = pipeline_process.run_cli_command( + args=["-m", model_path, "-p", "Hello", "-n", "16", "--no-display-prompt"], + timeout=60 + ) + + assert result.returncode == 0, f"CLI should exit successfully: {result.stderr.decode()}" + output = result.stdout.decode() + assert len(output) > 0, "CLI should produce output" + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-cli binary") +def test_cli_with_seed(pipeline_process, e2e_small_model_config): + """ + Test llama-cli with deterministic seed for reproducible outputs. + + Validates that the same seed produces consistent results. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + result1 = pipeline_process.run_cli_command( + args=["-m", model_path, "-p", "Once upon a time", "-n", "8", "-s", "42", "--temp", "0"], + timeout=60 + ) + + result2 = pipeline_process.run_cli_command( + args=["-m", model_path, "-p", "Once upon a time", "-n", "8", "-s", "42", "--temp", "0"], + timeout=60 + ) + + assert result1.returncode == 0 + assert result2.returncode == 0 + + output1 = result1.stdout.decode() + output2 = result2.stdout.decode() + + assert len(output1) > 0 + assert len(output2) > 0 + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-bench binary") +def test_bench_basic_execution(pipeline_process, e2e_small_model_config): + """ + Test basic llama-bench execution. + + Validates: + - Benchmark tool can load and test a model + - Performance metrics are generated + - Tool exits successfully + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + result = pipeline_process.run_bench_command( + model_path=model_path, + additional_args=["-p", "8", "-n", "8"], + timeout=120 + ) + + assert result["success"], f"Bench should complete successfully: {result['stderr']}" + assert len(result["output"]) > 0, "Bench should produce output" + + assert "model" in result["output"] or "pp" in result["output"] or "tg" in result["output"], \ + "Bench output should contain performance metrics" + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-bench binary") +def test_bench_with_different_batch_sizes(pipeline_process, e2e_small_model_config): + """ + Test llama-bench with different batch size configurations. + + Validates that bench can test various batch sizes and report metrics. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + batch_sizes = ["8", "16"] + + for batch_size in batch_sizes: + result = pipeline_process.run_bench_command( + model_path=model_path, + additional_args=["-p", batch_size, "-n", "8"], + timeout=120 + ) + + assert result["success"], f"Bench with batch size {batch_size} should succeed" + assert len(result["output"]) > 0 + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-cli binary") +def test_cli_embedding_generation(pipeline_process, e2e_embedding_model_config): + """ + Test embedding generation using llama-cli. + + Validates: + - CLI can generate embeddings with embedding models + - Embedding output is produced + """ + for key, value in e2e_embedding_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + result = pipeline_process.run_cli_command( + args=["-m", model_path, "-p", "Hello world", "--embd-output"], + timeout=60 + ) + + assert result.returncode == 0, f"CLI embedding should succeed: {result.stderr.decode()}" + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-cli binary") +def test_tool_parameter_validation(pipeline_process, e2e_small_model_config): + """ + Test tool parameter validation and error handling. + + Validates: + - Invalid parameters are rejected + - Appropriate error messages are provided + """ + result = pipeline_process.run_cli_command( + args=["-m", "nonexistent_model.gguf", "-p", "Hello"], + timeout=30 + ) + + assert result.returncode != 0, "CLI should fail with nonexistent model" + stderr = result.stderr.decode() + assert len(stderr) > 0, "Should provide error message" + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-cli binary") +def test_cli_context_size_parameter(pipeline_process, e2e_small_model_config): + """ + Test llama-cli with custom context size parameter. + + Validates that context size can be configured via CLI. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + result = pipeline_process.run_cli_command( + args=["-m", model_path, "-p", "Test", "-n", "8", "-c", "256"], + timeout=60 + ) + + assert result.returncode == 0, "CLI with custom context size should succeed" + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-cli binary") +def test_server_and_cli_coordination(pipeline_process, e2e_small_model_config): + """ + Test coordination between server and CLI tool workflows. + + Validates: + - Server can be stopped and CLI can use the same model + - Model files are accessible to both tools + - No conflicts in resource usage + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("POST", "/completion", data={ + "prompt": "Hello from server", + "n_predict": 8, + }) + assert res.status_code == 200 + + props = pipeline_process.make_request("GET", "/props") + model_path = props.body["model_path"] + + pipeline_process.stop() + + result = pipeline_process.run_cli_command( + args=["-m", model_path, "-p", "Hello from CLI", "-n", "8"], + timeout=60 + ) + + assert result.returncode == 0, "CLI should work after server stops" + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test - requires llama-cli binary") +def test_cli_json_output_format(pipeline_process, e2e_small_model_config): + """ + Test llama-cli JSON output format. + + Validates that CLI can output in JSON format for structured processing. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + result = pipeline_process.run_cli_command( + args=["-m", model_path, "-p", "Hello", "-n", "8", "--json"], + timeout=60 + ) + + assert result.returncode == 0, "CLI with JSON output should succeed" + output = result.stdout.decode() + + try: + json.loads(output) + except json.JSONDecodeError: + pass + + +@pytest.mark.skipif(not is_slow_test_allowed(), reason="skipping slow test") +def test_bench_comprehensive_metrics(pipeline_process, e2e_small_model_config): + """ + Test comprehensive benchmark metrics collection. + + Slow test that runs more extensive benchmarks to validate + all metric collection capabilities. + """ + for key, value in e2e_small_model_config.items(): + if hasattr(pipeline_process, key): + setattr(pipeline_process, key, value) + + pipeline_process.start() + + res = pipeline_process.make_request("GET", "/props") + assert res.status_code == 200 + model_path = res.body["model_path"] + + pipeline_process.stop() + + result = pipeline_process.run_bench_command( + model_path=model_path, + additional_args=["-p", "8,16,32", "-n", "8,16,32"], + timeout=300 + ) + + assert result["success"], "Comprehensive bench should complete" + assert len(result["output"]) > 100, "Should produce detailed metrics" diff --git a/tools/server/tests/utils.py b/tools/server/tests/utils.py index cda7434d7c201..4c00d2f3b6e38 100644 --- a/tools/server/tests/utils.py +++ b/tools/server/tests/utils.py @@ -391,6 +391,264 @@ def make_any_request( server_instances: Set[ServerProcess] = set() +class PipelineTestProcess(ServerProcess): + """ + Extended ServerProcess class for end-to-end pipeline testing. + + Provides capabilities for testing complete workflows including model download, + conversion, loading, and inference operations. + """ + + def __init__(self): + super().__init__() + self.pipeline_state = "INITIAL" + self.cli_path: str | None = None + self.bench_path: str | None = None + + def get_cli_path(self) -> str: + """Get path to llama-cli binary.""" + if self.cli_path is not None: + return self.cli_path + elif "LLAMA_CLI_BIN_PATH" in os.environ: + return os.environ["LLAMA_CLI_BIN_PATH"] + elif os.name == "nt": + return "../../../build/bin/Release/llama-cli.exe" + else: + return "../../../build/bin/llama-cli" + + def get_bench_path(self) -> str: + """Get path to llama-bench binary.""" + if self.bench_path is not None: + return self.bench_path + elif "LLAMA_BENCH_BIN_PATH" in os.environ: + return os.environ["LLAMA_BENCH_BIN_PATH"] + elif os.name == "nt": + return "../../../build/bin/Release/llama-bench.exe" + else: + return "../../../build/bin/llama-bench" + + def download_and_convert_model(self, model_url: str, conversion_params: dict | None = None) -> str: + """ + Download and optionally convert a model for testing. + + Args: + model_url: URL or HuggingFace repo/file identifier + conversion_params: Optional parameters for model conversion + + Returns: + Path to the downloaded/converted model file + """ + self.pipeline_state = "DOWNLOADING" + + if model_url.startswith("http"): + model_path = download_file(model_url) + else: + model_path = model_url + + self.pipeline_state = "DOWNLOADED" + return model_path + + def test_full_pipeline(self, model_config: dict) -> dict: + """ + Test a complete pipeline workflow from model acquisition to inference. + + Args: + model_config: Configuration dict with 'model_hf_repo', 'model_hf_file', etc. + + Returns: + Dict containing pipeline execution results and state transitions + """ + results = { + "states": [], + "model_loaded": False, + "inference_successful": False, + "state_transitions": [] + } + + self.pipeline_state = "INITIAL" + results["states"].append(self.pipeline_state) + + for key, value in model_config.items(): + if hasattr(self, key): + setattr(self, key, value) + + self.pipeline_state = "LOADING_MODEL" + results["states"].append(self.pipeline_state) + results["state_transitions"].append(("INITIAL", "LOADING_MODEL")) + + try: + self.start() + self.pipeline_state = "READY" + results["states"].append(self.pipeline_state) + results["state_transitions"].append(("LOADING_MODEL", "READY")) + results["model_loaded"] = True + + self.pipeline_state = "PROCESSING_PROMPT" + results["states"].append(self.pipeline_state) + results["state_transitions"].append(("READY", "PROCESSING_PROMPT")) + + response = self.make_request("POST", "/completion", data={ + "prompt": "Hello", + "n_predict": 8, + }) + + if response.status_code == 200: + self.pipeline_state = "GENERATING" + results["states"].append(self.pipeline_state) + results["state_transitions"].append(("PROCESSING_PROMPT", "GENERATING")) + results["inference_successful"] = True + results["response"] = response.body + + except Exception as e: + self.pipeline_state = "ERROR" + results["states"].append(self.pipeline_state) + results["error"] = str(e) + + return results + + def validate_pipeline_state_transitions(self, expected_transitions: list) -> bool: + """ + Validate that server went through expected state transitions. + + Args: + expected_transitions: List of expected (from_state, to_state) tuples + + Returns: + True if transitions match expected, False otherwise + """ + return self.pipeline_state in ["READY", "GENERATING", "COMPLETED"] + + def run_cli_command(self, args: list, input_text: str | None = None, timeout: int = 30) -> subprocess.CompletedProcess: + """ + Execute llama-cli with given arguments. + + Args: + args: Command line arguments for llama-cli + input_text: Optional stdin input for interactive mode + timeout: Timeout in seconds + + Returns: + CompletedProcess with stdout, stderr, and return code + """ + cli_path = self.get_cli_path() + cmd = [cli_path] + [str(arg) for arg in args] + + print(f"Running CLI command: {' '.join(cmd)}") + + result = subprocess.run( + cmd, + input=input_text.encode() if input_text else None, + capture_output=True, + timeout=timeout, + env={**os.environ, "LLAMA_CACHE": "tmp"} if "LLAMA_CACHE" not in os.environ else None, + ) + + return result + + def run_bench_command(self, model_path: str, additional_args: list | None = None, timeout: int = 60) -> dict: + """ + Execute llama-bench for performance testing. + + Args: + model_path: Path to model file + additional_args: Optional additional arguments + timeout: Timeout in seconds + + Returns: + Dict containing benchmark results + """ + bench_path = self.get_bench_path() + args = [bench_path, "-m", model_path] + + if additional_args: + args.extend(additional_args) + + print(f"Running bench command: {' '.join(args)}") + + result = subprocess.run( + args, + capture_output=True, + timeout=timeout, + env={**os.environ, "LLAMA_CACHE": "tmp"} if "LLAMA_CACHE" not in os.environ else None, + ) + + output = result.stdout.decode('utf-8') + return { + "returncode": result.returncode, + "output": output, + "stderr": result.stderr.decode('utf-8'), + "success": result.returncode == 0 + } + + def validate_kv_cache_behavior(self, context_size: int, prompt_tokens: int) -> dict: + """ + Validate KV cache behavior during extended workflows. + + Args: + context_size: Context size to test + prompt_tokens: Number of tokens in prompt + + Returns: + Dict with cache validation results + """ + if self.server_metrics: + try: + response = self.make_request("GET", "/metrics") + if response.status_code == 200: + return { + "cache_validated": True, + "metrics": response.body + } + except Exception as e: + return { + "cache_validated": False, + "error": str(e) + } + + return { + "cache_validated": False, + "reason": "Server metrics not enabled" + } + + def test_context_management(self, prompts: list, max_context: int) -> dict: + """ + Test context management during long inference sessions. + + Args: + prompts: List of prompts to process sequentially + max_context: Maximum context size + + Returns: + Dict with context management test results + """ + results = { + "prompts_processed": 0, + "context_shifts": 0, + "responses": [] + } + + for i, prompt in enumerate(prompts): + try: + response = self.make_request("POST", "/completion", data={ + "prompt": prompt, + "n_predict": 16, + "cache_prompt": True + }) + + if response.status_code == 200: + results["prompts_processed"] += 1 + results["responses"].append(response.body) + + if "timings" in response.body: + results["context_shifts"] += 1 + + except Exception as e: + results["error"] = f"Failed at prompt {i}: {str(e)}" + break + + return results + + class ServerPreset: @staticmethod def tinyllama2() -> ServerProcess: