Skip to content

Commit d7086f7

Browse files
authored
Merge branch 'main' into dmehala/origin-detection
2 parents 6274290 + e14a5a4 commit d7086f7

File tree

11 files changed

+400
-146
lines changed

11 files changed

+400
-146
lines changed

.github/workflows/codeql-analysis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2626

2727
- name: Initialize CodeQL
28-
uses: github/codeql-action/init@1b549b9259bda1cb5ddde3b41741a82a2d15a841 # v3.28.13
28+
uses: github/codeql-action/init@28deaeda66b76a05916b6923827895f2b14ab387 # v3.28.16
2929
with:
3030
languages: ${{ matrix.language }}
3131

@@ -36,4 +36,4 @@ jobs:
3636
run: bin/cmake-build --preset=ci-codeql
3737

3838
- name: Perform CodeQL Analysis
39-
uses: github/codeql-action/analyze@1b549b9259bda1cb5ddde3b41741a82a2d15a841 # v3.28.13
39+
uses: github/codeql-action/analyze@28deaeda66b76a05916b6923827895f2b14ab387 # v3.28.16

src/datadog/curl.cpp

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ class CurlImpl {
214214
};
215215

216216
void run();
217-
void handle_message(const CURLMsg &, std::unique_lock<std::mutex> &);
217+
void handle_message(const CURLMsg &);
218218
CURLcode log_on_error(CURLcode result);
219219
CURLMcode log_on_error(CURLMcode result);
220220

@@ -386,16 +386,13 @@ Expected<void> CurlImpl::post(
386386
handle.get(), (url.scheme + "://" + url.authority + url.path).c_str()));
387387
}
388388

389-
std::list<CURL *> node;
390-
node.push_back(handle.get());
391389
{
392390
std::lock_guard<std::mutex> lock(mutex_);
393-
new_handles_.splice(new_handles_.end(), node);
394-
395-
(void)headers.release();
396-
(void)handle.release();
397-
(void)request.release();
391+
new_handles_.emplace_back(handle.get());
398392
}
393+
std::ignore = headers.release();
394+
std::ignore = handle.release();
395+
std::ignore = request.release();
399396

400397
log_on_error(curl_.multi_wakeup(multi_handle_));
401398

@@ -405,6 +402,8 @@ Expected<void> CurlImpl::post(
405402
}
406403

407404
void CurlImpl::drain(std::chrono::steady_clock::time_point deadline) {
405+
log_on_error(curl_.multi_wakeup(multi_handle_));
406+
408407
std::unique_lock<std::mutex> lock(mutex_);
409408
no_requests_.wait_until(lock, deadline, [this]() {
410409
return num_active_handles_ == 0 && new_handles_.empty();
@@ -465,31 +464,32 @@ CURLMcode CurlImpl::log_on_error(CURLMcode result) {
465464
}
466465

467466
void CurlImpl::run() {
468-
int num_messages_remaining;
469-
CURLMsg *message;
470-
constexpr int max_wait_milliseconds = 10000;
471-
std::unique_lock<std::mutex> lock(mutex_);
467+
int num_active_handles = 0;
468+
int num_messages_remaining = 0;
472469

473-
for (;;) {
474-
log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles_));
475-
if (num_active_handles_ == 0) {
476-
no_requests_.notify_all();
477-
}
470+
bool shutting_down = false;
471+
CURLMsg *message = nullptr;
472+
constexpr int max_wait_milliseconds = 10'000;
478473

479-
// If a request is done or errored out, curl will enqueue a "message" for
480-
// us to handle. Handle any pending messages.
481-
while ((message = curl_.multi_info_read(multi_handle_,
482-
&num_messages_remaining))) {
483-
handle_message(*message, lock);
484-
}
485-
lock.unlock();
486-
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
487-
max_wait_milliseconds, nullptr));
474+
std::unique_lock<std::mutex> lock(mutex_, std::defer_lock);
475+
std::list<CURL *> handles_to_process;
476+
477+
while (true) {
488478
lock.lock();
479+
shutting_down = shutting_down_;
480+
481+
handles_to_process.splice(handles_to_process.begin(), new_handles_);
482+
assert(new_handles_.empty());
483+
484+
num_active_handles_ =
485+
num_active_handles + static_cast<int>(handles_to_process.size());
486+
lock.unlock();
487+
488+
no_requests_.notify_all();
489489

490490
// New requests might have been added while we were sleeping.
491-
for (; !new_handles_.empty(); new_handles_.pop_front()) {
492-
CURL *handle = new_handles_.front();
491+
for (; !handles_to_process.empty(); handles_to_process.pop_front()) {
492+
CURL *handle = handles_to_process.front();
493493
char *user_data;
494494
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) !=
495495
CURLE_OK) {
@@ -528,9 +528,18 @@ void CurlImpl::run() {
528528
request_handles_.insert(handle);
529529
}
530530

531-
if (shutting_down_) {
532-
break;
531+
if (shutting_down) break;
532+
533+
log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles));
534+
535+
// If a request is done or errored out, curl will enqueue a "message" for
536+
// us to handle. Handle any pending messages.
537+
while ((message = curl_.multi_info_read(multi_handle_,
538+
&num_messages_remaining))) {
539+
handle_message(*message);
533540
}
541+
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
542+
max_wait_milliseconds, nullptr));
534543
}
535544

