Skip to content

Commit 6350ea9

Browse files
committed
feat(core): Use prepare for flare to signal dump element
1 parent f0e32d2 commit 6350ea9

File tree

1 file changed

+28
-27
lines changed

1 file changed

+28
-27
lines changed

dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR;
44
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
55
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
6+
import static java.util.Comparator.comparingLong;
67

78
import datadog.communication.ddagent.SharedCommunicationObjects;
89
import datadog.trace.api.Config;
@@ -15,6 +16,7 @@
1516
import java.util.List;
1617
import java.util.concurrent.TimeUnit;
1718
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.function.Predicate;
1820
import java.util.zip.ZipOutputStream;
1921
import org.jctools.queues.MessagePassingQueue;
2022
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
@@ -141,18 +143,18 @@ public void accept(Element pendingTrace) {
141143
private static final class DumpDrain
142144
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
143145
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
144-
private static final List<Element> data = new ArrayList<>();
146+
private static final List<Element> DATA = new ArrayList<>();
145147
private int index = 0;
146148

147149
@Override
148150
public void accept(Element pendingTrace) {
149-
data.add(pendingTrace);
151+
DATA.add(pendingTrace);
150152
}
151153

152154
@Override
153155
public Element get() {
154-
if (index < data.size()) {
155-
return data.get(index++);
156+
if (index < DATA.size()) {
157+
return DATA.get(index++);
156158
}
157159
return null; // Should never reach here or else queue may break according to
158160
// MessagePassingQueue docs
@@ -250,7 +252,7 @@ public void run() {
250252

251253
if (pendingTrace instanceof DumpElement) {
252254
queue.drain(DumpDrain.DUMP_DRAIN);
253-
queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.data.size());
255+
queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.DATA.size());
254256
dumpCounter.incrementAndGet();
255257
continue;
256258
}
@@ -276,7 +278,7 @@ public void run() {
276278
// Trace has been unmodified long enough, go ahead and write whatever is finished.
277279
pendingTrace.write();
278280
} else {
279-
// Trace is too new. Requeue it and sleep to avoid a hot loop.
281+
// Trace is too new. Requeue it and sleep to avoid a hot loop.
280282
enqueue(pendingTrace);
281283
Thread.sleep(SLEEP_TIME_MS);
282284
}
@@ -346,22 +348,19 @@ public static PendingTraceBuffer discarding() {
346348

347349
public abstract void enqueue(Element pendingTrace);
348350

349-
public static class TracerDump implements TracerFlare.Reporter {
350-
351+
private static class TracerDump implements TracerFlare.Reporter {
352+
private static final Comparator<Element> TRACE_BY_START_TIME =
353+
comparingLong(trace -> trace.getRootSpan().getStartTime());
354+
private static final Predicate<Element> NOT_PENDING_TRACE =
355+
element -> !(element instanceof PendingTrace);
351356
private final DelayingPendingTraceBuffer buffer;
352-
private final Comparator<Element> TRACE_BY_START_TIME =
353-
Comparator.comparingLong(trace -> trace.getRootSpan().getStartTime());
354357

355-
public TracerDump(DelayingPendingTraceBuffer buffer) {
358+
private TracerDump(DelayingPendingTraceBuffer buffer) {
356359
this.buffer = buffer;
357360
}
358361

359362
@Override
360-
public void addReportToFlare(ZipOutputStream zip) throws IOException {
361-
TracerFlare.addText(zip, "trace_dump.txt", getDumpText());
362-
}
363-
364-
private String getDumpText() {
363+
public void prepareForFlare() {
365364
if (buffer.worker.isAlive()) {
366365
int count = buffer.dumpCounter.get();
367366
int loop = 1;
@@ -376,28 +375,30 @@ private String getDumpText() {
376375
newCount = buffer.dumpCounter.get();
377376
}
378377
}
378+
}
379379

380-
DelayingPendingTraceBuffer.DumpDrain.data.removeIf(
381-
(trace) ->
382-
!(trace
383-
instanceof
384-
PendingTrace)); // Removing elements from the drain that are not instances of
385-
// PendingTrace
380+
@Override
381+
public void addReportToFlare(ZipOutputStream zip) throws IOException {
382+
TracerFlare.addText(zip, "trace_dump.txt", getDumpText());
383+
}
386384

387-
DelayingPendingTraceBuffer.DumpDrain.data.sort(
388-
(TRACE_BY_START_TIME).reversed()); // Storing oldest traces first
385+
private String getDumpText() {
386+
// Removing elements from the drain that are not instances of PendingTrace
387+
DelayingPendingTraceBuffer.DumpDrain.DATA.removeIf(NOT_PENDING_TRACE);
388+
// Storing oldest traces first
389+
DelayingPendingTraceBuffer.DumpDrain.DATA.sort((TRACE_BY_START_TIME).reversed());
389390

390391
StringBuilder dumpText = new StringBuilder();
391-
for (Element e : DelayingPendingTraceBuffer.DumpDrain.data) {
392+
for (Element e : DelayingPendingTraceBuffer.DumpDrain.DATA) {
392393
if (e instanceof PendingTrace) {
393394
PendingTrace trace = (PendingTrace) e;
394395
for (DDSpan span : trace.getSpans()) {
395396
dumpText.append(span.toString()).append('\n');
396397
}
397398
}
398399
}
399-
DelayingPendingTraceBuffer.DumpDrain.data
400-
.clear(); // releasing memory used for ArrayList in drain
400+
// Releasing memory used for ArrayList in drain
401+
DelayingPendingTraceBuffer.DumpDrain.DATA.clear();
401402
return dumpText.toString();
402403
}
403404
}

0 commit comments

Comments
 (0)