Skip to content

Commit 83958bc

Browse files
authored
perf: improve mutex contention (#208)
The telemetry refactoring exposed high mutex contention in our NGINX integration during shutdown/reload phase. This commit addresses mutex contention in the curl integration used by the default http client.
1 parent a7d71b5 commit 83958bc

File tree

2 files changed

+49
-37
lines changed

2 files changed

+49
-37
lines changed

src/datadog/curl.cpp

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ class CurlImpl {
214214
};
215215

216216
void run();
217-
void handle_message(const CURLMsg &, std::unique_lock<std::mutex> &);
217+
void handle_message(const CURLMsg &);
218218
CURLcode log_on_error(CURLcode result);
219219
CURLMcode log_on_error(CURLMcode result);
220220

@@ -386,16 +386,13 @@ Expected<void> CurlImpl::post(
386386
handle.get(), (url.scheme + "://" + url.authority + url.path).c_str()));
387387
}
388388

389-
std::list<CURL *> node;
390-
node.push_back(handle.get());
391389
{
392390
std::lock_guard<std::mutex> lock(mutex_);
393-
new_handles_.splice(new_handles_.end(), node);
394-
395-
(void)headers.release();
396-
(void)handle.release();
397-
(void)request.release();
391+
new_handles_.emplace_back(handle.get());
398392
}
393+
std::ignore = headers.release();
394+
std::ignore = handle.release();
395+
std::ignore = request.release();
399396

400397
log_on_error(curl_.multi_wakeup(multi_handle_));
401398

@@ -405,6 +402,8 @@ Expected<void> CurlImpl::post(
405402
}
406403

407404
void CurlImpl::drain(std::chrono::steady_clock::time_point deadline) {
405+
log_on_error(curl_.multi_wakeup(multi_handle_));
406+
408407
std::unique_lock<std::mutex> lock(mutex_);
409408
no_requests_.wait_until(lock, deadline, [this]() {
410409
return num_active_handles_ == 0 && new_handles_.empty();
@@ -465,31 +464,32 @@ CURLMcode CurlImpl::log_on_error(CURLMcode result) {
465464
}
466465

467466
void CurlImpl::run() {
468-
int num_messages_remaining;
469-
CURLMsg *message;
470-
constexpr int max_wait_milliseconds = 10000;
471-
std::unique_lock<std::mutex> lock(mutex_);
467+
int num_active_handles = 0;
468+
int num_messages_remaining = 0;
472469

473-
for (;;) {
474-
log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles_));
475-
if (num_active_handles_ == 0) {
476-
no_requests_.notify_all();
477-
}
470+
bool shutting_down = false;
471+
CURLMsg *message = nullptr;
472+
constexpr int max_wait_milliseconds = 10'000;
478473

479-
// If a request is done or errored out, curl will enqueue a "message" for
480-
// us to handle. Handle any pending messages.
481-
while ((message = curl_.multi_info_read(multi_handle_,
482-
&num_messages_remaining))) {
483-
handle_message(*message, lock);
484-
}
485-
lock.unlock();
486-
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
487-
max_wait_milliseconds, nullptr));
474+
std::unique_lock<std::mutex> lock(mutex_, std::defer_lock);
475+
std::list<CURL *> handles_to_process;
476+
477+
while (true) {
488478
lock.lock();
479+
shutting_down = shutting_down_;
480+
481+
handles_to_process.splice(handles_to_process.begin(), new_handles_);
482+
assert(new_handles_.empty());
483+
484+
num_active_handles_ =
485+
num_active_handles + static_cast<int>(handles_to_process.size());
486+
lock.unlock();
487+
488+
no_requests_.notify_all();
489489

490490
// New requests might have been added while we were sleeping.
491-
for (; !new_handles_.empty(); new_handles_.pop_front()) {
492-
CURL *handle = new_handles_.front();
491+
for (; !handles_to_process.empty(); handles_to_process.pop_front()) {
492+
CURL *handle = handles_to_process.front();
493493
char *user_data;
494494
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) !=
495495
CURLE_OK) {
@@ -528,9 +528,18 @@ void CurlImpl::run() {
528528
request_handles_.insert(handle);
529529
}
530530

531-
if (shutting_down_) {
532-
break;
531+
if (shutting_down) break;
532+
533+
log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles));
534+
535+
// If a request is done or errored out, curl will enqueue a "message" for
536+
// us to handle. Handle any pending messages.
537+
while ((message = curl_.multi_info_read(multi_handle_,
538+
&num_messages_remaining))) {
539+
handle_message(*message);
533540
}
541+
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
542+
max_wait_milliseconds, nullptr));
534543
}
535544

536545
// We're shutting down. Clean up any remaining request handles.
@@ -548,8 +557,7 @@ void CurlImpl::run() {
548557
request_handles_.clear();
549558
}
550559

551-
void CurlImpl::handle_message(const CURLMsg &message,
552-
std::unique_lock<std::mutex> &lock) {
560+
void CurlImpl::handle_message(const CURLMsg &message) {
553561
if (message.msg != CURLMSG_DONE) {
554562
return;
555563
}
@@ -571,21 +579,17 @@ void CurlImpl::handle_message(const CURLMsg &message,
571579
error_message += curl_.easy_strerror(result);
572580
error_message += "): ";
573581
error_message += request.error_buffer;
574-
lock.unlock();
575582
request.on_error(
576583
Error{Error::CURL_REQUEST_FAILURE, std::move(error_message)});
577-
lock.lock();
578584
} else {
579585
long status;
580586
if (log_on_error(curl_.easy_getinfo_response_code(request_handle,
581587
&status)) != CURLE_OK) {
582588
status = -1;
583589
}
584590
HeaderReader reader(&request.response_headers_lower);
585-
lock.unlock();
586591
request.on_response(static_cast<int>(status), reader,
587592
std::move(request.response_body));
588-
lock.lock();
589593
}
590594

591595
log_on_error(curl_.multi_remove_handle(multi_handle_, request_handle));

test/test_curl.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
2626
WriteCallback on_write_ = nullptr;
2727
CURL *added_handle_ = nullptr;
2828
CURLMsg message_;
29+
enum class state {
30+
unknown,
31+
added,
32+
performed,
33+
finished
34+
} state_ = state::unknown;
2935
// Since `SingleRequestMockCurlLibrary` supports at most one request,
3036
// `created_handles_` and `destroyed_handles_` will have size zero or one.
3137
std::unordered_multiset<CURL *> created_handles_;
@@ -76,6 +82,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
7682

7783
CURLMcode multi_add_handle(CURLM *, CURL *easy_handle) override {
7884
added_handle_ = easy_handle;
85+
state_ = state::added;
7986
return CURLM_OK;
8087
}
8188
CURLMsg *multi_info_read(CURLM *, int *msgs_in_queue) override {
@@ -85,7 +92,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
8592
}
8693

8794
*msgs_in_queue = added_handle_ != nullptr;
88-
if (*msgs_in_queue == 0) {
95+
if (*msgs_in_queue == 0 || state_ != state::performed) {
8996
return nullptr;
9097
}
9198
message_.msg = CURLMSG_DONE;
@@ -127,6 +134,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
127134
REQUIRE(on_write_(body.data() + body.size() / 2, 1, remaining,
128135
user_data_on_write_) == remaining);
129136

137+
state_ = state::performed;
130138
return CURLM_OK;
131139
}
132140
CURLMcode multi_remove_handle(CURLM *, CURL *easy_handle) override {

0 commit comments

Comments
 (0)