diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index 519cb1fc1f..dcd7a323f1 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -49,6 +49,23 @@ cc_test( ], ) +cc_test( + name = "stress_tests", + timeout = "long", + srcs = glob(["*_test_stress.cc"]), + copts = [ + "-DUNIT_TESTING", + ], + tags = [ + "metrics", + "test", + ], + deps = [ + "metrics_common_test_utils", + "@com_google_googletest//:gtest_main", + ], +) + otel_cc_benchmark( name = "attributes_processor_benchmark", srcs = [ diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index 79b8c28a2f..7269330708 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -33,7 +33,8 @@ foreach( metric_reader_test observable_registry_test periodic_exporting_metric_reader_test - instrument_metadata_validator_test) + instrument_metadata_validator_test + metric_test_stress) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} diff --git a/sdk/test/metrics/metric_test_stress.cc b/sdk/test/metrics/metric_test_stress.cc new file mode 100644 index 0000000000..8337852649 --- /dev/null +++ b/sdk/test/metrics/metric_test_stress.cc @@ -0,0 +1,172 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "opentelemetry/context/context.h" +#include "opentelemetry/metrics/meter.h" +#include "opentelemetry/metrics/sync_instruments.h" +#include "opentelemetry/nostd/function_ref.h" +#include "opentelemetry/nostd/shared_ptr.h" +#include "opentelemetry/nostd/unique_ptr.h" +#include "opentelemetry/nostd/variant.h" +#include "opentelemetry/sdk/common/exporter_utils.h" +#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" +#include "opentelemetry/sdk/metrics/data/metric_data.h" +#include "opentelemetry/sdk/metrics/data/point_data.h" +#include "opentelemetry/sdk/metrics/export/metric_producer.h" +#include "opentelemetry/sdk/metrics/instruments.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" +#include "opentelemetry/sdk/metrics/metric_reader.h" +#include "opentelemetry/sdk/metrics/push_metric_exporter.h" + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationscope; +using namespace opentelemetry::sdk::metrics; + +class MockMetricExporterForStress : public opentelemetry::sdk::metrics::PushMetricExporter +{ +public: + MockMetricExporterForStress() = default; + + opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality( + opentelemetry::sdk::metrics::InstrumentType) const noexcept override + { + return AggregationTemporality::kDelta; + } + + opentelemetry::sdk::common::ExportResult Export( + const opentelemetry::sdk::metrics::ResourceMetrics &) noexcept override + { + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + bool ForceFlush(std::chrono::microseconds) noexcept override { return true; } + + bool Shutdown(std::chrono::microseconds) noexcept override { return true; } +}; + +TEST(HistogramStress, UnsignedInt64) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::unique_ptr exporter(new MockMetricExporterForStress()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + auto h = m->CreateUInt64Histogram("histogram1", "histogram1_description", "histogram1_unit"); + + // + // Start a dedicated thread to collect the metrics + // + std::vector actuals; + auto stop_collecting = std::make_shared>(false); + auto collect_thread = std::thread([&reader, &actuals, stop_collecting]() { + while (!*stop_collecting) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + for (const PointDataAttributes &dp : md.point_data_attr_) + { + actuals.push_back(opentelemetry::nostd::get(dp.point_data)); + } + } + } + return true; + }); + } + }); + + // + // Start logging threads + // + int record_thread_count = std::thread::hardware_concurrency() - 1; + if (record_thread_count <= 0) + { + record_thread_count = 1; + } + + std::vector threads(record_thread_count); + constexpr int iterations_per_thread = 2000000; + auto expected_sum = std::make_shared>(0); + + for (int i = 0; i < record_thread_count; ++i) + { + threads[i] = std::thread([&] { + std::random_device rd; + std::mt19937 random_engine(rd()); + std::uniform_int_distribution<> gen_random(1, 20000); + + for (int j = 0; j < iterations_per_thread; ++j) + { + int64_t val = gen_random(random_engine); + expected_sum->fetch_add(val, std::memory_order_relaxed); + h->Record(val, {}); + } + }); + } + + for (int i = 0; i < record_thread_count; ++i) + { + threads[i].join(); + } + + // + // Stop the dedicated collection thread + // + *stop_collecting = true; + collect_thread.join(); + + // + // run the the final collection + // + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + for (const PointDataAttributes &dp : md.point_data_attr_) + { + actuals.push_back(opentelemetry::nostd::get(dp.point_data)); + } + } + } + return true; + }); + + // + // Aggregate the results + // + int64_t expected_count = record_thread_count * iterations_per_thread; + int64_t collected_count = 0; + int64_t collected_sum = 0; + for (const auto &actual : actuals) + { + int64_t collected_bucket_sum = 0; + for (const auto &count : actual.counts_) + { + collected_bucket_sum += count; + } + ASSERT_EQ(collected_bucket_sum, actual.count_); + + collected_sum += opentelemetry::nostd::get(actual.sum_); + collected_count += actual.count_; + } + + ASSERT_EQ(expected_count, collected_count); + ASSERT_EQ(*expected_sum, collected_sum); +}