Skip to content

Commit 290cc27

Browse files
authored
fix(curl): enforce drain to clear requests in-flight. (#215)
1 parent 6a724a4 commit 290cc27

File tree

3 files changed

+77
-37
lines changed

3 files changed

+77
-37
lines changed

src/datadog/curl.cpp

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ class CurlImpl {
234234
std::chrono::steady_clock::time_point deadline);
235235

236236
void drain(std::chrono::steady_clock::time_point deadline);
237+
238+
void clear_requests();
237239
};
238240

239241
namespace {
@@ -401,13 +403,32 @@ Expected<void> CurlImpl::post(
401403
return Error{Error::CURL_REQUEST_SETUP_FAILED, curl_.easy_strerror(error)};
402404
}
403405

406+
void CurlImpl::clear_requests() {
407+
for (const auto &handle : request_handles_) {
408+
char *user_data;
409+
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) ==
410+
CURLE_OK) {
411+
delete reinterpret_cast<Request *>(user_data);
412+
}
413+
414+
log_on_error(curl_.multi_remove_handle(multi_handle_, handle));
415+
curl_.easy_cleanup(handle);
416+
}
417+
418+
request_handles_.clear();
419+
}
420+
404421
void CurlImpl::drain(std::chrono::steady_clock::time_point deadline) {
405422
log_on_error(curl_.multi_wakeup(multi_handle_));
406423

407424
std::unique_lock<std::mutex> lock(mutex_);
408425
no_requests_.wait_until(lock, deadline, [this]() {
426+
curl_.multi_wakeup(multi_handle_);
409427
return num_active_handles_ == 0 && new_handles_.empty();
410428
});
429+
430+
log_on_error(curl_.multi_wakeup(multi_handle_));
431+
clear_requests();
411432
}
412433

413434
std::size_t CurlImpl::on_read_header(char *data, std::size_t,
@@ -542,19 +563,8 @@ void CurlImpl::run() {
542563
max_wait_milliseconds, nullptr));
543564
}
544565

545-
// We're shutting down. Clean up any remaining request handles.
546-
for (const auto &handle : request_handles_) {
547-
char *user_data;
548-
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) ==
549-
CURLE_OK) {
550-
delete reinterpret_cast<Request *>(user_data);
551-
}
552-
553-
log_on_error(curl_.multi_remove_handle(multi_handle_, handle));
554-
curl_.easy_cleanup(handle);
555-
}
556-
557-
request_handles_.clear();
566+
// We're shutting down. Clean up any remaining request handles.
567+
clear_requests();
558568
}
559569

