-
Notifications
You must be signed in to change notification settings - Fork 494
Add stress testing framework, with basic metrics example to demonstrate. #3241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2564cc6
9433197
11bd32c
22a178b
449f360
a385503
f9b0814
5f9d0da
32d06ff
57d99c3
5a222fd
03ffa54
ab07553
a2f17b1
b56f996
eead3a0
4bfadb5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
# Copyright The OpenTelemetry Authors | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
# Add subdirectories for common and metrics components | ||
add_subdirectory(common) | ||
add_subdirectory(metrics) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Copyright The OpenTelemetry Authors | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
add_library(stress STATIC stress.cc) | ||
|
||
# Include directory for the throughput library | ||
target_include_directories(stress PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) | ||
|
||
# Set C++ standard | ||
set_target_properties( | ||
stress | ||
PROPERTIES CXX_STANDARD 17 | ||
CXX_STANDARD_REQUIRED YES | ||
CXX_EXTENSIONS NO) |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,142 @@ | ||||||||
// Copyright The OpenTelemetry Authors | ||||||||
// SPDX-License-Identifier: Apache-2.0 | ||||||||
|
||||||||
#include "stress.h" | ||||||||
|
||||||||
// Global flags | ||||||||
std::atomic<bool> STOP( | ||||||||
false); // Global flag to stop the stress test when signaled (e.g., via Ctrl+C) | ||||||||
std::atomic<bool> READY(false); // Global flag to synchronize thread start | ||||||||
|
||||||||
// StressTest constructor | ||||||||
Stress::Stress(std::function<void()> func, size_t numThreads) | ||||||||
: func_(std::move(func)), stats_(numThreads), numThreads_(numThreads) | ||||||||
{} | ||||||||
|
||||||||
// Main function to start the stress test | ||||||||
void Stress::run() | ||||||||
{ | ||||||||
std::cout << "Starting stress test with " << numThreads_ << " threads...\n"; | ||||||||
auto startTime = std::chrono::steady_clock::now(); | ||||||||
|
||||||||
READY.store(false, std::memory_order_release); | ||||||||
|
||||||||
std::thread controllerThread(&Stress::monitorThroughput, this); | ||||||||
|
||||||||
threads_.reserve(numThreads_); | ||||||||
for (size_t i = 0; i < numThreads_; ++i) | ||||||||
{ | ||||||||
threads_.emplace_back(&Stress::workerThread, this, i); | ||||||||
} | ||||||||
|
||||||||
READY.store(true, std::memory_order_release); | ||||||||
|
||||||||
for (auto &thread : threads_) | ||||||||
{ | ||||||||
if (thread.joinable()) | ||||||||
{ | ||||||||
thread.join(); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
if (controllerThread.joinable()) | ||||||||
{ | ||||||||
controllerThread.join(); | ||||||||
} | ||||||||
|
||||||||
auto endTime = std::chrono::steady_clock::now(); | ||||||||
auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime); | ||||||||
|
||||||||
uint64_t totalCount = 0; | ||||||||
for (const auto &stat : stats_) | ||||||||
{ | ||||||||
totalCount += stat.count.load(std::memory_order_relaxed); | ||||||||
} | ||||||||
|
||||||||
std::cout << "\nTest completed:\n" | ||||||||
<< "Total iterations: " << formatNumber(totalCount) << "\n" | ||||||||
<< "Duration: " << duration.count() << " seconds\n" | ||||||||
<< "Average throughput: " << formatNumber(totalCount / duration.count()) | ||||||||
<< " iterations/sec\n"; | ||||||||
} | ||||||||
|
||||||||
// Worker thread function | ||||||||
void Stress::workerThread(size_t threadIndex) | ||||||||
{ | ||||||||
#ifdef __linux__ | ||||||||
cpu_set_t cpuset; | ||||||||
CPU_ZERO(&cpuset); | ||||||||
CPU_SET(threadIndex % std::thread::hardware_concurrency(), &cpuset); | ||||||||
pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); | ||||||||
#endif | ||||||||
|
||||||||
while (!STOP.load(std::memory_order_acquire)) | ||||||||
{ | ||||||||
func_(); | ||||||||
stats_[threadIndex].count.fetch_add(1, std::memory_order_relaxed); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
// Monitoring thread function | ||||||||
void Stress::monitorThroughput() | ||||||||
{ | ||||||||
uint64_t lastTotalCount = 0; | ||||||||
auto lastTime = std::chrono::steady_clock::now(); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. throughputHistory grows without bound in long-running tests, potentially causing high memory usage. Consider capping its size or computing rolling statistics without storing all entries.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||
std::vector<uint64_t> throughputHistory; | ||||||||
|
||||||||
while (!STOP.load(std::memory_order_acquire)) | ||||||||
{ | ||||||||
std::this_thread::sleep_for(std::chrono::seconds(SLIDING_WINDOW_SIZE)); | ||||||||
|
||||||||
auto currentTime = std::chrono::steady_clock::now(); | ||||||||
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(currentTime - lastTime).count(); | ||||||||
|
||||||||
uint64_t totalCount = 0; | ||||||||
for (const auto &stat : stats_) | ||||||||
{ | ||||||||
totalCount += stat.count.load(std::memory_order_relaxed); | ||||||||
} | ||||||||
|
||||||||
uint64_t currentCount = totalCount - lastTotalCount; | ||||||||
lastTotalCount = totalCount; | ||||||||
lastTime = currentTime; | ||||||||
|
||||||||
if (elapsed > 0) | ||||||||
{ | ||||||||
uint64_t throughput = currentCount / elapsed; | ||||||||
throughputHistory.push_back(throughput); | ||||||||
|
||||||||
double avg = 0; | ||||||||
uint64_t min = throughput; | ||||||||
uint64_t max = throughput; | ||||||||
|
||||||||
for (uint64_t t : throughputHistory) | ||||||||
{ | ||||||||
avg += t; | ||||||||
min = std::min(min, t); | ||||||||
max = std::max(max, t); | ||||||||
} | ||||||||
avg /= throughputHistory.size(); | ||||||||
|
||||||||
std::cout << "\rThroughput: " << formatNumber(throughput) | ||||||||
<< " it/s | Avg: " << formatNumber(static_cast<uint64_t>(avg)) | ||||||||
<< " | Min: " << formatNumber(min) << " | Max: " << formatNumber(max) << std::flush; | ||||||||
} | ||||||||
} | ||||||||
std::cout << std::endl; | ||||||||
} | ||||||||
|
||||||||
// Helper function to format numbers with commas | ||||||||
std::string Stress::formatNumber(uint64_t num) | ||||||||
{ | ||||||||
std::ostringstream oss; | ||||||||
oss.imbue(std::locale("")); | ||||||||
oss << std::fixed << num; | ||||||||
return oss.str(); | ||||||||
} | ||||||||
|
||||||||
// Signal handler to set the STOP flag when receiving a termination signal | ||||||||
void Stress::stop() | ||||||||
{ | ||||||||
STOP.store(true, std::memory_order_release); | ||||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,99 @@ | ||||||
// Copyright The OpenTelemetry Authors | ||||||
// SPDX-License-Identifier: Apache-2.0 | ||||||
|
||||||
/** | ||||||
* A multi-threaded stress test framework to measure throughput and performance of a given workload. | ||||||
* | ||||||
* ## Overview | ||||||
* Multi-threaded stress test framework designed to execute a specified function | ||||||
* in parallel across multiple threads and measure its throughput. The results are displayed | ||||||
* dynamically, including current throughput, average throughput, and minimum/maximum throughput | ||||||
* during the test. | ||||||
* | ||||||
* ## Key Features | ||||||
* - **Multi-threading**: Uses std::thread to execute the workload in parallel across a user-defined | ||||||
* number of threads. | ||||||
* - **Thread Safety**: Tracks iteration counts per thread using an aligned and padded structure | ||||||
* (WorkerStats) to avoid false sharing and ensure efficient thread-safe updates. | ||||||
* - **Dynamic Metrics**: Continuously calculates and displays throughput (iterations/sec) over | ||||||
* sliding time windows. | ||||||
* - **Graceful Termination**: Captures signals (e.g., Ctrl+C) to cleanly stop all threads and | ||||||
* summarize the results. | ||||||
* - **Thread Affinity (Linux-only)**: Optionally binds threads to specific CPU cores for consistent | ||||||
* performance. | ||||||
* | ||||||
* ## Implementation Details | ||||||
* - **Worker Threads**: | ||||||
* - Each worker thread executes the workload function (func) in a loop until a global STOP flag | ||||||
* is set. | ||||||
* - Each thread maintains its own iteration count to minimize contention. | ||||||
* | ||||||
* - **Throughput Monitoring**: | ||||||
* - A separate controller thread monitors throughput by periodically summing up iteration counts | ||||||
* across threads. | ||||||
* - Throughput is calculated over a sliding window (SLIDING_WINDOW_SIZE) and displayed | ||||||
* dynamically. | ||||||
* | ||||||
* - **Thread Synchronization**: | ||||||
* - The STOP flag, an std::atomic<bool>, ensures all threads stop gracefully when signaled. | ||||||
* - Memory ordering (e.g., std::memory_order_relaxed, std::memory_order_acquire/release) is used | ||||||
* to optimize performance while maintaining correctness. | ||||||
* | ||||||
* - **Final Summary**: | ||||||
* - At the end of the test, the program calculates and prints the total iterations, duration, and | ||||||
* average throughput. | ||||||
*/ | ||||||
|
||||||
#pragma once | ||||||
|
||||||
#include <atomic> | ||||||
#include <chrono> | ||||||
#include <csignal> | ||||||
#include <cstdint> | ||||||
#include <functional> | ||||||
#include <iostream> | ||||||
#include <sstream> | ||||||
#include <thread> | ||||||
#include <vector> | ||||||
|
||||||
// Configuration constants | ||||||
constexpr uint64_t SLIDING_WINDOW_SIZE = 2; // Time window for throughput calculation (in seconds) | ||||||
constexpr size_t CACHE_LINE_SIZE = 64; // Typical CPU cache line size for alignment | ||||||
|
||||||
// WorkerStats structure for tracking iteration counts per thread | ||||||
struct alignas(CACHE_LINE_SIZE) WorkerStats | ||||||
{ | ||||||
std::atomic<uint64_t> count{0}; // Count of iterations for a specific thread | ||||||
char padding[CACHE_LINE_SIZE - | ||||||
sizeof(std::atomic<uint64_t>)]; // Padding to ensure proper alignment | ||||||
}; | ||||||
|
||||||
// StressTest class | ||||||
class Stress | ||||||
{ | ||||||
public: | ||||||
// Constructor | ||||||
Stress(std::function<void()> func, size_t numThreads = std::thread::hardware_concurrency()); | ||||||
|
||||||
// Main function to start the stress test | ||||||
void run(); | ||||||
|
||||||
// function to stop the test | ||||||
void stop(); | ||||||
|
||||||
private: | ||||||
std::function<void()> func_; // Function to be executed by each thread | ||||||
std::vector<std::thread> threads_; // Vector to hold worker threads | ||||||
std::vector<WorkerStats> stats_; // Vector to hold statistics for each thread | ||||||
const size_t numThreads_; // Number of threads to run | ||||||
std::atomic<bool> stopFlag_{false}; // signal to stop the test | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The member variable stopFlag_ is never used, as the global STOP flag is used instead. Remove stopFlag_ or integrate it into the control flow.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
|
||||||
// Worker thread function | ||||||
void workerThread(size_t threadIndex); | ||||||
|
||||||
// Monitoring thread function to calculate and display throughput | ||||||
void monitorThroughput(); | ||||||
|
||||||
// Helper function to format numbers with commas for readability | ||||||
static std::string formatNumber(uint64_t num); | ||||||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Copyright The OpenTelemetry Authors | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
# Define the metrics executable | ||
add_executable(stress_metrics metrics.cc) | ||
|
||
# Link throughput library and OpenTelemetry Metrics API | ||
target_link_libraries( | ||
stress_metrics PRIVATE stress opentelemetry_metrics # OpenTelemetry Metrics | ||
# SDK | ||
) | ||
|
||
# Include directories for throughput | ||
target_include_directories(stress_metrics | ||
PRIVATE ${CMAKE_SOURCE_DIR}/stress/common) | ||
|
||
# Set properties | ||
set_target_properties( | ||
stress_metrics | ||
PROPERTIES CXX_STANDARD 17 | ||
CXX_STANDARD_REQUIRED YES | ||
CXX_EXTENSIONS NO) | ||
|
||
# Optional: Installation | ||
if(OPENTELEMETRY_INSTALL) | ||
install( | ||
TARGETS stress_metrics | ||
EXPORT "${PROJECT_NAME}-target" | ||
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) | ||
endif() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The READY flag is declared but never used. Either remove it or use it to synchronize thread start.
Copilot uses AI. Check for mistakes.