Skip to content

Commit a5b6817

Browse files
committed
update
1 parent 762b73d commit a5b6817

File tree

2 files changed

+61
-19
lines changed

2 files changed

+61
-19
lines changed

ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -328,19 +328,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
328328
void ScheduleAbortSession(uint64_t session_id);
329329
void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource);
330330

331-
void WaitBackgroundThreadExit()
332-
{
333-
std::unique_ptr<std::thread> background_thread;
334-
{
335-
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
336-
background_thread.swap(background_thread_);
337-
}
331+
void SetBackgroundWaitFor(std::chrono::milliseconds ms);
338332

339-
if (background_thread && background_thread->joinable())
340-
{
341-
background_thread->join();
342-
}
343-
}
333+
void WaitBackgroundThreadExit();
344334

345335
private:
346336
void wakeupBackgroundThread();
@@ -366,6 +356,10 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
366356
std::unique_ptr<std::thread> background_thread_;
367357
std::chrono::milliseconds scheduled_delay_milliseconds_;
368358

359+
std::condition_variable background_thread_waiter_cv_;
360+
std::mutex background_thread_waiter_lock_;
361+
std::chrono::milliseconds background_thread_wait_for_;
362+
369363
nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
370364
};
371365

ext/src/http/client/curl/http_client_curl.cc

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,15 @@ HttpClient::HttpClient()
189189
next_session_id_{0},
190190
max_sessions_per_connection_{8},
191191
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
192+
background_thread_wait_for_{std::chrono::minutes{1}},
192193
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
193194
{}
194195

195196
HttpClient::~HttpClient()
196197
{
197198
while (true)
198199
{
200+
background_thread_wait_for_ = std::chrono::milliseconds{0};
199201
std::unique_ptr<std::thread> background_thread;
200202
{
201203
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
@@ -211,6 +213,7 @@ HttpClient::~HttpClient()
211213
}
212214
if (background_thread->joinable())
213215
{
216+
background_thread_waiter_cv_.notify_all();
214217
background_thread->join();
215218
}
216219
}
@@ -236,6 +239,7 @@ std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSes
236239
std::lock_guard<std::mutex> lock_guard{sessions_m_};
237240
sessions_.insert({session_id, session});
238241

242+
background_thread_waiter_cv_.notify_all();
239243
// FIXME: Session may leak if it do not call SendRequest
240244
return session;
241245
}
@@ -416,6 +420,23 @@ void HttpClient::MaybeSpawnBackgroundThread()
416420
still_running = 1;
417421
}
418422

423+
if (still_running > 0)
424+
{
425+
continue;
426+
}
427+
428+
// If there is no pending jobs, Exit flush thread if there is not data to flush more
429+
// than one minute.
430+
if (self->background_thread_wait_for_ != std::chrono::milliseconds{0})
431+
{
432+
std::unique_lock<std::mutex> lk{self->background_thread_waiter_lock_};
433+
if (self->background_thread_waiter_cv_.wait_for(
434+
lk, self->background_thread_wait_for_) != std::cv_status::timeout)
435+
{
436+
continue;
437+
}
438+
}
439+
419440
if (still_running == 0)
420441
{
421442
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
@@ -440,16 +461,18 @@ void HttpClient::MaybeSpawnBackgroundThread()
440461
still_running = 1;
441462
}
442463

464+
if (still_running > 0)
465+
{
466+
continue;
467+
}
468+
443469
// If there is no pending jobs, we can stop the background thread.
444-
if (still_running == 0)
470+
if (self->background_thread_)
445471
{
446-
if (self->background_thread_)
447-
{
448-
self->background_thread_->detach();
449-
self->background_thread_.reset();
450-
}
451-
break;
472+
self->background_thread_->detach();
473+
self->background_thread_.reset();
452474
}
475+
break;
453476
}
454477
}
455478
},
@@ -502,6 +525,30 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource
502525
wakeupBackgroundThread();
503526
}
504527

528+
void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)
529+
{
530+
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
531+
background_thread_wait_for_ = ms;
532+
}
533+
534+
void HttpClient::WaitBackgroundThreadExit()
535+
{
536+
std::unique_ptr<std::thread> background_thread;
537+
{
538+
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
539+
background_thread.swap(background_thread_);
540+
}
541+
542+
if (background_thread && background_thread->joinable())
543+
{
544+
auto wait_for = background_thread_wait_for_;
545+
background_thread_wait_for_ = std::chrono::milliseconds{0};
546+
background_thread_waiter_cv_.notify_all();
547+
background_thread->join();
548+
background_thread_wait_for_ = wait_for;
549+
}
550+
}
551+
505552
void HttpClient::wakeupBackgroundThread()
506553
{
507554
// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs
@@ -513,6 +560,7 @@ void HttpClient::wakeupBackgroundThread()
513560
curl_multi_wakeup(multi_handle_);
514561
}
515562
#endif
563+
background_thread_waiter_cv_.notify_all();
516564
}
517565

518566
bool HttpClient::doAddSessions()

0 commit comments

Comments
 (0)