560570
void CurlImpl::handle_message(const CURLMsg &message) {

src/datadog/telemetry/telemetry_impl.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ const telemetry::Distribution request_duration{"telemetry_api.ms", "telemetry",
5353

5454
namespace {
5555

56+
constexpr std::chrono::steady_clock::duration request_timeout = 2s;
57+
5658
HTTPClient::URL make_telemetry_endpoint(HTTPClient::URL url) {
5759
append(url.path, "/telemetry/proxy/api/v2/apmtelemetry");
5860
return url;
@@ -206,10 +208,8 @@ Telemetry::Telemetry(FinalizedConfiguration config,
206208
clock_(std::move(clock)),
207209
scheduler_(event_scheduler),
208210
host_info_(get_host_info()) {
209-
// Callback for successful telemetry HTTP requests, to examine HTTP
210-
// status.
211211
send_telemetry("app-started", app_started());
212-
http_client_->drain(clock_().tick + 2s);
212+
http_client_->drain(clock_().tick + request_timeout);
213213
schedule_tasks();
214214
}
215215

@@ -232,7 +232,7 @@ Telemetry::~Telemetry() {
232232
// The app-closing message is bundled with a message containing the
233233
// final metric values.
234234
send_telemetry("app-closing", app_closing());
235-
http_client_->drain(clock_().tick + 2s);
235+
http_client_->drain(clock_().tick + request_timeout);
236236
}
237237
}
238238

@@ -361,10 +361,10 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) {
361361
add_datapoint(internal_metrics::bytes_sent, {"endpoint:agent"},
362362
payload.size());
363363

364-
auto post_result =
365-
http_client_->post(telemetry_endpoint_, set_telemetry_headers,
366-
std::move(payload), std::move(telemetry_on_response),
367-
std::move(telemetry_on_error), clock_().tick + 5s);
364+
auto post_result = http_client_->post(
365+
telemetry_endpoint_, set_telemetry_headers, std::move(payload),
366+
std::move(telemetry_on_response), std::move(telemetry_on_error),
367+
clock_().tick + request_timeout);
368368
if (auto* error = post_result.if_error()) {
369369
increment_counter(internal_metrics::errors,
370370
{"type:network", "endpoint:agent"});

test/test_curl.cpp

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
4343
// message to the event loop. This allows races to be explored between request
4444
// registration and `Curl` shutdown.
4545
bool delay_message_ = false;
46+
std::function<CURLMcode()> on_multi_perform = nullptr;
4647

4748
void easy_cleanup(CURL *handle) override {
4849
destroyed_handles_.insert(handle);
@@ -100,12 +101,17 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
100101
message_.data.result = message_result_;
101102
return &message_;
102103
}
104+
103105
CURLMcode multi_perform(CURLM *, int *running_handles) override {
104106
if (!added_handle_) {
105107
*running_handles = 0;
106108
return CURLM_OK;
107109
}
108110

111+
if (on_multi_perform != nullptr) {
112+
return on_multi_perform();
113+
}
114+
109115
// If any of these `REQUIRE`s fail, an exception will be thrown and the
110116
// test will abort. The runtime will print the exception first, though.
111117
REQUIRE(on_header_);
@@ -144,7 +150,38 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
144150
}
145151
};
146152

147-
TEST_CASE("parse response headers and body", "[curl]") {
153+
#define CURL_TEST(x) TEST_CASE(x, "[curl]")
154+
155+
const auto ignore = [](auto &&...) {};
156+
157+
using namespace std::chrono_literals;
158+
159+
CURL_TEST("API") {
160+
const auto clock = default_clock;
161+
const auto logger = std::make_shared<MockLogger>();
162+
SingleRequestMockCurlLibrary library;
163+
const auto client = std::make_shared<Curl>(logger, clock, library);
164+
165+
SECTION("drain remove requests in-flight") {
166+
/// Prevent to process the request.
167+
library.on_multi_perform = [] { return CURLM_OK; };
168+
169+
const HTTPClient::URL url = {"http", "whatever", ""};
170+
171+
const auto result = client->post(url, ignore, "whatever", ignore, ignore,
172+
clock().tick + 60min);
173+
174+
REQUIRE(result);
175+
REQUIRE(library.created_handles_.size() == 1);
176+
REQUIRE(library.destroyed_handles_.size() == 0);
177+
178+
client->drain(clock().tick + 1s);
179+
CHECK(library.created_handles_.size() == 1);
180+
CHECK(library.destroyed_handles_.size() == 1);
181+
}
182+
}
183+
184+
CURL_TEST("parse response headers and body") {
148185
const auto clock = default_clock;
149186
const auto logger = std::make_shared<MockLogger>();
150187
SingleRequestMockCurlLibrary library;
@@ -177,7 +214,7 @@ TEST_CASE("parse response headers and body", "[curl]") {
177214
std::exception_ptr exception;
178215
const HTTPClient::URL url = {"http", "whatever", ""};
179216
const auto result = client->post(
180-
url, [](const auto &) {}, "whatever",
217+
url, ignore, "whatever",
181218
[&](int status, const DictReader &headers, std::string body) {
182219
try {
183220
REQUIRE(status == 200);
@@ -211,7 +248,7 @@ TEST_CASE("parse response headers and body", "[curl]") {
211248
}
212249
}
213250

214-
TEST_CASE("bad multi-handle means error mode", "[curl]") {
251+
CURL_TEST("bad multi-handle means error mode") {
215252
// If libcurl fails to allocate a multi-handle, then the HTTP client enters a
216253
// mode where calls to `post` always return an error.
217254
class MockCurlLibrary : public CurlLibrary {
@@ -224,7 +261,6 @@ TEST_CASE("bad multi-handle means error mode", "[curl]") {
224261
const auto client = std::make_shared<Curl>(logger, clock, library);
225262
REQUIRE(logger->first_error().code == Error::CURL_HTTP_CLIENT_SETUP_FAILED);
226263

227-
const auto ignore = [](auto &&...) {};
228264
const HTTPClient::URL url = {"http", "whatever", ""};
229265
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
230266
const auto result =
@@ -233,7 +269,7 @@ TEST_CASE("bad multi-handle means error mode", "[curl]") {
233269
REQUIRE(result.error().code == Error::CURL_HTTP_CLIENT_NOT_RUNNING);
234270
}
235271

236-
TEST_CASE("bad std::thread means error mode", "[curl]") {
272+
CURL_TEST("bad std::thread means error mode") {
237273
// If `Curl` is unable to start its event loop thread, then it enters a mode
238274
// where calls to `post` always return an error.
239275
const auto clock = default_clock;
@@ -246,7 +282,6 @@ TEST_CASE("bad std::thread means error mode", "[curl]") {
246282
});
247283
REQUIRE(logger->first_error().code == Error::CURL_HTTP_CLIENT_SETUP_FAILED);
248284

249-
const auto ignore = [](auto &&...) {};
250285
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
251286
const HTTPClient::URL url = {"http", "whatever", ""};
252287
const auto result =
@@ -255,7 +290,7 @@ TEST_CASE("bad std::thread means error mode", "[curl]") {
255290
REQUIRE(result.error().code == Error::CURL_HTTP_CLIENT_NOT_RUNNING);
256291
}
257292

258-
TEST_CASE("fail to allocate request handle", "[curl]") {
293+
CURL_TEST("fail to allocate request handle") {
259294
// Each call to `Curl::post` allocates a new "easy handle." If that fails,
260295
// then `post` immediately returns an error.
261296
class MockCurlLibrary : public CurlLibrary {
@@ -268,7 +303,6 @@ TEST_CASE("fail to allocate request handle", "[curl]") {
268303
MockCurlLibrary library;
269304
const auto client = std::make_shared<Curl>(logger, clock, library);
270305

271-
const auto ignore = [](auto &&...) {};
272306
const HTTPClient::URL url = {"http", "whatever", ""};
273307
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
274308
const auto result =
@@ -277,7 +311,7 @@ TEST_CASE("fail to allocate request handle", "[curl]") {
277311
REQUIRE(result.error().code == Error::CURL_REQUEST_SETUP_FAILED);
278312
}
279313

280-
TEST_CASE("setopt failures", "[curl]") {
314+
CURL_TEST("setopt failures") {
281315
// Each call to `Curl::post` allocates a new "easy handle" and sets various
282316
// options on it. Any of those setters can fail. When one does, `post`
283317
// immediately returns an error.
@@ -386,7 +420,6 @@ TEST_CASE("setopt failures", "[curl]") {
386420
const auto logger = std::make_shared<NullLogger>();
387421
const auto client = std::make_shared<Curl>(logger, clock, library);
388422

389-
const auto ignore = [](auto &&...) {};
390423
HTTPClient::URL url;
391424
if (test_case.which_fails == CURLOPT_UNIX_SOCKET_PATH) {
392425
url.scheme = "unix";
@@ -404,7 +437,7 @@ TEST_CASE("setopt failures", "[curl]") {
404437
REQUIRE(result.error().code == Error::CURL_REQUEST_SETUP_FAILED);
405438
}
406439

407-
TEST_CASE("handles are always cleaned up", "[curl]") {
440+
CURL_TEST("handles are always cleaned up") {
408441
const auto clock = default_clock;
409442
const auto logger = std::make_shared<MockLogger>();
410443
SingleRequestMockCurlLibrary library;
@@ -416,7 +449,7 @@ TEST_CASE("handles are always cleaned up", "[curl]") {
416449
const HTTPClient::URL url = {"http", "whatever", ""};
417450
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
418451
const auto result = client->post(
419-
url, [](const auto &) {}, "whatever",
452+
url, ignore, "whatever",
420453
[&](int status, const DictReader & /*headers*/, std::string body) {
421454
try {
422455
REQUIRE(status == 200);
@@ -439,7 +472,6 @@ TEST_CASE("handles are always cleaned up", "[curl]") {
439472
SECTION("when an error occurs") {
440473
Optional<Error> post_error;
441474
const HTTPClient::URL url = {"http", "whatever", ""};
442-
const auto ignore = [](auto &&...) {};
443475
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
444476
library.message_result_ = CURLE_COULDNT_CONNECT; // any error would do
445477
const auto result = client->post(
@@ -453,7 +485,6 @@ TEST_CASE("handles are always cleaned up", "[curl]") {
453485

454486
SECTION("when we shut down while a request is in flight") {
455487
const HTTPClient::URL url = {"http", "whatever", ""};
456-
const auto ignore = [](auto &&...) {};
457488
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
458489
library.delay_message_ = true;
459490
const auto result =
@@ -469,11 +500,10 @@ TEST_CASE("handles are always cleaned up", "[curl]") {
469500
REQUIRE(library.created_handles_ == library.destroyed_handles_);
470501
}
471502

472-
TEST_CASE("post() deadline exceeded before request start", "[curl]") {
503+
CURL_TEST("post() deadline exceeded before request start") {
473504
const auto clock = default_clock;
474505
Curl client{std::make_shared<NullLogger>(), clock};
475506

476-
const auto ignore = [](auto &&...) {};
477507
const HTTPClient::URL url = {"http", "whatever", ""};
478508
const std::string body;
479509
const auto deadline = clock().tick - std::chrono::milliseconds(1);

0 commit comments

Comments
 (0)