Skip to content

Commit 00a4b13

Browse files
authored
Merge branch 'main' into fix/doc_comment_api
2 parents 0c0f2f7 + 9451f0e commit 00a4b13

File tree

7 files changed

+221
-67
lines changed

7 files changed

+221
-67
lines changed

.github/workflows/codeql-analysis.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ jobs:
3939
sudo -E ./ci/setup_googletest.sh
4040
sudo -E ./ci/setup_ci_environment.sh
4141
- name: Initialize CodeQL
42-
uses: github/codeql-action/init@28deaeda66b76a05916b6923827895f2b14ab387 # v3.28.16
42+
uses: github/codeql-action/init@60168efe1c415ce0f5521ea06d5c2062adbeed1b # v3.28.17
4343
with:
4444
languages: cpp
4545
- name: Autobuild
46-
uses: github/codeql-action/autobuild@28deaeda66b76a05916b6923827895f2b14ab387 # v3.28.16
46+
uses: github/codeql-action/autobuild@60168efe1c415ce0f5521ea06d5c2062adbeed1b # v3.28.17
4747
- name: Perform CodeQL Analysis
48-
uses: github/codeql-action/analyze@28deaeda66b76a05916b6923827895f2b14ab387 # v3.28.16
48+
uses: github/codeql-action/analyze@60168efe1c415ce0f5521ea06d5c2062adbeed1b # v3.28.17

.github/workflows/ossf-scorecard.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,6 @@ jobs:
4747
# Upload the results to GitHub's code scanning dashboard (optional).
4848
# Commenting out will disable upload of results to your repo's Code Scanning dashboard
4949
- name: "Upload to code-scanning"
50-
uses: github/codeql-action/upload-sarif@28deaeda66b76a05916b6923827895f2b14ab387 # v3.28.16
50+
uses: github/codeql-action/upload-sarif@60168efe1c415ce0f5521ea06d5c2062adbeed1b # v3.28.17
5151
with:
5252
sarif_file: results.sarif

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ Increment the:
3333
* [API] Add Enabled method to Tracer
3434
[#3357](https://github.com/open-telemetry/opentelemetry-cpp/pull/3357)
3535

36+
* [SDK] Optimize PeriodicExportingMetricReader thread usage
37+
[#3383](https://github.com/open-telemetry/opentelemetry-cpp/pull/3383)
38+
3639
## [1.20 2025-04-01]
3740

3841
* [BUILD] Update opentelemetry-proto version

sdk/src/metrics/export/periodic_exporting_metric_reader.cc

Lines changed: 23 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <ostream>
1111
#include <ratio>
1212
#include <thread>
13-
#include <type_traits>
1413
#include <utility>
1514

1615
#include "opentelemetry/common/timestamp.h"
@@ -24,13 +23,6 @@
2423
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
2524
#include "opentelemetry/version.h"
2625

27-
#if defined(_MSC_VER)
28-
# pragma warning(suppress : 5204)
29-
# include <future>
30-
#else
31-
# include <future>
32-
#endif
33-
3426
#if OPENTELEMETRY_HAVE_EXCEPTIONS
3527
# include <exception>
3628
#endif
@@ -98,11 +90,9 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
9890
worker_thread_instrumentation_->OnStart();
9991
}
10092
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
101-
10293
do
10394
{
10495
auto start = std::chrono::steady_clock::now();
105-
10696
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
10797
if (worker_thread_instrumentation_ != nullptr)
10898
{
@@ -134,7 +124,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
134124
worker_thread_instrumentation_->BeforeWait();
135125
}
136126
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
137-
138127
std::unique_lock<std::mutex> lk(cv_m_);
139128
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
140129
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
@@ -151,7 +140,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
151140
worker_thread_instrumentation_->AfterWait();
152141
}
153142
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
154-
155143
} while (IsShutdown() != true);
156144

157145
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
@@ -164,61 +152,39 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
164152

