Skip to content

Commit 2b9bff9

Browse files
authored
[SDK] Do not frequently create and destroy http client threads (open-telemetry#3198)
1 parent d9ad23e commit 2b9bff9

File tree

4 files changed

+119
-16
lines changed

4 files changed

+119
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ Increment the:
1515

1616
## [Unreleased]
1717

18+
* [SDK] Do not frequently create and destroy http client threads
19+
[#3198](https://github.com/open-telemetry/opentelemetry-cpp/pull/3198)
20+
1821
## [1.18 2024-11-25]
1922

2023
* [EXPORTER] Fix crash in ElasticsearchLogRecordExporter

ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,25 +322,16 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
322322

323323
inline CURLM *GetMultiHandle() noexcept { return multi_handle_; }
324324

325-
void MaybeSpawnBackgroundThread();
325+
// return true if create background thread, false is already exist background thread
326+
bool MaybeSpawnBackgroundThread();
326327

327328
void ScheduleAddSession(uint64_t session_id);
328329
void ScheduleAbortSession(uint64_t session_id);
329330
void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource);
330331

331-
void WaitBackgroundThreadExit()
332-
{
333-
std::unique_ptr<std::thread> background_thread;
334-
{
335-
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
336-
background_thread.swap(background_thread_);
337-
}
332+
void SetBackgroundWaitFor(std::chrono::milliseconds ms);
338333

339-
if (background_thread && background_thread->joinable())
340-
{
341-
background_thread->join();
342-
}
343-
}
334+
void WaitBackgroundThreadExit();
344335

345336
private:
346337
void wakeupBackgroundThread();
@@ -366,6 +357,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
366357
std::unique_ptr<std::thread> background_thread_;
367358
std::chrono::milliseconds scheduled_delay_milliseconds_;
368359

360+
std::chrono::milliseconds background_thread_wait_for_;
361+
std::atomic<bool> is_shutdown_{false};
362+
369363
nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
370364
};
371365

ext/src/http/client/curl/http_client_curl.cc

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,13 @@ HttpClient::HttpClient()
189189
next_session_id_{0},
190190
max_sessions_per_connection_{8},
191191
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
192+
background_thread_wait_for_{std::chrono::minutes{1}},
192193
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
193194
{}
194195

195196
HttpClient::~HttpClient()
196197
{
198+
is_shutdown_.store(true, std::memory_order_release);
197199
while (true)
198200
{
199201
std::unique_ptr<std::thread> background_thread;
@@ -211,6 +213,7 @@ HttpClient::~HttpClient()
211213
}
212214
if (background_thread->joinable())
213215
{
216+
wakeupBackgroundThread(); // if delay quit, wake up first
214217
background_thread->join();
215218
}
216219
}
@@ -335,29 +338,33 @@ void HttpClient::CleanupSession(uint64_t session_id)
335338
}
336339
}
337340

338-
void HttpClient::MaybeSpawnBackgroundThread()
341+
bool HttpClient::MaybeSpawnBackgroundThread()
339342
{
340343
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
341344
if (background_thread_)
342345
{
343-
return;
346+
return false;
344347
}
345348

346349
background_thread_.reset(new std::thread(
347350
[](HttpClient *self) {
348351
int still_running = 1;
352+
std::chrono::system_clock::time_point last_free_job_timepoint =
353+
std::chrono::system_clock::now();
354+
bool need_wait_more = false;
349355
while (true)
350356
{
351357
CURLMsg *msg;
352358
int queued;
353359
CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running);
360+
354361
// According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we
355362
// can not curl_multi_perform it again
356363
if (mc != CURLM_OK)
357364
{
358365
self->resetMultiHandle();
359366
}
360-
else if (still_running)
367+
else if (still_running || need_wait_more)
361368
{
362369
// curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util
363370
// timeout to do the rest jobs
@@ -416,6 +423,32 @@ void HttpClient::MaybeSpawnBackgroundThread()
416423
still_running = 1;
417424
}
418425

426+
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
427+
if (still_running > 0)
428+
{
429+
last_free_job_timepoint = now;
430+
need_wait_more = false;
431+
continue;
432+
}
433+
434+
std::chrono::milliseconds wait_for = std::chrono::milliseconds::zero();
435+
436+
#if LIBCURL_VERSION_NUM >= 0x074400
437+
// only available with curl_multi_poll+curl_multi_wakeup, because curl_multi_wait would
438+
// cause CPU busy, curl_multi_wait+sleep could not wakeup quickly
439+
wait_for = self->background_thread_wait_for_;
440+
#endif
441+
if (self->is_shutdown_.load(std::memory_order_acquire))
442+
{
443+
wait_for = std::chrono::milliseconds::zero();
444+
}
445+
446+
if (now - last_free_job_timepoint < wait_for)
447+
{
448+
need_wait_more = true;
449+
continue;
450+
}
451+
419452
if (still_running == 0)
420453
{
421454
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
@@ -454,6 +487,7 @@ void HttpClient::MaybeSpawnBackgroundThread()
454487
}
455488
},
456489
this));
490+
return true;
457491
}
458492