536545
// We're shutting down. Clean up any remaining request handles.
@@ -548,8 +557,7 @@ void CurlImpl::run() {
548557
request_handles_.clear();
549558
}
550559

551-
void CurlImpl::handle_message(const CURLMsg &message,
552-
std::unique_lock<std::mutex> &lock) {
560+
void CurlImpl::handle_message(const CURLMsg &message) {
553561
if (message.msg != CURLMSG_DONE) {
554562
return;
555563
}
@@ -571,21 +579,17 @@ void CurlImpl::handle_message(const CURLMsg &message,
571579
error_message += curl_.easy_strerror(result);
572580
error_message += "): ";
573581
error_message += request.error_buffer;
574-
lock.unlock();
575582
request.on_error(
576583
Error{Error::CURL_REQUEST_FAILURE, std::move(error_message)});
577-
lock.lock();
578584
} else {
579585
long status;
580586
if (log_on_error(curl_.easy_getinfo_response_code(request_handle,
581587
&status)) != CURLE_OK) {
582588
status = -1;
583589
}
584590
HeaderReader reader(&request.response_headers_lower);
585-
lock.unlock();
586591
request.on_response(static_cast<int>(status), reader,
587592
std::move(request.response_body));
588-
lock.lock();
589593
}
590594

591595
log_on_error(curl_.multi_remove_handle(multi_handle_, request_handle));

src/datadog/datadog_agent.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,24 @@ void DatadogAgent::flush() {
246246
return;
247247
}
248248

249+
// Ideally:
250+
/*auto [encode_result, duration] = mesure([&trace_chunks] {*/
251+
/* std::string body;*/
252+
/* msgpack_encode(body, trace_chunks);*/
253+
/*});*/
254+
249255
std::string body;
256+
257+
auto beg = std::chrono::steady_clock::now();
250258
auto encode_result = msgpack_encode(body, trace_chunks);
259+
auto end = std::chrono::steady_clock::now();
260+
261+
telemetry::distribution::add(
262+
metrics::tracer::trace_chunk_serialization_duration,
263+
std::chrono::duration_cast<std::chrono::microseconds>(end - beg).count());
264+
telemetry::distribution::add(metrics::tracer::trace_chunk_serialized_bytes,
265+
static_cast<uint64_t>(body.size()));
266+
251267
if (auto* error = encode_result.if_error()) {
252268
logger_->log_error(*error);
253269
return;
@@ -335,11 +351,17 @@ void DatadogAgent::flush() {
335351
};
336352

337353
telemetry::counter::increment(metrics::tracer::api::requests);
354+
telemetry::distribution::add(metrics::tracer::api::bytes_sent,
355+
static_cast<uint64_t>(body.size()));
356+
338357
auto post_result =
339358
http_client_->post(traces_endpoint_, std::move(set_request_headers),
340359
std::move(body), std::move(on_response),
341360
std::move(on_error), clock_().tick + request_timeout_);
342361
if (auto* error = post_result.if_error()) {
362+
// NOTE(@dmehala): `technical` is a better kind of errors.
363+
telemetry::counter::increment(metrics::tracer::api::errors,
364+
{"type:network"});
343365
logger_->log_error(
344366
error->with_prefix("Unexpected error submitting traces: "));
345367
}

src/datadog/telemetry/telemetry_impl.cpp

Lines changed: 98 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,40 @@ using namespace datadog::tracing;
1717
using namespace std::chrono_literals;
1818

1919
namespace datadog::telemetry {
20+
namespace internal_metrics {
21+
22+
/// The number of logs created with a given log level. Useful for calculating
23+
/// impact for other features (automatic sending of logs). Levels should be one
24+
/// of `debug`, `info`, `warn`, `error`, `critical`.
25+
const telemetry::Counter logs_created{"logs_created", "general", true};
26+
27+
/// The number of requests sent to the api endpoint in the agent that errored,
28+
/// tagged by the error type (e.g. `type:timeout`, `type:network`,
29+
/// `type:status_code`) and Endpoint (`endpoint:agent`, `endpoint:agentless`).
30+
const telemetry::Counter errors{"telemetry_api.errors", "telemetry", true};
31+
32+
/// The number of requests sent to a telemetry endpoint, regardless of success,
33+
/// tagged by the endpoint (`endpoint:agent`, `endpoint:agentless`).
34+
const telemetry::Counter requests{"telemetry_api.requests", "telemetry", true};
35+
36+
/// The number of responses received from the endpoint, tagged with status code
37+
/// (`status_code:200`, `status_code:404`) and endpoint (`endpoint:agent`,
38+
/// `endpoint:agentless`).
39+
const telemetry::Counter responses{"telemetry_api.responses", "telemetry",
40+
true};
41+
42+
/// The size of the payload sent to the stats endpoint in bytes, tagged by the
43+
/// endpoint (`endpoint:agent`, `endpoint:agentless`).
44+
const telemetry::Distribution bytes_sent{"telemetry_api.bytes", "telemetry",
45+
true};
46+
47+
/// The time it takes to send the payload sent to the endpoint in ms, tagged by
48+
/// the endpoint (`endpoint:agent`, `endpoint:agentless`).
49+
const telemetry::Distribution request_duration{"telemetry_api.ms", "telemetry",
50+
true};
51+
52+
} // namespace internal_metrics
53+
2054
namespace {
2155

2256
HTTPClient::URL make_telemetry_endpoint(HTTPClient::URL url) {
@@ -174,26 +208,8 @@ Telemetry::Telemetry(FinalizedConfiguration config,
174208
host_info_(get_host_info()) {
175209
// Callback for successful telemetry HTTP requests, to examine HTTP
176210
// status.
177-
telemetry_on_response_ = [logger = logger_](
178-
int response_status,
179-
const DictReader& /*response_headers*/,
180-
std::string response_body) {
181-
if (response_status < 200 || response_status >= 300) {
182-
logger->log_error([&](auto& stream) {
183-
stream << "Unexpected telemetry response status " << response_status
184-
<< " with body (if any, starts on next line):\n"
185-
<< response_body;
186-
});
187-
}
188-
};
189-
190-
// Callback for unsuccessful telemetry HTTP requests.
191-
telemetry_on_error_ = [logger = logger_](Error error) {
192-
logger->log_error(error.with_prefix(
193-
"Error occurred during HTTP request for telemetry: "));
194-
};
195-
196211
send_telemetry("app-started", app_started());
212+
http_client_->drain(clock_().tick + 2s);
197213
schedule_tasks();
198214
}
199215

@@ -216,20 +232,23 @@ Telemetry::~Telemetry() {
216232
// The app-closing message is bundled with a message containing the
217233
// final metric values.
218234
send_telemetry("app-closing", app_closing());
219-
http_client_->drain(clock_().tick + 1s);
235+
http_client_->drain(clock_().tick + 2s);
220236
}
221237
}
222238

223239
Telemetry::Telemetry(Telemetry&& rhs)
224240
: config_(std::move(rhs.config_)),
225241
logger_(std::move(rhs.logger_)),
226-
telemetry_on_response_(std::move(rhs.telemetry_on_response_)),
227-
telemetry_on_error_(std::move(rhs.telemetry_on_error_)),
228242
telemetry_endpoint_(std::move(rhs.telemetry_endpoint_)),
229243
tracer_signature_(std::move(rhs.tracer_signature_)),
230244
http_client_(rhs.http_client_),
231245
clock_(std::move(rhs.clock_)),
232246
scheduler_(std::move(rhs.scheduler_)),
247+
counters_(std::move(rhs.counters_)),
248+
counters_snapshot_(std::move(rhs.counters_snapshot_)),
249+
rates_(std::move(rhs.rates_)),
250+
rates_snapshot_(std::move(rhs.rates_snapshot_)),
251+
distributions_(std::move(rhs.distributions_)),
233252
seq_id_(rhs.seq_id_),
234253
config_seq_ids_(rhs.config_seq_ids_),
235254
host_info_(rhs.host_info_) {
@@ -242,13 +261,17 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) {
242261
cancel_tasks(rhs.tasks_);
243262
std::swap(config_, rhs.config_);
244263
std::swap(logger_, rhs.logger_);
245-
std::swap(telemetry_on_response_, rhs.telemetry_on_response_);
246-
std::swap(telemetry_on_error_, rhs.telemetry_on_error_);
247264
std::swap(telemetry_endpoint_, rhs.telemetry_endpoint_);
248265
std::swap(http_client_, rhs.http_client_);
249266
std::swap(tracer_signature_, rhs.tracer_signature_);
250267
std::swap(http_client_, rhs.http_client_);
251268
std::swap(clock_, rhs.clock_);
269+
std::swap(scheduler_, rhs.scheduler_);
270+
std::swap(counters_, rhs.counters_);
271+
std::swap(counters_snapshot_, rhs.counters_snapshot_);
272+
std::swap(rates_, rhs.rates_);
273+
std::swap(rates_snapshot_, rhs.rates_snapshot_);
274+
std::swap(distributions_, rhs.distributions_);
252275
std::swap(seq_id_, rhs.seq_id_);
253276
std::swap(config_seq_ids_, rhs.config_seq_ids_);
254277
std::swap(host_info_, rhs.host_info_);
@@ -259,16 +282,19 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) {
259282

260283
void Telemetry::log_error(std::string message) {
261284
if (!config_.report_logs) return;
285+
increment_counter(internal_metrics::logs_created, {"level:error"});
262286
log(std::move(message), LogLevel::ERROR);
263287
}
264288

265289
void Telemetry::log_error(std::string message, std::string stacktrace) {
266290
if (!config_.report_logs) return;
291+
increment_counter(internal_metrics::logs_created, {"level:error"});
267292
log(std::move(message), LogLevel::ERROR, stacktrace);
268293
}
269294

270295
void Telemetry::log_warning(std::string message) {
271296
if (!config_.report_logs) return;
297+
increment_counter(internal_metrics::logs_created, {"level:warning"});
272298
log(std::move(message), LogLevel::WARNING);
273299
}
274300

@@ -293,10 +319,55 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) {
293319
}
294320
};
295321

296-
auto post_result = http_client_->post(
297-
telemetry_endpoint_, set_telemetry_headers, std::move(payload),
298-
telemetry_on_response_, telemetry_on_error_, clock_().tick + 5s);
322+
auto telemetry_on_response = [this, logger = logger_](
323+
int response_status,
324+
const DictReader& /*response_headers*/,
325+
std::string response_body) {
326+
if (response_status >= 500) {
327+
increment_counter(internal_metrics::responses,
328+
{"status_code:5xx", "endpoint:agent"});
329+
} else if (response_status >= 400) {
330+
increment_counter(internal_metrics::responses,
331+
{"status_code:4xx", "endpoint:agent"});
332+
} else if (response_status >= 300) {
333+
increment_counter(internal_metrics::responses,
334+
{"status_code:3xx", "endpoint:agent"});
335+
} else if (response_status >= 200) {
336+
increment_counter(internal_metrics::responses,
337+
{"status_code:2xx", "endpoint:agent"});
338+
} else if (response_status >= 100) {
339+
increment_counter(internal_metrics::responses,
340+
{"status_code:1xx", "endpoint:agent"});
341+
}
342+
343+
if (response_status < 200 || response_status >= 300) {
344+
logger->log_error([&](auto& stream) {
345+
stream << "Unexpected telemetry response status " << response_status
346+
<< " with body (if any, starts on next line):\n"
347+
<< response_body;
348+
});
349+
}
350+
};
351+
352+
// Callback for unsuccessful telemetry HTTP requests.
353+
auto telemetry_on_error = [this, logger = logger_](Error error) {
354+
increment_counter(internal_metrics::errors,
355+
{"type:network", "endpoint:agent"});
356+
logger->log_error(error.with_prefix(
357+
"Error occurred during HTTP request for telemetry: "));
358+
};
359+
360+
increment_counter(internal_metrics::requests, {"endpoint:agent"});
361+
add_datapoint(internal_metrics::bytes_sent, {"endpoint:agent"},
362+
payload.size());
363+
364+
auto post_result =
365+
http_client_->post(telemetry_endpoint_, set_telemetry_headers,
366+
std::move(payload), std::move(telemetry_on_response),
367+
std::move(telemetry_on_error), clock_().tick + 5s);
299368
if (auto* error = post_result.if_error()) {
369+
increment_counter(internal_metrics::errors,
370+
{"type:network", "endpoint:agent"});
300371
logger_->log_error(
301372
error->with_prefix("Unexpected error submitting telemetry event: "));
302373
}

0 commit comments

Comments
 (0)