165153
bool PeriodicExportingMetricReader::CollectAndExportOnce()
166154
{
167-
std::atomic<bool> cancel_export_for_timeout{false};
168-
169155
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
170-
std::unique_ptr<std::thread> task_thread;
171-
172156
#if OPENTELEMETRY_HAVE_EXCEPTIONS
173157
try
174158
{
175159
#endif
176-
std::promise<void> sender;
177-
auto receiver = sender.get_future();
178-
179-
task_thread.reset(
180-
new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] {
181160
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
182-
if (collect_thread_instrumentation_ != nullptr)
183-
{
184-
collect_thread_instrumentation_->OnStart();
185-
collect_thread_instrumentation_->BeforeLoad();
186-
}
161+
if (collect_thread_instrumentation_ != nullptr)
162+
{
163+
collect_thread_instrumentation_->OnStart();
164+
collect_thread_instrumentation_->BeforeLoad();
165+
}
187166
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
188-
189-
this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
190-
if (cancel_export_for_timeout.load(std::memory_order_acquire))
191-
{
192-
OTEL_INTERNAL_LOG_ERROR(
193-
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
194-
<< this->export_timeout_millis_.count() << " ms, and timed out");
195-
return false;
196-
}
197-
this->exporter_->Export(metric_data);
198-
return true;
199-
});
200-
201-
const_cast<std::promise<void> &>(sender).set_value();
167+
auto start = std::chrono::steady_clock::now();
168+
this->Collect([this, &start](ResourceMetrics &metric_data) {
169+
auto end = std::chrono::steady_clock::now();
170+
if ((end - start) > this->export_timeout_millis_)
171+
{
172+
OTEL_INTERNAL_LOG_ERROR(
173+
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
174+
<< this->export_timeout_millis_.count() << " ms, and timed out");
175+
return false;
176+
}
177+
this->exporter_->Export(metric_data);
178+
return true;
179+
});
202180

203181
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
204-
if (collect_thread_instrumentation_ != nullptr)
205-
{
206-
collect_thread_instrumentation_->AfterLoad();
207-
collect_thread_instrumentation_->OnEnd();
208-
}
209-
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
210-
}));
211-
212-
std::future_status status;
213-
do
182+
if (collect_thread_instrumentation_ != nullptr)
214183
{
215-
status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_));
216-
if (status == std::future_status::timeout)
217-
{
218-
cancel_export_for_timeout.store(true, std::memory_order_release);
219-
break;
220-
}
221-
} while (status != std::future_status::ready);
184+
collect_thread_instrumentation_->AfterLoad();
185+
collect_thread_instrumentation_->OnEnd();
186+
}
187+
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
222188
#if OPENTELEMETRY_HAVE_EXCEPTIONS
223189
}
224190
catch (std::exception &e)
@@ -235,11 +201,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
235201
}
236202
#endif
237203

238-
if (task_thread && task_thread->joinable())
239-
{
240-
task_thread->join();
241-
}
242-
243204
std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
244205
while (notify_force_flush > notified_sequence)
245206
{

sdk/test/metrics/BUILD

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,23 @@ cc_test(
4949
],
5050
)
5151

