| 
 | 1 | +// Copyright The OpenTelemetry Authors  | 
 | 2 | +// SPDX-License-Identifier: Apache-2.0  | 
 | 3 | + | 
 | 4 | +#include <gtest/gtest.h>  | 
 | 5 | + | 
 | 6 | +#include <stdint.h>  | 
 | 7 | +#include <atomic>  | 
 | 8 | +#include <chrono>  | 
 | 9 | +#include <random>  | 
 | 10 | +#include <thread>  | 
 | 11 | +#include <utility>  | 
 | 12 | +#include <vector>  | 
 | 13 | + | 
 | 14 | +#include "common.h"  | 
 | 15 | +#include "opentelemetry/context/context.h"  | 
 | 16 | +#include "opentelemetry/metrics/meter.h"  | 
 | 17 | +#include "opentelemetry/metrics/sync_instruments.h"  | 
 | 18 | +#include "opentelemetry/nostd/function_ref.h"  | 
 | 19 | +#include "opentelemetry/nostd/shared_ptr.h"  | 
 | 20 | +#include "opentelemetry/nostd/unique_ptr.h"  | 
 | 21 | +#include "opentelemetry/nostd/variant.h"  | 
 | 22 | +#include "opentelemetry/sdk/common/exporter_utils.h"  | 
 | 23 | +#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"  | 
 | 24 | +#include "opentelemetry/sdk/metrics/data/metric_data.h"  | 
 | 25 | +#include "opentelemetry/sdk/metrics/data/point_data.h"  | 
 | 26 | +#include "opentelemetry/sdk/metrics/export/metric_producer.h"  | 
 | 27 | +#include "opentelemetry/sdk/metrics/instruments.h"  | 
 | 28 | +#include "opentelemetry/sdk/metrics/meter_provider.h"  | 
 | 29 | +#include "opentelemetry/sdk/metrics/metric_reader.h"  | 
 | 30 | +#include "opentelemetry/sdk/metrics/push_metric_exporter.h"  | 
 | 31 | + | 
 | 32 | +using namespace opentelemetry;  | 
 | 33 | +using namespace opentelemetry::sdk::instrumentationscope;  | 
 | 34 | +using namespace opentelemetry::sdk::metrics;  | 
 | 35 | + | 
 | 36 | +class MockMetricExporterForStress : public opentelemetry::sdk::metrics::PushMetricExporter  | 
 | 37 | +{  | 
 | 38 | +public:  | 
 | 39 | +  MockMetricExporterForStress() = default;  | 
 | 40 | + | 
 | 41 | +  opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality(  | 
 | 42 | +      opentelemetry::sdk::metrics::InstrumentType) const noexcept override  | 
 | 43 | +  {  | 
 | 44 | +    return AggregationTemporality::kDelta;  | 
 | 45 | +  }  | 
 | 46 | + | 
 | 47 | +  opentelemetry::sdk::common::ExportResult Export(  | 
 | 48 | +      const opentelemetry::sdk::metrics::ResourceMetrics &) noexcept override  | 
 | 49 | +  {  | 
 | 50 | +    return opentelemetry::sdk::common::ExportResult::kSuccess;  | 
 | 51 | +  }  | 
 | 52 | + | 
 | 53 | +  bool ForceFlush(std::chrono::microseconds) noexcept override { return true; }  | 
 | 54 | + | 
 | 55 | +  bool Shutdown(std::chrono::microseconds) noexcept override { return true; }  | 
 | 56 | +};  | 
 | 57 | + | 
 | 58 | +TEST(HistogramStress, UnsignedInt64)  | 
 | 59 | +{  | 
 | 60 | +  MeterProvider mp;  | 
 | 61 | +  auto m = mp.GetMeter("meter1", "version1", "schema1");  | 
 | 62 | + | 
 | 63 | +  std::unique_ptr<MockMetricExporterForStress> exporter(new MockMetricExporterForStress());  | 
 | 64 | +  std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};  | 
 | 65 | +  mp.AddMetricReader(reader);  | 
 | 66 | + | 
 | 67 | +  auto h = m->CreateUInt64Histogram("histogram1", "histogram1_description", "histogram1_unit");  | 
 | 68 | + | 
 | 69 | +  //  | 
 | 70 | +  // Start a dedicated thread to collect the metrics  | 
 | 71 | +  //  | 
 | 72 | +  std::vector<HistogramPointData> actuals;  | 
 | 73 | +  auto stop_collecting = std::make_shared<std::atomic<bool>>(false);  | 
 | 74 | +  auto collect_thread  = std::thread([&reader, &actuals, stop_collecting]() {  | 
 | 75 | +    while (!*stop_collecting)  | 
 | 76 | +    {  | 
 | 77 | +      std::this_thread::sleep_for(std::chrono::milliseconds(1000));  | 
 | 78 | +      reader->Collect([&](ResourceMetrics &rm) {  | 
 | 79 | +        for (const ScopeMetrics &smd : rm.scope_metric_data_)  | 
 | 80 | +        {  | 
 | 81 | +          for (const MetricData &md : smd.metric_data_)  | 
 | 82 | +          {  | 
 | 83 | +            for (const PointDataAttributes &dp : md.point_data_attr_)  | 
 | 84 | +            {  | 
 | 85 | +              actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));  | 
 | 86 | +            }  | 
 | 87 | +          }  | 
 | 88 | +        }  | 
 | 89 | +        return true;  | 
 | 90 | +      });  | 
 | 91 | +    }  | 
 | 92 | +  });  | 
 | 93 | + | 
 | 94 | +  //  | 
 | 95 | +  // Start logging threads  | 
 | 96 | +  //  | 
 | 97 | +  int record_thread_count = std::thread::hardware_concurrency() - 1;  | 
 | 98 | +  if (record_thread_count <= 0)  | 
 | 99 | +  {  | 
 | 100 | +    record_thread_count = 1;  | 
 | 101 | +  }  | 
 | 102 | + | 
 | 103 | +  std::vector<std::thread> threads(record_thread_count);  | 
 | 104 | +  constexpr int iterations_per_thread = 2000000;  | 
 | 105 | +  auto expected_sum                   = std::make_shared<std::atomic<uint64_t>>(0);  | 
 | 106 | + | 
 | 107 | +  for (int i = 0; i < record_thread_count; ++i)  | 
 | 108 | +  {  | 
 | 109 | +    threads[i] = std::thread([&] {  | 
 | 110 | +      std::random_device rd;  | 
 | 111 | +      std::mt19937 random_engine(rd());  | 
 | 112 | +      std::uniform_int_distribution<> gen_random(1, 20000);  | 
 | 113 | + | 
 | 114 | +      for (int j = 0; j < iterations_per_thread; ++j)  | 
 | 115 | +      {  | 
 | 116 | +        int64_t val = gen_random(random_engine);  | 
 | 117 | +        expected_sum->fetch_add(val, std::memory_order_relaxed);  | 
 | 118 | +        h->Record(val, {});  | 
 | 119 | +      }  | 
 | 120 | +    });  | 
 | 121 | +  }  | 
 | 122 | + | 
 | 123 | +  for (int i = 0; i < record_thread_count; ++i)  | 
 | 124 | +  {  | 
 | 125 | +    threads[i].join();  | 
 | 126 | +  }  | 
 | 127 | + | 
 | 128 | +  //  | 
 | 129 | +  // Stop the dedicated collection thread  | 
 | 130 | +  //  | 
 | 131 | +  *stop_collecting = true;  | 
 | 132 | +  collect_thread.join();  | 
 | 133 | + | 
 | 134 | +  //  | 
 | 135 | +  // run the the final collection  | 
 | 136 | +  //  | 
 | 137 | +  reader->Collect([&](ResourceMetrics &rm) {  | 
 | 138 | +    for (const ScopeMetrics &smd : rm.scope_metric_data_)  | 
 | 139 | +    {  | 
 | 140 | +      for (const MetricData &md : smd.metric_data_)  | 
 | 141 | +      {  | 
 | 142 | +        for (const PointDataAttributes &dp : md.point_data_attr_)  | 
 | 143 | +        {  | 
 | 144 | +          actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));  | 
 | 145 | +        }  | 
 | 146 | +      }  | 
 | 147 | +    }  | 
 | 148 | +    return true;  | 
 | 149 | +  });  | 
 | 150 | + | 
 | 151 | +  //  | 
 | 152 | +  // Aggregate the results  | 
 | 153 | +  //  | 
 | 154 | +  int64_t expected_count  = record_thread_count * iterations_per_thread;  | 
 | 155 | +  int64_t collected_count = 0;  | 
 | 156 | +  int64_t collected_sum   = 0;  | 
 | 157 | +  for (const auto &actual : actuals)  | 
 | 158 | +  {  | 
 | 159 | +    int64_t collected_bucket_sum = 0;  | 
 | 160 | +    for (const auto &count : actual.counts_)  | 
 | 161 | +    {  | 
 | 162 | +      collected_bucket_sum += count;  | 
 | 163 | +    }  | 
 | 164 | +    ASSERT_EQ(collected_bucket_sum, actual.count_);  | 
 | 165 | + | 
 | 166 | +    collected_sum += opentelemetry::nostd::get<int64_t>(actual.sum_);  | 
 | 167 | +    collected_count += actual.count_;  | 
 | 168 | +  }  | 
 | 169 | + | 
 | 170 | +  ASSERT_EQ(expected_count, collected_count);  | 
 | 171 | +  ASSERT_EQ(*expected_sum, collected_sum);  | 
 | 172 | +}  | 
0 commit comments