Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -328,19 +328,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
void ScheduleAbortSession(uint64_t session_id);
void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource);

void WaitBackgroundThreadExit()
{
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}
void SetBackgroundWaitFor(std::chrono::milliseconds ms);

if (background_thread && background_thread->joinable())
{
background_thread->join();
}
}
void WaitBackgroundThreadExit();

private:
void wakeupBackgroundThread();
Expand All @@ -366,6 +356,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
std::unique_ptr<std::thread> background_thread_;
std::chrono::milliseconds scheduled_delay_milliseconds_;

std::chrono::milliseconds background_thread_wait_for_;
std::atomic<bool> is_shutdown;

nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
};

Expand Down
57 changes: 55 additions & 2 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,15 @@ HttpClient::HttpClient()
next_session_id_{0},
max_sessions_per_connection_{8},
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
background_thread_wait_for_{std::chrono::minutes{1}},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
{}
{
is_shutdown.store(false);
}

HttpClient::~HttpClient()
{
is_shutdown.store(true, std::memory_order_release);
while (true)
{
std::unique_ptr<std::thread> background_thread;
Expand Down Expand Up @@ -346,18 +350,22 @@ void HttpClient::MaybeSpawnBackgroundThread()
background_thread_.reset(new std::thread(
[](HttpClient *self) {
int still_running = 1;
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
bool need_wait_more = false;
while (true)
{
CURLMsg *msg;
int queued;
CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running);

// According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we
// can not curl_multi_perform it again
if (mc != CURLM_OK)
{
self->resetMultiHandle();
}
else if (still_running)
else if (still_running || need_wait_more)
{
// curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util
// timeout to do the rest jobs
Expand Down Expand Up @@ -416,6 +424,30 @@ void HttpClient::MaybeSpawnBackgroundThread()
still_running = 1;
}

std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
if (still_running > 0)
{
last_free_job_timepoint = now;
need_wait_more = false;
continue;
}

std::chrono::milliseconds wait_for;
#if LIBCURL_VERSION_NUM >= 0x074200
// only avaliable with curl_multi_poll
wait_for = self->background_thread_wait_for_;
#endif
if (self->is_shutdown.load(std::memory_order_acquire))
{
wait_for = std::chrono::milliseconds{0};
}

if (now - last_free_job_timepoint < wait_for)
{
need_wait_more = true;
continue;
}

if (still_running == 0)
{
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
Expand Down Expand Up @@ -502,6 +534,27 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource
wakeupBackgroundThread();
}

void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)
{
background_thread_wait_for_ = ms;
}

void HttpClient::WaitBackgroundThreadExit()
{
is_shutdown.store(true, std::memory_order_release);
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}

if (background_thread && background_thread->joinable())
{
background_thread->join();
}
is_shutdown.store(false, std::memory_order_release);
}

void HttpClient::wakeupBackgroundThread()
{
// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs
Expand Down
18 changes: 18 additions & 0 deletions ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,3 +526,21 @@ TEST_F(BasicCurlHttpTests, FinishInAsyncCallback)
}
}
}

TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
{
auto http_client = http_client::HttpClientFactory::Create();
std::dynamic_pointer_cast<curl::HttpClient>(http_client)->MaybeSpawnBackgroundThread();
auto beg = std::chrono::system_clock::now();
auto session = http_client->CreateSession("http://127.0.0.1:19000/get/");
auto request = session->CreateRequest();
request->SetUri("get/");
auto handler = std::make_shared<GetEventHandler>();
session->SendRequest(handler);
http_client->FinishAllSessions();
http_client.reset();
// when use background_thread_wait_for_ should have no side effort on elegant quit
ASSERT_TRUE(std::chrono::system_clock::now() - beg < std::chrono::milliseconds{5});
ASSERT_TRUE(handler->is_called_);
ASSERT_TRUE(handler->got_response_);
}
Loading