Skip to content
This repository was archived by the owner on Dec 23, 2023. It is now read-only.

Commit 2c844db

Browse files
authored
Start dropping spans when the span exporter thread holds more than 10K spans references. (#1893)
* Start dropping spans when the span exporter thread holds more than 10K spans references. * Fix checker framework. * Fix findbugs by extracting inlined class definitions. * Fix tests threads race condition. * Address review feedback.
1 parent c93f59c commit 2c844db

File tree

3 files changed

+226
-4
lines changed

3 files changed

+226
-4
lines changed

exporters/metrics/ocagent/src/test/java/io/opencensus/exporter/metrics/ocagent/OcAgentMetricsExporterIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public void testExportMetrics() throws InterruptedException, IOException {
263263
for (Metric metricProto : metricProtos) {
264264
actualMetrics.add(metricProto.getMetricDescriptor().getName());
265265
}
266-
assertThat(actualMetrics).containsExactlyElementsIn(expectedMetrics);
266+
assertThat(actualMetrics).containsAllIn(expectedMetrics);
267267
}
268268

269269
private static void registerAllViews() {

impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@
1818

1919
import com.google.common.annotations.VisibleForTesting;
2020
import io.opencensus.common.Duration;
21+
import io.opencensus.common.ToLongFunction;
2122
import io.opencensus.implcore.internal.DaemonThreadFactory;
2223
import io.opencensus.implcore.trace.RecordEventsSpanImpl;
24+
import io.opencensus.metrics.DerivedLongCumulative;
25+
import io.opencensus.metrics.DerivedLongGauge;
26+
import io.opencensus.metrics.LabelValue;
27+
import io.opencensus.metrics.MetricOptions;
28+
import io.opencensus.metrics.Metrics;
2329
import io.opencensus.trace.export.ExportComponent;
2430
import io.opencensus.trace.export.SpanData;
2531
import io.opencensus.trace.export.SpanExporter;
@@ -32,9 +38,37 @@
3238
import java.util.logging.Logger;
3339
import javax.annotation.concurrent.GuardedBy;
3440

41+
/*>>>
42+
import org.checkerframework.checker.nullness.qual.Nullable;
43+
*/
44+
3545
/** Implementation of the {@link SpanExporter}. */
3646
public final class SpanExporterImpl extends SpanExporter {
3747
private static final Logger logger = Logger.getLogger(ExportComponent.class.getName());
48+
private static final DerivedLongCumulative droppedSpans =
49+
Metrics.getMetricRegistry()
50+
.addDerivedLongCumulative(
51+
"oc_worker_spans_dropped",
52+
MetricOptions.builder()
53+
.setDescription("Number of spans dropped by the exporter thread.")
54+
.setUnit("1")
55+
.build());
56+
private static final DerivedLongCumulative pushedSpans =
57+
Metrics.getMetricRegistry()
58+
.addDerivedLongCumulative(
59+
"oc_worker_spans_pushed",
60+
MetricOptions.builder()
61+
.setDescription("Number of spans pushed by the exporter thread to the exporter.")
62+
.setUnit("1")
63+
.build());
64+
private static final DerivedLongGauge referencedSpans =
65+
Metrics.getMetricRegistry()
66+
.addDerivedLongGauge(
67+
"oc_worker_spans_referenced",
68+
MetricOptions.builder()
69+
.setDescription("Current number of spans referenced by the exporter thread.")
70+
.setUnit("1")
71+
.build());
3872

3973
private final Worker worker;
4074
private final Thread workerThread;
@@ -88,13 +122,64 @@ private SpanExporterImpl(Worker worker) {
88122
new DaemonThreadFactory("ExportComponent.ServiceExporterThread").newThread(worker);
89123
this.workerThread.start();
90124
this.worker = worker;
125+
droppedSpans.createTimeSeries(
126+
Collections.<LabelValue>emptyList(), this.worker, new ReportDroppedSpans());
127+
referencedSpans.createTimeSeries(
128+
Collections.<LabelValue>emptyList(), this.worker, new ReportReferencedSpans());
129+
pushedSpans.createTimeSeries(
130+
Collections.<LabelValue>emptyList(), this.worker, new ReportPushedSpans());
131+
}
132+
133+
private static class ReportDroppedSpans implements ToLongFunction</*@Nullable*/ Worker> {
134+
@Override
135+
public long applyAsLong(/*@Nullable*/ Worker worker) {
136+
if (worker == null) {
137+
return 0;
138+
}
139+
return worker.getDroppedSpans();
140+
}
141+
}
142+
143+
private static class ReportReferencedSpans implements ToLongFunction</*@Nullable*/ Worker> {
144+
@Override
145+
public long applyAsLong(/*@Nullable*/ Worker worker) {
146+
if (worker == null) {
147+
return 0;
148+
}
149+
return worker.getReferencedSpans();
150+
}
151+
}
152+
153+
private static class ReportPushedSpans implements ToLongFunction</*@Nullable*/ Worker> {
154+
@Override
155+
public long applyAsLong(/*@Nullable*/ Worker worker) {
156+
if (worker == null) {
157+
return 0;
158+
}
159+
return worker.getPushedSpans();
160+
}
91161
}
92162

93163
@VisibleForTesting
94164
Thread getServiceExporterThread() {
95165
return workerThread;
96166
}
97167

168+
@VisibleForTesting
169+
long getDroppedSpans() {
170+
return worker.getDroppedSpans();
171+
}
172+
173+
@VisibleForTesting
174+
long getReferencedSpans() {
175+
return worker.getReferencedSpans();
176+
}
177+
178+
@VisibleForTesting
179+
long getPushedSpans() {
180+
return worker.getPushedSpans();
181+
}
182+
98183
// Worker in a thread that batches multiple span data and calls the registered services to export
99184
// that data.
100185
//
@@ -110,14 +195,29 @@ private static final class Worker implements Runnable {
110195
@GuardedBy("monitor")
111196
private final List<RecordEventsSpanImpl> spans;
112197

113-
private final Map<String, Handler> serviceHandlers = new ConcurrentHashMap<String, Handler>();
198+
@GuardedBy("monitor")
199+
private long referencedSpans = 0;
200+
201+
@GuardedBy("monitor")
202+
private long droppedSpans = 0;
203+
204+
@GuardedBy("monitor")
205+
private long pushedSpans = 0;
206+
207+
private final Map<String, Handler> serviceHandlers = new ConcurrentHashMap<>();
114208
private final int bufferSize;
209+
private final long maxReferencedSpans;
115210
private final long scheduleDelayMillis;
116211

117212
// See SpanExporterImpl#addSpan.
118213
private void addSpan(RecordEventsSpanImpl span) {
119214
synchronized (monitor) {
215+
if (referencedSpans == maxReferencedSpans) {
216+
droppedSpans++;
217+
return;
218+
}
120219
this.spans.add(span);
220+
referencedSpans++;
121221
if (spans.size() >= bufferSize) {
122222
monitor.notifyAll();
123223
}
@@ -154,6 +254,11 @@ private void onBatchExport(List<SpanData> spanDataList) {
154254
private Worker(int bufferSize, Duration scheduleDelay) {
155255
spans = new ArrayList<>(bufferSize);
156256
this.bufferSize = bufferSize;
257+
// We notify the worker thread when bufferSize elements in the queue, so we will most likely
258+
// have to process more than bufferSize elements but less than 2 * bufferSize in that cycle.
259+
// During the processing time we want to allow the same amount of elements to be queued.
260+
// So we need to have 4 * bufferSize maximum elements referenced as an estimate.
261+
this.maxReferencedSpans = 4L * bufferSize;
157262
this.scheduleDelayMillis = scheduleDelay.toMillis();
158263
}
159264

@@ -195,6 +300,24 @@ private void flush() {
195300
exportBatches(spansCopy);
196301
}
197302

303+
private long getDroppedSpans() {
304+
synchronized (monitor) {
305+
return droppedSpans;
306+
}
307+
}
308+
309+
private long getReferencedSpans() {
310+
synchronized (monitor) {
311+
return referencedSpans;
312+
}
313+
}
314+
315+
private long getPushedSpans() {
316+
synchronized (monitor) {
317+
return pushedSpans;
318+
}
319+
}
320+
198321
@SuppressWarnings("argument.type.incompatible")
199322
private void exportBatches(ArrayList<RecordEventsSpanImpl> spanList) {
200323
ArrayList<SpanData> spanDataList = new ArrayList<>(bufferSize);
@@ -209,12 +332,23 @@ private void exportBatches(ArrayList<RecordEventsSpanImpl> spanList) {
209332
// Cannot clear because the exporter may still have a reference to this list (e.g. async
210333
// scheduled work), so just create a new list.
211334
spanDataList = new ArrayList<>(bufferSize);
335+
// We removed reference for bufferSize Spans.
336+
synchronized (monitor) {
337+
referencedSpans -= bufferSize;
338+
pushedSpans += bufferSize;
339+
}
212340
}
213341
}
214342
// Last incomplete batch, send this as well.
215343
if (!spanDataList.isEmpty()) {
216344
// Wrap the list with unmodifiableList to ensure exporter does not change the list.
217345
onBatchExport(Collections.unmodifiableList(spanDataList));
346+
// We removed reference for spanDataList.size() Spans.
347+
synchronized (monitor) {
348+
referencedSpans -= spanDataList.size();
349+
pushedSpans += spanDataList.size();
350+
}
351+
spanDataList.clear();
218352
}
219353
}
220354
}

impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,15 @@
3131
import io.opencensus.trace.SpanId;
3232
import io.opencensus.trace.TraceId;
3333
import io.opencensus.trace.TraceOptions;
34+
import io.opencensus.trace.Tracestate;
3435
import io.opencensus.trace.config.TraceParams;
3536
import io.opencensus.trace.export.SpanData;
3637
import io.opencensus.trace.export.SpanExporter.Handler;
38+
import java.util.ArrayList;
39+
import java.util.Collection;
3740
import java.util.List;
3841
import java.util.Random;
42+
import javax.annotation.concurrent.GuardedBy;
3943
import org.junit.Before;
4044
import org.junit.Test;
4145
import org.junit.runner.RunWith;
@@ -53,10 +57,14 @@ public class SpanExporterImplTest {
5357
SpanContext.create(
5458
TraceId.generateRandomId(random),
5559
SpanId.generateRandomId(random),
56-
TraceOptions.builder().setIsSampled(true).build());
60+
TraceOptions.builder().setIsSampled(true).build(),
61+
Tracestate.builder().build());
5762
private final SpanContext notSampledSpanContext =
5863
SpanContext.create(
59-
TraceId.generateRandomId(random), SpanId.generateRandomId(random), TraceOptions.DEFAULT);
64+
TraceId.generateRandomId(random),
65+
SpanId.generateRandomId(random),
66+
TraceOptions.DEFAULT,
67+
Tracestate.builder().build());
6068
private final InProcessRunningSpanStore runningSpanStore = new InProcessRunningSpanStore();
6169
private final SampledSpanStoreImpl sampledSpanStore =
6270
SampledSpanStoreImpl.getNoopSampledSpanStoreImpl();
@@ -143,6 +151,86 @@ public void exportMoreSpansThanTheBufferSize() {
143151
span6.toSpanData());
144152
}
145153

154+
private static class BlockingExporter extends Handler {
155+
final Object monitor = new Object();
156+
157+
@GuardedBy("monitor")
158+
Boolean condition = Boolean.FALSE;
159+
160+
@Override
161+
public void export(Collection<SpanData> spanDataList) {
162+
synchronized (monitor) {
163+
while (!condition) {
164+
try {
165+
monitor.wait();
166+
} catch (InterruptedException e) {
167+
// Do nothing
168+
}
169+
}
170+
}
171+
}
172+
173+
private void unblock() {
174+
synchronized (monitor) {
175+
condition = Boolean.TRUE;
176+
monitor.notifyAll();
177+
}
178+
}
179+
}
180+
181+
@Test
182+
public void exportMoreSpansThanTheMaximumLimit() {
183+
final int bufferSize = 4;
184+
final int maxReferencedSpans = bufferSize * 4;
185+
SpanExporterImpl spanExporter = SpanExporterImpl.create(bufferSize, Duration.create(1, 0));
186+
StartEndHandler startEndHandler =
187+
new StartEndHandlerImpl(
188+
spanExporter, runningSpanStore, sampledSpanStore, new SimpleEventQueue());
189+
BlockingExporter blockingExporter = new BlockingExporter();
190+
191+
spanExporter.registerHandler("test.service", serviceHandler);
192+
spanExporter.registerHandler("test.blocking", blockingExporter);
193+
194+
List<SpanData> spansToExport = new ArrayList<>(maxReferencedSpans);
195+
for (int i = 0; i < maxReferencedSpans; i++) {
196+
spansToExport.add(createSampledEndedSpan(startEndHandler, "span_1_" + i).toSpanData());
197+
}
198+
199+
assertThat(spanExporter.getReferencedSpans()).isEqualTo(maxReferencedSpans);
200+
201+
// Now we should start dropping.
202+
for (int i = 0; i < 7; i++) {
203+
createSampledEndedSpan(startEndHandler, "span_2_" + i);
204+
assertThat(spanExporter.getDroppedSpans()).isEqualTo(i + 1);
205+
}
206+
207+
assertThat(spanExporter.getReferencedSpans()).isEqualTo(maxReferencedSpans);
208+
209+
// Release the blocking exporter
210+
blockingExporter.unblock();
211+
212+
List<SpanData> exported = serviceHandler.waitForExport(maxReferencedSpans);
213+
assertThat(exported).isNotNull();
214+
assertThat(exported).containsExactlyElementsIn(spansToExport);
215+
exported.clear();
216+
spansToExport.clear();
217+
218+
// We cannot compare with maxReferencedSpans here because the worker thread may get
219+
// unscheduled immediately after exporting, but before updating the pushed spans, if that is
220+
// the case at most bufferSize spans will miss.
221+
assertThat(spanExporter.getPushedSpans()).isAtLeast((long) maxReferencedSpans - bufferSize);
222+
223+
for (int i = 0; i < 7; i++) {
224+
spansToExport.add(createSampledEndedSpan(startEndHandler, "span_3_" + i).toSpanData());
225+
// No more dropped spans.
226+
assertThat(spanExporter.getDroppedSpans()).isEqualTo(7);
227+
}
228+
229+
exported = serviceHandler.waitForExport(7);
230+
assertThat(exported).isNotNull();
231+
assertThat(exported).containsExactlyElementsIn(spansToExport);
232+
}
233+
146234
@Test
147235
public void interruptWorkerThreadStops() throws InterruptedException {
148236
SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));

0 commit comments

Comments
 (0)