52+
cc_test(
53+
name = "stress_tests",
54+
timeout = "long",
55+
srcs = glob(["*_test_stress.cc"]),
56+
copts = [
57+
"-DUNIT_TESTING",
58+
],
59+
tags = [
60+
"metrics",
61+
"test",
62+
],
63+
deps = [
64+
"metrics_common_test_utils",
65+
"@com_google_googletest//:gtest_main",
66+
],
67+
)
68+
5269
otel_cc_benchmark(
5370
name = "attributes_processor_benchmark",
5471
srcs = [

sdk/test/metrics/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ foreach(
3333
metric_reader_test
3434
observable_registry_test
3535
periodic_exporting_metric_reader_test
36-
instrument_metadata_validator_test)
36+
instrument_metadata_validator_test
37+
metric_test_stress)
3738
add_executable(${testname} "${testname}.cc")
3839
target_link_libraries(
3940
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#include <gtest/gtest.h>
5+
6+
#include <stdint.h>
7+
#include <atomic>
8+
#include <chrono>
9+
#include <random>
10+
#include <thread>
11+
#include <utility>
12+
#include <vector>
13+
14+
#include "common.h"
15+
#include "opentelemetry/context/context.h"
16+
#include "opentelemetry/metrics/meter.h"
17+
#include "opentelemetry/metrics/sync_instruments.h"
18+
#include "opentelemetry/nostd/function_ref.h"
19+
#include "opentelemetry/nostd/shared_ptr.h"
20+
#include "opentelemetry/nostd/unique_ptr.h"
21+
#include "opentelemetry/nostd/variant.h"
22+
#include "opentelemetry/sdk/common/exporter_utils.h"
23+
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
24+
#include "opentelemetry/sdk/metrics/data/metric_data.h"
25+
#include "opentelemetry/sdk/metrics/data/point_data.h"
26+
#include "opentelemetry/sdk/metrics/export/metric_producer.h"
27+
#include "opentelemetry/sdk/metrics/instruments.h"
28+
#include "opentelemetry/sdk/metrics/meter_provider.h"
29+
#include "opentelemetry/sdk/metrics/metric_reader.h"
30+
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
31+
32+
using namespace opentelemetry;
33+
using namespace opentelemetry::sdk::instrumentationscope;
34+
using namespace opentelemetry::sdk::metrics;
35+
36+
class MockMetricExporterForStress : public opentelemetry::sdk::metrics::PushMetricExporter
37+
{
38+
public:
39+
MockMetricExporterForStress() = default;
40+
41+
opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality(
42+
opentelemetry::sdk::metrics::InstrumentType) const noexcept override
43+
{
44+
return AggregationTemporality::kDelta;
45+
}
46+
47+
opentelemetry::sdk::common::ExportResult Export(
48+
const opentelemetry::sdk::metrics::ResourceMetrics &) noexcept override
49+
{
50+
return opentelemetry::sdk::common::ExportResult::kSuccess;
51+
}
52+
53+
bool ForceFlush(std::chrono::microseconds) noexcept override { return true; }
54+
55+
bool Shutdown(std::chrono::microseconds) noexcept override { return true; }
56+
};
57+
58+
TEST(HistogramStress, UnsignedInt64)
59+
{
60+
MeterProvider mp;
61+
auto m = mp.GetMeter("meter1", "version1", "schema1");
62+
63+
std::unique_ptr<MockMetricExporterForStress> exporter(new MockMetricExporterForStress());
64+
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
65+
mp.AddMetricReader(reader);
66+
67+
auto h = m->CreateUInt64Histogram("histogram1", "histogram1_description", "histogram1_unit");
68+
69+
//
70+
// Start a dedicated thread to collect the metrics
71+
//
72+
std::vector<HistogramPointData> actuals;
73+
auto stop_collecting = std::make_shared<std::atomic<bool>>(false);
74+
auto collect_thread = std::thread([&reader, &actuals, stop_collecting]() {
75+
while (!*stop_collecting)
76+
{
77+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
78+
reader->Collect([&](ResourceMetrics &rm) {
79+
for (const ScopeMetrics &smd : rm.scope_metric_data_)
80+
{
81+
for (const MetricData &md : smd.metric_data_)
82+
{
83+
for (const PointDataAttributes &dp : md.point_data_attr_)
84+
{
85+
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
86+
}
87+
}
88+
}
89+
return true;
90+
});
91+
}
92+
});
93+
94+
//
95+
// Start logging threads
96+
//
97+
int record_thread_count = std::thread::hardware_concurrency() - 1;
98+
if (record_thread_count <= 0)
99+
{
100+
record_thread_count = 1;
101+
}
102+
103+
std::vector<std::thread> threads(record_thread_count);
104+
constexpr int iterations_per_thread = 2000000;
105+
auto expected_sum = std::make_shared<std::atomic<uint64_t>>(0);
106+
107+
for (int i = 0; i < record_thread_count; ++i)
108+
{
109+
threads[i] = std::thread([&] {
110+
std::random_device rd;
111+
std::mt19937 random_engine(rd());
112+
std::uniform_int_distribution<> gen_random(1, 20000);
113+
114+
for (int j = 0; j < iterations_per_thread; ++j)
115+
{
116+
int64_t val = gen_random(random_engine);
117+
expected_sum->fetch_add(val, std::memory_order_relaxed);
118+
h->Record(val, {});
119+
}
120+
});
121+
}
122+
123+
for (int i = 0; i < record_thread_count; ++i)
124+
{
125+
threads[i].join();
126+
}
127+
128+
//
129+
// Stop the dedicated collection thread
130+
//
131+
*stop_collecting = true;
132+
collect_thread.join();
133+
134+
//
135+
// run the the final collection
136+
//
137+
reader->Collect([&](ResourceMetrics &rm) {
138+
for (const ScopeMetrics &smd : rm.scope_metric_data_)
139+
{
140+
for (const MetricData &md : smd.metric_data_)
141+
{
142+
for (const PointDataAttributes &dp : md.point_data_attr_)
143+
{
144+
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
145+
}
146+
}
147+
}
148+
return true;
149+
});
150+
151+
//
152+
// Aggregate the results
153+
//
154+
int64_t expected_count = record_thread_count * iterations_per_thread;
155+
int64_t collected_count = 0;
156+
int64_t collected_sum = 0;
157+
for (const auto &actual : actuals)
158+
{
159+
int64_t collected_bucket_sum = 0;
160+
for (const auto &count : actual.counts_)
161+
{
162+
collected_bucket_sum += count;
163+
}
164+
ASSERT_EQ(collected_bucket_sum, actual.count_);
165+
166+
collected_sum += opentelemetry::nostd::get<int64_t>(actual.sum_);
167+
collected_count += actual.count_;
168+
}
169+
170+
ASSERT_EQ(expected_count, collected_count);
171+
ASSERT_EQ(*expected_sum, collected_sum);
172+
}

0 commit comments

Comments
 (0)