Skip to content

Commit ae90dc5

Browse files
authored
don't let DatadogAgent::flush run concurrently (#21)
* don't let DatadogAgent::flush run concurrently * less optimal, more zen: now concurrent access is safe
1 parent c608269 commit ae90dc5

File tree

2 files changed

+9
-14
lines changed

2 files changed

+9
-14
lines changed

src/datadog/datadog_agent.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,7 @@ Expected<void> DatadogAgent::send(
159159
std::vector<std::unique_ptr<SpanData>>&& spans,
160160
const std::shared_ptr<TraceSampler>& response_handler) {
161161
std::lock_guard<std::mutex> lock(mutex_);
162-
incoming_trace_chunks_.push_back(
163-
TraceChunk{std::move(spans), response_handler});
162+
trace_chunks_.push_back(TraceChunk{std::move(spans), response_handler});
164163
return nullopt;
165164
}
166165

@@ -184,19 +183,19 @@ nlohmann::json DatadogAgent::config_json() const {
184183
}
185184

186185
void DatadogAgent::flush() {
187-
outgoing_trace_chunks_.clear();
186+
std::vector<TraceChunk> trace_chunks;
188187
{
189188
std::lock_guard<std::mutex> lock(mutex_);
190189
using std::swap;
191-
swap(incoming_trace_chunks_, outgoing_trace_chunks_);
190+
swap(trace_chunks, trace_chunks_);
192191
}
193192

194-
if (outgoing_trace_chunks_.empty()) {
193+
if (trace_chunks.empty()) {
195194
return;
196195
}
197196

198197
std::string body;
199-
auto encode_result = msgpack_encode(body, outgoing_trace_chunks_);
198+
auto encode_result = msgpack_encode(body, trace_chunks);
200199
if (auto* error = encode_result.if_error()) {
201200
logger_->log_error(*error);
202201
return;
@@ -206,19 +205,18 @@ void DatadogAgent::flush() {
206205
// multiple tracers, and thus multiple trace samplers might need to have
207206
// their rates updated. Unlikely, but possible.
208207
std::unordered_set<std::shared_ptr<TraceSampler>> response_handlers;
209-
for (auto& chunk : outgoing_trace_chunks_) {
208+
for (auto& chunk : trace_chunks) {
210209
response_handlers.insert(std::move(chunk.response_handler));
211210
}
212211

213212
// This is the callback for setting request headers.
214213
// It's invoked synchronously (before `post` returns).
215-
auto set_request_headers = [this](DictWriter& headers) {
214+
auto set_request_headers = [&](DictWriter& headers) {
216215
headers.set("Content-Type", "application/msgpack");
217216
headers.set("Datadog-Meta-Lang", "cpp");
218217
headers.set("Datadog-Meta-Lang-Version", std::to_string(__cplusplus));
219218
headers.set("Datadog-Meta-Tracer-Version", tracer_version);
220-
headers.set("X-Datadog-Trace-Count",
221-
std::to_string(outgoing_trace_chunks_.size()));
219+
headers.set("X-Datadog-Trace-Count", std::to_string(trace_chunks.size()));
222220
};
223221

224222
// This is the callback for the HTTP response. It's invoked

src/datadog/datadog_agent.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,7 @@ class DatadogAgent : public Collector {
3434
std::mutex mutex_;
3535
Clock clock_;
3636
std::shared_ptr<Logger> logger_;
37-
// `incoming_trace_chunks_` are what `send` appends to.
38-
std::vector<TraceChunk> incoming_trace_chunks_;
39-
// `outgoing_trace_chunks_` are what `flush` consumes from.
40-
std::vector<TraceChunk> outgoing_trace_chunks_;
37+
std::vector<TraceChunk> trace_chunks_;
4138
HTTPClient::URL traces_endpoint_;
4239
std::shared_ptr<HTTPClient> http_client_;
4340
std::shared_ptr<EventScheduler> event_scheduler_;

0 commit comments

Comments
 (0)