diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 91719577564a9..bc3b3a7d9289f 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -187,6 +187,18 @@ llama_build_and_test(test-regex-partial.cpp)
 llama_build_and_test(test-thread-safety.cpp ARGS -hf ggml-org/models -hff tinyllamas/stories15M-q4_0.gguf -ngl 99 -p "The meaning of life is" -n 128 -c 256 -ub 32 -np 4 -t 2)
 
+llama_build_and_test(test-concurrent-stress.cpp
+    NAME test-concurrent-stress
+    ARGS -hf ggml-org/models -hff tinyllamas/stories15M-q4_0.gguf
+         -ngl 99 -p "Test prompt" -n 64 -c 256 -np 4 -t 4
+    LABEL concurrent)
+
+llama_build_and_test(test-kv-cache-concurrent.cpp
+    NAME test-kv-cache-concurrent
+    ARGS -hf ggml-org/models -hff tinyllamas/stories15M-q4_0.gguf
+         -ngl 99 -p "Test prompt" -n 64 -c 256 -np 8 -t 4
+    LABEL concurrent)
+
 # this fails on windows (github hosted runner) due to curl DLL not found (exit code 0xc0000135)
 if (NOT WIN32)
     llama_build_and_test(test-arg-parser.cpp)
diff --git a/tests/test-concurrent-stress.cpp b/tests/test-concurrent-stress.cpp
new file mode 100644
index 0000000000000..15ec5c47cb845
--- /dev/null
+++ b/tests/test-concurrent-stress.cpp
@@ -0,0 +1,300 @@
+// concurrent stress test
+// - creates and destroys llama contexts from multiple threads sharing one model
+// - verifies that no contexts leak and no errors occur under contention
+
+#include "arg.h"
+#include "common.h"
+#include "log.h"
+#include "llama.h"
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <cinttypes>
+#include <cstdio>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <string>
+#include <thread>
+#include <vector>
+
+struct test_result {
+    std::atomic<int>  contexts_created{0};
+    std::atomic<int>  contexts_destroyed{0};
+    std::atomic<int>  batches_processed{0};
+    std::atomic<int>  errors{0};
+    std::atomic<bool> corruption_detected{false};
+};
+
+// each thread repeatedly creates a context, holds it for a random delay, then frees it
+static void test_rapid_context_cycles(
+    llama_model * model,
+    llama_context_params base_params,
+    test_result & result,
+    int thread_id,
+    int iterations
+) {
+    const int64_t t_start = ggml_time_us();
+
+    std::random_device rd;
+    std::mt19937 gen(rd() + thread_id);
+    std::uniform_int_distribution<> delay_dist(1, 10);
+
+    for (int i = 0; i < iterations; i++) {
+        llama_context * ctx = llama_init_from_model(model, base_params);
+
+        if (!ctx) {
+            LOG_ERR("thread %d: failed to create context on iteration %d\n", thread_id, i);
+            result.errors++;
+            continue;
+        }
+
+        result.contexts_created++;
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+        llama_free(ctx);
+        result.contexts_destroyed++;
+    }
+
+    const int64_t t_end = ggml_time_us();
+    LOG_INF("thread %d: completed %d context cycles in %.2f ms\n",
+            thread_id, iterations, (t_end - t_start) / 1000.0);
+}
+
+// each thread creates contexts with varying n_ctx/n_batch to stress backend buffer allocation
+static void test_backend_resource_stress(
+    llama_model * model,
+    llama_context_params base_params,
+    test_result & result,
+    int thread_id,
+    int iterations
+) {
+    std::random_device rd;
+    std::mt19937 gen(rd() + thread_id);
+    std::uniform_int_distribution<> delay_dist(1, 8);
+
+    for (int i = 0; i < iterations; i++) {
+        llama_context_params ctx_params = base_params;
+
+        ctx_params.n_ctx = 128 + (i % 4) * 64;
+        ctx_params.n_batch = 32 + (i % 3) * 16;
+
+        llama_context * ctx = llama_init_from_model(model, ctx_params);
+        if (!ctx) {
+            LOG_ERR("thread %d: failed to create context with varying params on iteration %d\n", thread_id, i);
+            result.errors++;
+            continue;
+        }
+
+        result.contexts_created++;
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+        llama_free(ctx);
+        result.contexts_destroyed++;
+    }
+}
+
+int main(int argc, char ** argv) {
+    common_params params;
+
+    if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_COMMON)) {
+        return 1;
+    }
+
+    common_init();
+
+    llama_backend_init();
+    llama_numa_init(params.numa);
+
+    auto mparams = common_model_params_to_llama(params);
+    llama_model * model = llama_model_load_from_file(params.model.path.c_str(), mparams);
+    if (!model) {
+        LOG_ERR("failed to load model\n");
+        return 1;
+    }
+
+    auto cparams = common_context_params_to_llama(params);
+    llama_context * ctx = llama_init_from_model(model, cparams);
+    if (!ctx) {
+        LOG_ERR("failed to create context\n");
+        llama_model_free(model);
+        return 1;
+    }
+
+    const int n_threads = params.cpuparams.n_threads;
+    const int iterations_per_thread = 20;
+
+    LOG_INF("Starting concurrent stress tests with %d threads, %d iterations per thread\n",
+            n_threads, iterations_per_thread);
+
+    LOG_INF("\n=== Test 1: Rapid Context Creation/Destruction Cycles ===\n");
+    {
+        test_result result;
+        std::vector<std::thread> threads;
+
+        const int64_t t_start = ggml_time_us();
+
+        for (int i = 0; i < n_threads; i++) {
+            threads.emplace_back(test_rapid_context_cycles, model, cparams,
+                                 std::ref(result), i, iterations_per_thread);
+        }
+
+        for (auto & t : threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Test 1 Results:\n");
+        LOG_INF("  Contexts created: %d\n", result.contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", result.contexts_destroyed.load());
+        LOG_INF("  Errors: %d\n", result.errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+        LOG_INF("  Avg time per context: %.2f ms\n",
+                (t_end - t_start) / 1000.0 / result.contexts_created.load());
+
+        if (result.contexts_created != result.contexts_destroyed) {
+            LOG_ERR("FAIL: Context leak detected! Created: %d, Destroyed: %d\n",
+                    result.contexts_created.load(), result.contexts_destroyed.load());
+            llama_free(ctx);
+            llama_model_free(model);
+            return 1;
+        }
+
+        if (result.errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during context cycles\n", result.errors.load());
+            llama_free(ctx);
+            llama_model_free(model);
+            return 1;
+        }
+
+        LOG_INF("PASS: No context leaks or errors detected\n");
+    }
+
+    LOG_INF("\n=== Test 2: Parallel Context Operations ===\n");
+    {
+        test_result result;
+        std::vector<std::thread> threads;
+
+        const int64_t t_start = ggml_time_us();
+
+        auto parallel_context_ops = [&](int thread_id) {
+            std::random_device rd;
+            std::mt19937 gen(rd() + thread_id);
+            std::uniform_int_distribution<> delay_dist(1, 5);
+
+            for (int i = 0; i < iterations_per_thread / 4; i++) {
+                llama_context * thread_ctx = llama_init_from_model(model, cparams);
+                if (!thread_ctx) {
+                    LOG_ERR("thread %d: failed to create context on iteration %d\n", thread_id, i);
+                    result.errors++;
+                    continue;
+                }
+
+                result.contexts_created++;
+
+                std::vector<llama_token> tokens = common_tokenize(thread_ctx, "Test prompt", true, true);
+                if (!tokens.empty()) {
+                    llama_batch batch = llama_batch_init(tokens.size(), 0, 1);
+                    for (size_t j = 0; j < tokens.size(); j++) {
+                        common_batch_add(batch, tokens[j], j, {0}, false);
+                    }
+
+                    if (llama_decode(thread_ctx, batch) == 0) {
+                        result.batches_processed++;
+                    }
+
+                    llama_batch_free(batch);
+                }
+
+                std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+                llama_free(thread_ctx);
+                result.contexts_destroyed++;
+            }
+        };
+
+        for (int i = 0; i < n_threads; i++) {
+            threads.emplace_back(parallel_context_ops, i);
+        }
+
+        for (auto & t : threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Test 2 Results:\n");
+        LOG_INF("  Contexts created: %d\n", result.contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", result.contexts_destroyed.load());
+        LOG_INF("  Batches processed: %d\n", result.batches_processed.load());
+        LOG_INF("  Errors: %d\n", result.errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+
+        if (result.contexts_created != result.contexts_destroyed) {
+            LOG_ERR("FAIL: Context leak detected! Created: %d, Destroyed: %d\n",
+                    result.contexts_created.load(), result.contexts_destroyed.load());
+            llama_free(ctx);
+            llama_model_free(model);
+            return 1;
+        }
+
+        if (result.errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during parallel operations\n", result.errors.load());
+            llama_free(ctx);
+            llama_model_free(model);
+            return 1;
+        }
+
+        LOG_INF("PASS: All parallel context operations completed successfully\n");
+    }
+
+    LOG_INF("\n=== Test 3: Backend Resource Allocation Stress ===\n");
+    {
+        test_result result;
+        std::vector<std::thread> threads;
+
+        const int64_t t_start = ggml_time_us();
+
+        for (int i = 0; i < n_threads; i++) {
+            threads.emplace_back(test_backend_resource_stress, model, cparams,
+                                 std::ref(result), i, iterations_per_thread / 4);
+        }
+
+        for (auto & t : threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Test 3 Results:\n");
+        LOG_INF("  Contexts created: %d\n", result.contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", result.contexts_destroyed.load());
+        LOG_INF("  Errors: %d\n", result.errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+
+        if (result.contexts_created != result.contexts_destroyed) {
+            LOG_ERR("FAIL: Resource leak detected! Created: %d, Destroyed: %d\n",
+                    result.contexts_created.load(), result.contexts_destroyed.load());
+            llama_free(ctx);
+            llama_model_free(model);
+            return 1;
+        }
+
+        if (result.errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during resource stress test\n", result.errors.load());
+            llama_free(ctx);
+            llama_model_free(model);
+            return 1;
+        }
+
+        LOG_INF("PASS: No resource leaks detected\n");
+    }
+
+    llama_free(ctx);
+    llama_model_free(model);
+
+    LOG_INF("\n=== All Concurrent Stress Tests PASSED ===\n");
+    return 0;
+}
diff --git a/tests/test-kv-cache-concurrent.cpp b/tests/test-kv-cache-concurrent.cpp
new file mode 100644
index 0000000000000..4c751e7982a44
--- /dev/null
+++ b/tests/test-kv-cache-concurrent.cpp
@@ -0,0 +1,429 @@
+// KV cache concurrency test
+// - runs prepare/update/sequence operations on per-thread contexts sharing one model
+// - verifies that no contexts leak and no decode errors occur under contention
+
+#include "arg.h"
+#include "common.h"
+#include "log.h"
+#include "llama.h"
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <cinttypes>
+#include <cstdio>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <string>
+#include <thread>
+#include <vector>
+
+struct test_result {
+    std::atomic<int> contexts_created{0};
+    std::atomic<int> contexts_destroyed{0};
+    std::atomic<int> prepare_success{0};
+    std::atomic<int> update_success{0};
+    std::atomic<int> seq_ops_success{0};
+    std::atomic<int> errors{0};
+};
+
+// decode a prompt batch on a fresh context each iteration (KV cache prepare path)
+static void test_concurrent_kv_prepare(
+    llama_model * model,
+    llama_context_params cparams,
+    const std::vector<llama_token> & tokens,
+    test_result & result,
+    int thread_id,
+    int iterations
+) {
+    std::random_device rd;
+    std::mt19937 gen(rd() + thread_id);
+    std::uniform_int_distribution<> delay_dist(1, 5);
+
+    for (int i = 0; i < iterations; i++) {
+        llama_context * ctx = llama_init_from_model(model, cparams);
+        if (!ctx) {
+            LOG_ERR("thread %d: failed to create context on iteration %d\n", thread_id, i);
+            result.errors++;
+            continue;
+        }
+
+        result.contexts_created++;
+
+        const int n_batch = llama_n_batch(ctx);
+        llama_batch batch = llama_batch_init(n_batch, 0, 1);
+
+        const int n_tokens = std::min((int)tokens.size(), n_batch);
+        for (int j = 0; j < n_tokens; j++) {
+            common_batch_add(batch, tokens[j], j, {0}, false);
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+        if (llama_decode(ctx, batch) == 0) {
+            result.prepare_success++;
+        } else {
+            result.errors++;
+        }
+
+        llama_batch_free(batch);
+        llama_free(ctx);
+        result.contexts_destroyed++;
+    }
+}
+
+// same as the prepare test, but with varying n_ctx to exercise KV cache (re)allocation
+static void test_concurrent_kv_update(
+    llama_model * model,
+    llama_context_params base_params,
+    const std::vector<llama_token> & tokens,
+    test_result & result,
+    int thread_id,
+    int iterations
+) {
+    std::random_device rd;
+    std::mt19937 gen(rd() + thread_id);
+    std::uniform_int_distribution<> delay_dist(1, 5);
+
+    for (int i = 0; i < iterations; i++) {
+        llama_context_params cparams = base_params;
+        cparams.n_ctx = 128 + (i % 3) * 64;
+
+        llama_context * ctx = llama_init_from_model(model, cparams);
+        if (!ctx) {
+            LOG_ERR("thread %d: failed to create context on iteration %d\n", thread_id, i);
+            result.errors++;
+            continue;
+        }
+
+        result.contexts_created++;
+
+        const int n_batch = llama_n_batch(ctx);
+        llama_batch batch = llama_batch_init(std::min((int)tokens.size(), n_batch), 0, 1);
+
+        for (size_t j = 0; j < std::min(tokens.size(), (size_t)n_batch); j++) {
+            common_batch_add(batch, tokens[j], j, {0}, false);
+        }
+
+        if (llama_decode(ctx, batch) == 0) {
+            result.update_success++;
+        } else {
+            result.errors++;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+        llama_batch_free(batch);
+        llama_free(ctx);
+        result.contexts_destroyed++;
+    }
+}
+
+// after a successful decode, exercise sequence copy/remove on the context memory
+static void test_concurrent_seq_operations(
+    llama_model * model,
+    llama_context_params cparams,
+    const std::vector<llama_token> & tokens,
+    test_result & result,
+    int thread_id,
+    int iterations
+) {
+    std::random_device rd;
+    std::mt19937 gen(rd() + thread_id);
+    std::uniform_int_distribution<> delay_dist(1, 5);
+
+    for (int i = 0; i < iterations; i++) {
+        llama_context * ctx = llama_init_from_model(model, cparams);
+        if (!ctx) {
+            LOG_ERR("thread %d: failed to create context on iteration %d\n", thread_id, i);
+            result.errors++;
+            continue;
+        }
+
+        result.contexts_created++;
+
+        const int n_batch = llama_n_batch(ctx);
+        llama_batch batch = llama_batch_init(std::min((int)tokens.size(), n_batch), 0, 1);
+
+        for (size_t j = 0; j < std::min(tokens.size(), (size_t)n_batch); j++) {
+            common_batch_add(batch, tokens[j], j, {0}, false);
+        }
+
+        if (llama_decode(ctx, batch) == 0) {
+            llama_memory_seq_cp(llama_get_memory(ctx), 0, 1, -1, -1);
+            llama_memory_seq_rm(llama_get_memory(ctx), 0, -1, -1);
+            llama_memory_seq_rm(llama_get_memory(ctx), 1, -1, -1);
+            result.seq_ops_success++;
+        } else {
+            result.errors++;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+        llama_batch_free(batch);
+        llama_free(ctx);
+        result.contexts_destroyed++;
+    }
+}
+
+// decode on a fresh context and count it against all operation counters (mixed workload)
+static void test_concurrent_mixed_operations(
+    llama_model * model,
+    llama_context_params cparams,
+    const std::vector<llama_token> & tokens,
+    test_result & result,
+    int thread_id,
+    int iterations
+) {
+    std::random_device rd;
+    std::mt19937 gen(rd() + thread_id);
+    std::uniform_int_distribution<> delay_dist(1, 3);
+
+    for (int i = 0; i < iterations; i++) {
+        llama_context * ctx = llama_init_from_model(model, cparams);
+        if (!ctx) {
+            LOG_ERR("thread %d: failed to create context on iteration %d\n", thread_id, i);
+            result.errors++;
+            continue;
+        }
+
+        result.contexts_created++;
+
+        const int n_batch = llama_n_batch(ctx);
+        llama_batch batch = llama_batch_init(std::min((int)tokens.size(), n_batch), 0, 1);
+
+        for (size_t j = 0; j < std::min(tokens.size(), (size_t)n_batch); j++) {
+            common_batch_add(batch, tokens[j], j, {0}, false);
+        }
+
+        if (llama_decode(ctx, batch) == 0) {
+            result.prepare_success++;
+            result.update_success++;
+            result.seq_ops_success++;
+        } else {
+            result.errors++;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+        llama_batch_free(batch);
+        llama_free(ctx);
+        result.contexts_destroyed++;
+    }
+}
+
+int main(int argc, char ** argv) {
+    common_params params;
+
+    if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_COMMON)) {
+        return 1;
+    }
+
+    common_init();
+
+    llama_backend_init();
+    llama_numa_init(params.numa);
+
+    auto mparams = common_model_params_to_llama(params);
+    llama_model * model = llama_model_load_from_file(params.model.path.c_str(), mparams);
+    if (!model) {
+        LOG_ERR("failed to load model\n");
+        return 1;
+    }
+
+    auto cparams = common_context_params_to_llama(params);
+    llama_context * ctx = llama_init_from_model(model, cparams);
+    if (!ctx) {
+        LOG_ERR("failed to create context\n");
+        llama_model_free(model);
+        return 1;
+    }
+
+    std::vector<llama_token> tokens;
+    const char * test_prompt = "Once upon a time in a distant galaxy, there was a brave explorer";
+    tokens = common_tokenize(ctx, test_prompt, true, true);
+
+    if (tokens.empty()) {
+        LOG_ERR("failed to tokenize test prompt\n");
+        llama_free(ctx);
+        llama_model_free(model);
+        return 1;
+    }
+
+    LOG_INF("Test prompt tokenized to %zu tokens\n", tokens.size());
+
+    llama_free(ctx);
+
+    const int n_threads = std::min(params.cpuparams.n_threads, 8);
+    const int iterations_per_thread = 15;
+
+    LOG_INF("Starting KV cache concurrent tests with %d threads, %d iterations per thread\n",
+            n_threads, iterations_per_thread);
+
+    LOG_INF("\n=== Test 1: Concurrent KV Cache Prepare Operations ===\n");
+    {
+        test_result result;
+        std::vector<std::thread> threads;
+
+        const int64_t t_start = ggml_time_us();
+
+        for (int i = 0; i < n_threads; i++) {
+            threads.emplace_back(test_concurrent_kv_prepare, model, cparams, std::cref(tokens),
+                                 std::ref(result), i, iterations_per_thread);
+        }
+
+        for (auto & t : threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Test 1 Results:\n");
+        LOG_INF("  Contexts created: %d\n", result.contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", result.contexts_destroyed.load());
+        LOG_INF("  Successful prepare operations: %d\n", result.prepare_success.load());
+        LOG_INF("  Errors: %d\n", result.errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+
+        if (result.contexts_created != result.contexts_destroyed) {
+            LOG_ERR("FAIL: Context leak detected! Created: %d, Destroyed: %d\n",
+                    result.contexts_created.load(), result.contexts_destroyed.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        if (result.errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during concurrent prepare\n", result.errors.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        LOG_INF("PASS: All concurrent prepare operations successful\n");
+    }
+
+    LOG_INF("\n=== Test 2: Concurrent KV Cache Update Operations ===\n");
+    {
+        test_result result;
+        std::vector<std::thread> threads;
+
+        const int64_t t_start = ggml_time_us();
+
+        for (int i = 0; i < n_threads; i++) {
+            threads.emplace_back(test_concurrent_kv_update, model, cparams, std::cref(tokens),
+                                 std::ref(result), i, iterations_per_thread);
+        }
+
+        for (auto & t : threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Test 2 Results:\n");
+        LOG_INF("  Contexts created: %d\n", result.contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", result.contexts_destroyed.load());
+        LOG_INF("  Successful update operations: %d\n", result.update_success.load());
+        LOG_INF("  Errors: %d\n", result.errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+
+        if (result.contexts_created != result.contexts_destroyed) {
+            LOG_ERR("FAIL: Context leak detected! Created: %d, Destroyed: %d\n",
+                    result.contexts_created.load(), result.contexts_destroyed.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        if (result.errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during concurrent update\n", result.errors.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        LOG_INF("PASS: All concurrent update operations successful\n");
+    }
+
+    LOG_INF("\n=== Test 3: Concurrent Sequence Operations ===\n");
+    {
+        test_result result;
+        std::vector<std::thread> threads;
+
+        const int64_t t_start = ggml_time_us();
+
+        for (int i = 0; i < n_threads; i++) {
+            threads.emplace_back(test_concurrent_seq_operations, model, cparams, std::cref(tokens),
+                                 std::ref(result), i, iterations_per_thread);
+        }
+
+        for (auto & t : threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Test 3 Results:\n");
+        LOG_INF("  Contexts created: %d\n", result.contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", result.contexts_destroyed.load());
+        LOG_INF("  Successful sequence operations: %d\n", result.seq_ops_success.load());
+        LOG_INF("  Errors: %d\n", result.errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+
+        if (result.contexts_created != result.contexts_destroyed) {
+            LOG_ERR("FAIL: Context leak detected! Created: %d, Destroyed: %d\n",
+                    result.contexts_created.load(), result.contexts_destroyed.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        if (result.errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during concurrent sequence ops\n", result.errors.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        LOG_INF("PASS: All concurrent sequence operations successful\n");
+    }
+
+    LOG_INF("\n=== Test 4: Mixed Concurrent Operations ===\n");
+    {
+        test_result result;
+        std::vector<std::thread> threads;
+
+        const int64_t t_start = ggml_time_us();
+
+        for (int i = 0; i < n_threads; i++) {
+            threads.emplace_back(test_concurrent_mixed_operations, model, cparams, std::cref(tokens),
+                                 std::ref(result), i, iterations_per_thread / 2);
+        }
+
+        for (auto & t : threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Test 4 Results:\n");
+        LOG_INF("  Contexts created: %d\n", result.contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", result.contexts_destroyed.load());
+        LOG_INF("  Successful prepare operations: %d\n", result.prepare_success.load());
+        LOG_INF("  Successful update operations: %d\n", result.update_success.load());
+        LOG_INF("  Successful sequence operations: %d\n", result.seq_ops_success.load());
+        LOG_INF("  Errors: %d\n", result.errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+
+        if (result.contexts_created != result.contexts_destroyed) {
+            LOG_ERR("FAIL: Context leak detected! Created: %d, Destroyed: %d\n",
+                    result.contexts_created.load(), result.contexts_destroyed.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        if (result.errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during mixed operations\n", result.errors.load());
+            llama_model_free(model);
+            return 1;
+        }
+
+        LOG_INF("PASS: All mixed concurrent operations successful\n");
+    }
+
+    llama_model_free(model);
+
+    LOG_INF("\n=== All KV Cache Concurrent Tests PASSED ===\n");
+    return 0;
+}
diff --git a/tests/test-thread-safety.cpp b/tests/test-thread-safety.cpp
index 853495b00d9d2..ea273b9e5ac36 100644
--- a/tests/test-thread-safety.cpp
+++ b/tests/test-thread-safety.cpp
@@ -6,6 +6,8 @@
 #include <thread>
 #include <vector>
 #include <atomic>
+#include <random>
+#include <chrono>
 #include "llama.h"
 #include "arg.h"
 #include "common.h"
@@ -151,5 +153,74 @@ int main(int argc, char ** argv) {
     }
 
     LOG_INF("All threads finished without errors.\n");
+
+    LOG_INF("\n=== Additional Stress Tests ===\n");
+
+    LOG_INF("\n=== Test 2: Rapid Context Recreation Stress Test ===\n");
+    {
+        std::atomic<int> contexts_created{0};
+        std::atomic<int> contexts_destroyed{0};
+        std::atomic<int> errors{0};
+
+        const int stress_iterations = 10;
+        auto * model_stress = models[0].get();
+
+        auto stress_test_func = [&](int thread_id) {
+            std::random_device rd;
+            std::mt19937 gen(rd() + thread_id);
+            std::uniform_int_distribution<> delay_dist(1, 5);
+
+            for (int i = 0; i < stress_iterations; i++) {
+                llama_context_ptr stress_ctx { llama_init_from_model(model_stress, cparams) };
+
+                if (!stress_ctx) {
+                    LOG_ERR("thread %d: failed to create context on iteration %d\n", thread_id, i);
+                    errors++;
+                    continue;
+                }
+
+                contexts_created++;
+
+                std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
+
+                contexts_destroyed++;
+            }
+        };
+
+        const int64_t t_start = ggml_time_us();
+
+        std::vector<std::thread> stress_threads;
+        const int n_stress_threads = std::min(4, num_contexts);
+        for (int i = 0; i < n_stress_threads; i++) {
+            stress_threads.emplace_back(stress_test_func, i);
+        }
+
+        for (auto & t : stress_threads) {
+            t.join();
+        }
+
+        const int64_t t_end = ggml_time_us();
+
+        LOG_INF("Stress test results:\n");
+        LOG_INF("  Contexts created: %d\n", contexts_created.load());
+        LOG_INF("  Contexts destroyed: %d\n", contexts_destroyed.load());
+        LOG_INF("  Errors: %d\n", errors.load());
+        LOG_INF("  Total time: %.2f ms\n", (t_end - t_start) / 1000.0);
+
+        if (contexts_created != contexts_destroyed) {
+            LOG_ERR("FAIL: Context leak detected! Created: %d, Destroyed: %d\n",
+                    contexts_created.load(), contexts_destroyed.load());
+            return 1;
+        }
+
+        if (errors > 0) {
+            LOG_ERR("FAIL: %d errors occurred during stress test\n", errors.load());
+            return 1;
+        }
+
+        LOG_INF("PASS: Stress test completed without leaks or errors\n");
+    }
+
+    LOG_INF("\n=== All Thread Safety Tests PASSED ===\n");
     return 0;
 }
diff --git a/tools/server/tests/unit/test_completion.py b/tools/server/tests/unit/test_completion.py
index 11483e679a505..d69fb0247ee1b 100644
--- a/tools/server/tests/unit/test_completion.py
+++ b/tools/server/tests/unit/test_completion.py
@@ -533,3 +533,124 @@ def test_cancel_request():
     time.sleep(1) # wait for HTTP_POLLING_SECONDS
     res = server.make_request("GET", "/slots")
     assert res.body[0]["is_processing"] == False
+
+
+@pytest.mark.parametrize("n_slots,n_requests", [
+    (8, 20),
+    (4, 50),
+])
+def test_completion_high_volume_concurrent(n_slots: int, n_requests: int):
+    global server
+    server.n_slots = n_slots
+    server.temperature = 0.0
+    server.start()
+
+    PROMPTS = [
+        "Write a very long book.",
+        "Write another a poem.",
+        "What is LLM?",
+        "The sky is blue and I love it.",
+        "Write another very long music lyrics.",
+        "Write a very long joke.",
+        "Tell me a story about a dragon.",
+        "Explain quantum computing.",
+        "What is the meaning of life?",
+        "Describe a beautiful sunset.",
+    ]
+
+    def check_slots_status():
+        should_all_slots_busy = n_requests >= n_slots
+        time.sleep(0.1)
+        res = server.make_request("GET", "/slots")
+        n_busy = sum([1 for slot in res.body if slot["is_processing"]])
+        if should_all_slots_busy:
+            assert n_busy >= n_slots // 2
+        else:
+            assert n_busy <= n_slots
+
+    tasks = []
+    for i in range(n_requests):
+        prompt = PROMPTS[i % len(PROMPTS)]
+        tasks.append((server.make_request, ("POST", "/completion", {
+            "prompt": prompt,
+            "seed": 42 + i,
+            "temperature": 0.8,
+            "n_predict": 32,
+        })))
+
+    tasks.append((check_slots_status, ()))
+    results = parallel_function_calls(tasks)
+
+    success_count = 0
+    for i in range(n_requests):
+        res = results[i]
+        if res.status_code == 200 and "content" in res.body:
+            success_count += 1
+
+    assert success_count >= n_requests * 0.9
+
+
+def test_completion_parallel_decoding():
+    global server
+    server.n_slots = 4
+    server.temperature = 0.0
+    server.start()
+
+    prompts = [
+        "Once upon a time",
+        "In a galaxy far away",
+        "The quick brown fox",
+        "Deep in the forest",
+    ]
+
+    tasks = []
+    for i, prompt in enumerate(prompts):
+        tasks.append((server.make_request, ("POST", "/completion", {
+            "prompt": prompt,
+            "seed": 42 + i,
+            "n_predict": 64,
+            "temperature": 0.5,
+        })))
+
+    results = parallel_function_calls(tasks)
+
+    for i, res in enumerate(results):
+        assert res.status_code == 200
+        assert "content" in res.body
+        assert len(res.body["content"]) > 10
+
+
+def test_completion_cache_consistency_concurrent():
+    global server
+    server.n_slots = 4
+    server.temperature = 0.0
+    server.start()
+
+    common_prefix = "The meaning of life is"
+    prompts = [
+        common_prefix + " to be happy",
+        common_prefix + " to find purpose",
+        common_prefix + " different for everyone",
+        common_prefix + " a mystery",
+    ]
+
+    tasks = []
+    for i, prompt in enumerate(prompts):
+        tasks.append((server.make_request, ("POST", "/completion", {
+            "prompt": prompt,
+            "seed": 42,
+            "n_predict": 32,
+            "temperature": 0.0,
+            "cache_prompt": True,
+        })))
+
+    results = parallel_function_calls(tasks)
+
+    cache_hit_count = 0
+    for res in results:
+        assert res.status_code == 200
+        assert "content" in res.body
+        if "tokens_cached" in res.body and res.body["tokens_cached"] > 0:
+            cache_hit_count += 1
+
+    assert cache_hit_count >= 0