Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -328,19 +328,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
void ScheduleAbortSession(uint64_t session_id);
void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource);

void WaitBackgroundThreadExit()
{
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}
void SetBackgroundWaitFor(std::chrono::milliseconds ms);

if (background_thread && background_thread->joinable())
{
background_thread->join();
}
}
void WaitBackgroundThreadExit();

private:
void wakeupBackgroundThread();
Expand All @@ -366,6 +356,10 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
std::unique_ptr<std::thread> background_thread_;
std::chrono::milliseconds scheduled_delay_milliseconds_;

std::condition_variable background_thread_waiter_cv_;
std::mutex background_thread_waiter_lock_;
std::chrono::milliseconds background_thread_wait_for_;

nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
};

Expand Down
62 changes: 55 additions & 7 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,15 @@ HttpClient::HttpClient()
next_session_id_{0},
max_sessions_per_connection_{8},
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
background_thread_wait_for_{std::chrono::minutes{1}},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
{}

HttpClient::~HttpClient()
{
while (true)
{
background_thread_wait_for_ = std::chrono::milliseconds{0};
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
Expand All @@ -211,6 +213,7 @@ HttpClient::~HttpClient()
}
if (background_thread->joinable())
{
background_thread_waiter_cv_.notify_all();
background_thread->join();
}
}
Expand All @@ -236,6 +239,7 @@ std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSes
std::lock_guard<std::mutex> lock_guard{sessions_m_};
sessions_.insert({session_id, session});

background_thread_waiter_cv_.notify_all();
// FIXME: Session may leak if it do not call SendRequest
return session;
}
Expand Down Expand Up @@ -416,6 +420,23 @@ void HttpClient::MaybeSpawnBackgroundThread()
still_running = 1;
}

if (still_running > 0)
{
continue;
}

// If there is no pending jobs, Exit flush thread if there is not data to flush more
// than one minute.
if (self->background_thread_wait_for_ != std::chrono::milliseconds{0})
{
std::unique_lock<std::mutex> lk{self->background_thread_waiter_lock_};
if (self->background_thread_waiter_cv_.wait_for(
lk, self->background_thread_wait_for_) != std::cv_status::timeout)
{
continue;
}
}

if (still_running == 0)
{
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
Expand All @@ -440,16 +461,18 @@ void HttpClient::MaybeSpawnBackgroundThread()
still_running = 1;
}

if (still_running > 0)
{
continue;
}

// If there is no pending jobs, we can stop the background thread.
if (still_running == 0)
if (self->background_thread_)
{
if (self->background_thread_)
{
self->background_thread_->detach();
self->background_thread_.reset();
}
break;
self->background_thread_->detach();
self->background_thread_.reset();
}
break;
}
}
},
Expand Down Expand Up @@ -502,6 +525,30 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource
wakeupBackgroundThread();
}

void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread_wait_for_ = ms;
}

void HttpClient::WaitBackgroundThreadExit()
{
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}

if (background_thread && background_thread->joinable())
{
auto wait_for = background_thread_wait_for_;
background_thread_wait_for_ = std::chrono::milliseconds{0};
background_thread_waiter_cv_.notify_all();
background_thread->join();
background_thread_wait_for_ = wait_for;
}
}

void HttpClient::wakeupBackgroundThread()
{
// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs
Expand All @@ -513,6 +560,7 @@ void HttpClient::wakeupBackgroundThread()
curl_multi_wakeup(multi_handle_);
}
#endif
background_thread_waiter_cv_.notify_all();
}

bool HttpClient::doAddSessions()
Expand Down
Loading