1010#include < ostream>
1111#include < ratio>
1212#include < thread>
13- #include < type_traits>
1413#include < utility>
1514
1615#include " opentelemetry/common/timestamp.h"
2423#include " opentelemetry/sdk/metrics/push_metric_exporter.h"
2524#include " opentelemetry/version.h"
2625
27- #if defined(_MSC_VER)
28- # pragma warning(suppress : 5204)
29- # include < future>
30- #else
31- # include < future>
32- #endif
33-
3426#if OPENTELEMETRY_HAVE_EXCEPTIONS
3527# include < exception>
3628#endif
@@ -98,11 +90,9 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
9890 worker_thread_instrumentation_->OnStart ();
9991 }
10092#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
101-
10293 do
10394 {
10495 auto start = std::chrono::steady_clock::now ();
105-
10696#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
10797 if (worker_thread_instrumentation_ != nullptr )
10898 {
@@ -134,7 +124,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
134124 worker_thread_instrumentation_->BeforeWait ();
135125 }
136126#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
137-
138127 std::unique_lock<std::mutex> lk (cv_m_);
139128 cv_.wait_for (lk, remaining_wait_interval_ms, [this ]() {
140129 if (is_force_wakeup_background_worker_.load (std::memory_order_acquire))
@@ -151,7 +140,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
151140 worker_thread_instrumentation_->AfterWait ();
152141 }
153142#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
154-
155143 } while (IsShutdown () != true );
156144
157145#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
@@ -164,61 +152,39 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
164152
165153bool PeriodicExportingMetricReader::CollectAndExportOnce ()
166154{
167- std::atomic<bool > cancel_export_for_timeout{false };
168-
169155 std::uint64_t notify_force_flush = force_flush_pending_sequence_.load (std::memory_order_acquire);
170- std::unique_ptr<std::thread> task_thread;
171-
172156#if OPENTELEMETRY_HAVE_EXCEPTIONS
173157 try
174158 {
175159#endif
176- std::promise<void > sender;
177- auto receiver = sender.get_future ();
178-
179- task_thread.reset (
180- new std::thread ([this , &cancel_export_for_timeout, sender = std::move (sender)] {
181160#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
182- if (collect_thread_instrumentation_ != nullptr )
183- {
184- collect_thread_instrumentation_->OnStart ();
185- collect_thread_instrumentation_->BeforeLoad ();
186- }
161+ if (collect_thread_instrumentation_ != nullptr )
162+ {
163+ collect_thread_instrumentation_->OnStart ();
164+ collect_thread_instrumentation_->BeforeLoad ();
165+ }
187166#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
188-
189- this ->Collect ([this , &cancel_export_for_timeout](ResourceMetrics &metric_data) {
190- if (cancel_export_for_timeout.load (std::memory_order_acquire))
191- {
192- OTEL_INTERNAL_LOG_ERROR (
193- " [Periodic Exporting Metric Reader] Collect took longer configured time: "
194- << this ->export_timeout_millis_ .count () << " ms, and timed out" );
195- return false ;
196- }
197- this ->exporter_ ->Export (metric_data);
198- return true ;
199- });
200-
201- const_cast <std::promise<void > &>(sender).set_value ();
167+ auto start = std::chrono::steady_clock::now ();
168+ this ->Collect ([this , &start](ResourceMetrics &metric_data) {
169+ auto end = std::chrono::steady_clock::now ();
170+ if ((end - start) > this ->export_timeout_millis_ )
171+ {
172+ OTEL_INTERNAL_LOG_ERROR (
173+ " [Periodic Exporting Metric Reader] Collect took longer configured time: "
174+ << this ->export_timeout_millis_ .count () << " ms, and timed out" );
175+ return false ;
176+ }
177+ this ->exporter_ ->Export (metric_data);
178+ return true ;
179+ });
202180
203181#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
204- if (collect_thread_instrumentation_ != nullptr )
205- {
206- collect_thread_instrumentation_->AfterLoad ();
207- collect_thread_instrumentation_->OnEnd ();
208- }
209- #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
210- }));
211-
212- std::future_status status;
213- do
182+ if (collect_thread_instrumentation_ != nullptr )
214183 {
215- status = receiver.wait_for (std::chrono::milliseconds (export_timeout_millis_));
216- if (status == std::future_status::timeout)
217- {
218- cancel_export_for_timeout.store (true , std::memory_order_release);
219- break ;
220- }
221- } while (status != std::future_status::ready);
184+ collect_thread_instrumentation_->AfterLoad ();
185+ collect_thread_instrumentation_->OnEnd ();
186+ }
187+ #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
222188#if OPENTELEMETRY_HAVE_EXCEPTIONS
223189 }
224190 catch (std::exception &e)
@@ -235,11 +201,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
235201 }
236202#endif
237203
238- if (task_thread && task_thread->joinable ())
239- {
240- task_thread->join ();
241- }
242-
243204 std::uint64_t notified_sequence = force_flush_notified_sequence_.load (std::memory_order_acquire);
244205 while (notify_force_flush > notified_sequence)
245206 {
0 commit comments