Skip to content

Commit d2fc878

Browse files
committed
saving changes
1 parent 7504373 commit d2fc878

File tree

2 files changed

+146
-54
lines changed

2 files changed

+146
-54
lines changed

dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -303,60 +303,6 @@ public DelayingPendingTraceBuffer(
303303
config, bufferSize, sharedCommunicationObjects, healthMetrics)
304304
: null;
305305
}
306-
307-
static class TracerDump implements TracerFlare.Reporter {
308-
309-
private final DelayingPendingTraceBuffer buffer;
310-
private final Comparator<Element> TRACE_BY_START_TIME =
311-
Comparator.comparingLong(trace -> trace.getRootSpan().getStartTime());
312-
313-
public TracerDump(DelayingPendingTraceBuffer buffer) {
314-
this.buffer = buffer;
315-
}
316-
317-
@Override
318-
public void addReportToFlare(ZipOutputStream zip) throws IOException {
319-
TracerFlare.addText(zip, "trace_dump.txt", getDumpText());
320-
}
321-
322-
private String getDumpText() {
323-
if (buffer.worker.isAlive()) {
324-
int count = buffer.dumpCounter.get();
325-
int loop = 1;
326-
boolean signaled = buffer.queue.offer(DumpElement.DUMP_ELEMENT);
327-
while (!buffer.closed && !signaled) {
328-
buffer.yieldOrSleep(loop++);
329-
signaled = buffer.queue.offer(DumpElement.DUMP_ELEMENT);
330-
}
331-
int newCount = buffer.dumpCounter.get();
332-
while (!buffer.closed && count >= newCount) {
333-
buffer.yieldOrSleep(loop++);
334-
newCount = buffer.dumpCounter.get();
335-
}
336-
}
337-
338-
DumpDrain.data.removeIf(
339-
(trace) ->
340-
!(trace
341-
instanceof
342-
PendingTrace)); // Removing elements from the drain that are not instances of
343-
// PendingTrace
344-
345-
DumpDrain.data.sort((TRACE_BY_START_TIME).reversed()); // Storing oldest traces first
346-
347-
StringBuilder dumpText = new StringBuilder();
348-
for (Element e : DumpDrain.data) {
349-
if (e instanceof PendingTrace) {
350-
PendingTrace trace = (PendingTrace) e;
351-
for (DDSpan span : trace.getSpans()) {
352-
dumpText.append(span.toString()).append('\n');
353-
}
354-
}
355-
}
356-
DumpDrain.data.clear(); // releasing memory used for ArrayList in drain
357-
return dumpText.toString();
358-
}
359-
}
360306
}
361307

362308
static class DiscardingPendingTraceBuffer extends PendingTraceBuffer {
@@ -399,4 +345,58 @@ public static PendingTraceBuffer discarding() {
399345
public abstract void flush();
400346

401347
public abstract void enqueue(Element pendingTrace);
348+
349+
public static class TracerDump implements TracerFlare.Reporter {
350+
351+
private final DelayingPendingTraceBuffer buffer;
352+
private final Comparator<Element> TRACE_BY_START_TIME =
353+
Comparator.comparingLong(trace -> trace.getRootSpan().getStartTime());
354+
355+
public TracerDump(DelayingPendingTraceBuffer buffer) {
356+
this.buffer = buffer;
357+
}
358+
359+
@Override
360+
public void addReportToFlare(ZipOutputStream zip) throws IOException {
361+
TracerFlare.addText(zip, "trace_dump.txt", getDumpText());
362+
}
363+
364+
private String getDumpText() {
365+
if (buffer.worker.isAlive()) {
366+
int count = buffer.dumpCounter.get();
367+
int loop = 1;
368+
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT);
369+
while (!buffer.closed && !signaled) {
370+
buffer.yieldOrSleep(loop++);
371+
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT);
372+
}
373+
int newCount = buffer.dumpCounter.get();
374+
while (!buffer.closed && count >= newCount) {
375+
buffer.yieldOrSleep(loop++);
376+
newCount = buffer.dumpCounter.get();
377+
}
378+
}
379+
380+
DelayingPendingTraceBuffer.DumpDrain.data.removeIf(
381+
(trace) ->
382+
!(trace
383+
instanceof
384+
PendingTrace)); // Removing elements from the drain that are not instances of
385+
// PendingTrace
386+
387+
DelayingPendingTraceBuffer.DumpDrain.data.sort((TRACE_BY_START_TIME).reversed()); // Storing oldest traces first
388+
389+
StringBuilder dumpText = new StringBuilder();
390+
for (Element e : DelayingPendingTraceBuffer.DumpDrain.data) {
391+
if (e instanceof PendingTrace) {
392+
PendingTrace trace = (PendingTrace) e;
393+
for (DDSpan span : trace.getSpans()) {
394+
dumpText.append(span.toString()).append('\n');
395+
}
396+
}
397+
}
398+
DelayingPendingTraceBuffer.DumpDrain.data.clear(); // releasing memory used for ArrayList in drain
399+
return dumpText.toString();
400+
}
401+
}
402402
}

dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import datadog.communication.monitor.Monitoring
55
import datadog.trace.SamplingPriorityMetadataChecker
66
import datadog.trace.api.DDSpanId
77
import datadog.trace.api.DDTraceId
8+
import datadog.trace.api.flare.TracerFlare
89
import datadog.trace.api.sampling.PrioritySampling
910
import datadog.trace.api.time.SystemTimeSource
1011
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.NoopPathwayContext
@@ -20,8 +21,11 @@ import spock.util.concurrent.PollingConditions
2021
import java.util.concurrent.CountDownLatch
2122
import java.util.concurrent.TimeUnit
2223
import java.util.concurrent.atomic.AtomicInteger
24+
import java.util.zip.ZipInputStream
25+
import java.util.zip.ZipOutputStream
2326

2427
import static datadog.trace.core.PendingTraceBuffer.BUFFER_SIZE
28+
import static java.nio.charset.StandardCharsets.UTF_8
2529

2630
@Timeout(5)
2731
class PendingTraceBufferTest extends DDSpecification {
@@ -443,6 +447,71 @@ class PendingTraceBufferTest extends DDSpecification {
443447
}
444448
}
445449

450+
def "testing tracer flare dump"() {
451+
setup:
452+
// Don't start the buffer thread
453+
TracerFlare.addReporter {} // exercises default methods
454+
def dumpReporter = Mock(PendingTraceBuffer.TracerDump)
455+
TracerFlare.addReporter(dumpReporter)
456+
457+
when:
458+
def pendingTrace = factory.create(DDTraceId.ONE)
459+
def span = newSpanOf(pendingTrace)
460+
def entries = buildAndExtractZip()
461+
462+
then:
463+
1 * dumpReporter.prepareForFlare()
464+
465+
then:
466+
1 * dumpReporter.addReportToFlare(_)
467+
468+
then:
469+
1 * dumpReporter.cleanupAfterFlare()
470+
471+
and:
472+
entries.size() == 2
473+
entries["trace_dump.txt"] == "example text"
474+
entries["flare_errors.txt"] =~
475+
/^(java.lang.IllegalStateException: (bin|txt) \(expected\)\n){2}$/
476+
// then:
477+
// 1 * tracer.captureTraceConfig() >> traceConfig
478+
// pendingTrace.rootSpanWritten
479+
// pendingTrace.isEnqueued == 0
480+
// buffer.queue.size() == 0
481+
// _ * bufferSpy.longRunningSpansEnabled()
482+
// 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("")
483+
// 1 * tracer.write({ it.size() == 1 })
484+
// 1 * tracer.getPartialFlushMinSpans() >> 10000
485+
// 1 * traceConfig.getServiceMapping() >> [:]
486+
// 2 * tracer.getTimeWithNanoTicks(_)
487+
// 1 * tracer.onRootSpanPublished(_)
488+
// 0 * _
489+
//
490+
// when: "fail to fill the buffer"
491+
// for (i in 1..buffer.queue.capacity()) {
492+
// addContinuation(newSpanOf(span)).finish()
493+
// }
494+
//
495+
// then:
496+
// pendingTrace.isEnqueued == 1
497+
// buffer.queue.size() == 1
498+
// buffer.queue.capacity() * bufferSpy.enqueue(_)
499+
// _ * bufferSpy.longRunningSpansEnabled()
500+
// _ * tracer.getPartialFlushMinSpans() >> 10000
501+
// _ * traceConfig.getServiceMapping() >> [:]
502+
// _ * tracer.getTimeWithNanoTicks(_)
503+
// 0 * _
504+
//
505+
// when: "process the buffer"
506+
// buffer.start()
507+
//
508+
// then:
509+
// new PollingConditions(timeout: 3, initialDelay: 0, delay: 0.5, factor: 1).eventually {
510+
// assert pendingTrace.isEnqueued == 0
511+
// }
512+
}
513+
514+
446515
def addContinuation(DDSpan span) {
447516
def scope = scopeManager.activate(span, ScopeSource.INSTRUMENTATION, true)
448517
continuations << scope.capture()
@@ -502,4 +571,27 @@ class PendingTraceBufferTest extends DDSpecification {
502571
PropagationTags.factory().empty())
503572
return DDSpan.create("test", 0, context, null)
504573
}
574+
575+
def buildAndExtractZip() {
576+
TracerFlare.prepareForFlare()
577+
def out = new ByteArrayOutputStream()
578+
try (ZipOutputStream zip = new ZipOutputStream(out)) {
579+
TracerFlare.addReportsToFlare(zip)
580+
} finally {
581+
TracerFlare.cleanupAfterFlare()
582+
}
583+
584+
def entries = [:]
585+
586+
def zip = new ZipInputStream(new ByteArrayInputStream(out.toByteArray()))
587+
def entry
588+
while (entry = zip.nextEntry) {
589+
def bytes = new ByteArrayOutputStream()
590+
bytes << zip
591+
entries.put(entry.name, entry.name.endsWith(".bin")
592+
? bytes.toByteArray() : new String(bytes.toByteArray(), UTF_8))
593+
}
594+
595+
return entries
596+
}
505597
}

0 commit comments

Comments
 (0)