459493
void HttpClient::ScheduleAddSession(uint64_t session_id)
@@ -502,6 +536,28 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource
502536
wakeupBackgroundThread();
503537
}
504538

539+
void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)
540+
{
541+
background_thread_wait_for_ = ms;
542+
}
543+
544+
void HttpClient::WaitBackgroundThreadExit()
545+
{
546+
is_shutdown_.store(true, std::memory_order_release);
547+
std::unique_ptr<std::thread> background_thread;
548+
{
549+
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
550+
background_thread.swap(background_thread_);
551+
}
552+
553+
if (background_thread && background_thread->joinable())
554+
{
555+
wakeupBackgroundThread();
556+
background_thread->join();
557+
}
558+
is_shutdown_.store(false, std::memory_order_release);
559+
}
560+
505561
void HttpClient::wakeupBackgroundThread()
506562
{
507563
// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs

ext/test/http/curl_http_test.cc

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
#include <curl/curlver.h>
45
#include <gtest/gtest.h>
56
#include <string.h>
67
#include <atomic>
@@ -11,6 +12,7 @@
1112
#include <memory>
1213
#include <mutex>
1314
#include <string>
15+
#include <thread>
1416
#include <utility>
1517
#include <vector>
1618

@@ -531,3 +533,51 @@ TEST_F(BasicCurlHttpTests, FinishInAsyncCallback)
531533
}
532534
}
533535
}
536+
537+
TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
538+
{
539+
auto http_client = http_client::HttpClientFactory::Create();
540+
std::static_pointer_cast<curl::HttpClient>(http_client)->MaybeSpawnBackgroundThread();
541+
// start background first, then test it could wakeup
542+
auto session = http_client->CreateSession("http://127.0.0.1:19000/get/");
543+
auto request = session->CreateRequest();
544+
request->SetUri("get/");
545+
auto handler = std::make_shared<GetEventHandler>();
546+
session->SendRequest(handler);
547+
std::this_thread::sleep_for(std::chrono::milliseconds{10}); // let it enter poll state
548+
auto beg = std::chrono::system_clock::now();
549+
http_client->FinishAllSessions();
550+
http_client.reset();
551+
// when background_thread_wait_for_ is used, it should have no side effect on elegant quit
552+
// wait should be less than scheduled_delay_milliseconds_
553+
// Due to load on CI hosts (some take 10ms), we assert it is less than 20ms
554+
auto cost = std::chrono::system_clock::now() - beg;
555+
ASSERT_TRUE(cost < std::chrono::milliseconds{20})
556+
<< "cost ms: " << std::chrono::duration_cast<std::chrono::milliseconds>(cost).count()
557+
<< " libcurl version: 0x" << std::hex << LIBCURL_VERSION_NUM;
558+
ASSERT_TRUE(handler->is_called_);
559+
ASSERT_TRUE(handler->got_response_);
560+
}
561+
562+
TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
563+
{
564+
{
565+
curl::HttpClient http_client;
566+
http_client.MaybeSpawnBackgroundThread();
567+
std::this_thread::sleep_for(std::chrono::milliseconds{10});
568+
#if LIBCURL_VERSION_NUM >= 0x074200
569+
ASSERT_FALSE(http_client.MaybeSpawnBackgroundThread());
570+
#else
571+
// low version curl do not support delay quit, so old background would quit
572+
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
573+
#endif
574+
}
575+
{
576+
curl::HttpClient http_client;
577+
http_client.SetBackgroundWaitFor(std::chrono::milliseconds::zero());
578+
http_client.MaybeSpawnBackgroundThread();
579+
std::this_thread::sleep_for(std::chrono::milliseconds{10});
580+
// we can disable delay quit by set wait for 0
581+
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
582+
}
583+
}

0 commit comments

Comments
 (0)