Skip to content

Commit cdbd7b4

Browse files
authored
feat: enhance the capping of max spans per trace at global level (#290)
* feat: add global upper limit for max span count limit * fixed code style * renamed config name from upper.max.span.count to default.max.span.count for global limit
1 parent efa5d17 commit cdbd7b4

File tree

4 files changed

+102
-15
lines changed

4 files changed

+102
-15
lines changed

raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class RawSpanGrouperConstants {
1414
public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY =
1515
"dataflow.metriccollection.sampling.percent";
1616
public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count";
17+
public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count";
1718
public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans";
1819
public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces";
1920
}

raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder;
44
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY;
5+
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT;
56
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER;
67
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT;
78
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
@@ -64,6 +65,7 @@ public class RawSpansProcessor
6465
private To outputTopic;
6566
private double dataflowSamplingPercent = -1;
6667
private static final Map<String, Long> maxSpanCountMap = new HashMap<>();
68+
private long defaultMaxSpanCountLimit = Long.MAX_VALUE;
6769

6870
// counter for number of spans dropped per tenant
6971
private static final ConcurrentMap<String, Counter> droppedSpansCounter =
@@ -99,6 +101,10 @@ public void init(ProcessorContext context) {
99101
});
100102
}
101103

104+
if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
105+
defaultMaxSpanCountLimit = jobConfig.getLong(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT);
106+
}
107+
102108
this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER);
103109
restorePunctuators();
104110
}
@@ -164,9 +170,16 @@ public KeyValue<String, StructuredTrace> transform(TraceIdentity key, RawSpan va
164170
}
165171

166172
private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
167-
if (traceState != null
168-
&& maxSpanCountMap.containsKey(key.getTenantId())
169-
&& traceState.getSpanIds().size() >= maxSpanCountMap.get(key.getTenantId())) {
173+
int inFlightSpansPerTrace =
174+
traceState != null ? traceState.getSpanIds().size() : Integer.MIN_VALUE;
175+
long maxSpanCountTenantLimit =
176+
maxSpanCountMap.containsKey(key.getTenantId())
177+
? maxSpanCountMap.get(key.getTenantId())
178+
: Long.MAX_VALUE;
179+
180+
if (inFlightSpansPerTrace >= defaultMaxSpanCountLimit
181+
|| inFlightSpansPerTrace >= maxSpanCountTenantLimit) {
182+
170183
if (logger.isDebugEnabled()) {
171184
logger.debug(
172185
"Dropping span [{}] from tenant_id={}, trace_id={} after grouping {} spans",
@@ -175,6 +188,7 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
175188
HexUtils.getHex(key.getTraceId()),
176189
traceState.getSpanIds().size());
177190
}
191+
178192
// increment the counter for dropped spans
179193
droppedSpansCounter
180194
.computeIfAbsent(
@@ -183,20 +197,21 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
183197
PlatformMetricsRegistry.registerCounter(
184198
DROPPED_SPANS_COUNTER, Map.of("tenantId", k)))
185199
.increment();
200+
201+
// increment the counter when the number of spans reaches the max.span.count limit.
202+
if (inFlightSpansPerTrace == defaultMaxSpanCountLimit
203+
|| inFlightSpansPerTrace == maxSpanCountTenantLimit) {
204+
truncatedTracesCounter
205+
.computeIfAbsent(
206+
key.getTenantId(),
207+
k ->
208+
PlatformMetricsRegistry.registerCounter(
209+
TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k)))
210+
.increment();
211+
}
212+
// drop the span as limit is reached
186213
return true;
187214
}
188-
// increment the counter when the number of spans reaches the max.span.count limit.
189-
if (traceState != null
190-
&& maxSpanCountMap.containsKey(key.getTenantId())
191-
&& traceState.getSpanIds().size() == maxSpanCountMap.get(key.getTenantId())) {
192-
truncatedTracesCounter
193-
.computeIfAbsent(
194-
key.getTenantId(),
195-
k ->
196-
PlatformMetricsRegistry.registerCounter(
197-
TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k)))
198-
.increment();
199-
}
200215
return false;
201216
}
202217

raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
7777

7878
String tenantId = "tenant1";
7979

80+
// create spans for trace-1 of tenant1
8081
RawSpan span1 =
8182
RawSpan.newBuilder()
8283
.setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
@@ -95,6 +96,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
9596
.setCustomerId("tenant1")
9697
.setEvent(createEvent("event-3", "tenant1"))
9798
.build();
99+
100+
// create spans for trace-2 of tenant1
98101
RawSpan span4 =
99102
RawSpan.newBuilder()
100103
.setTraceId(ByteBuffer.wrap("trace-2".getBytes()))
@@ -107,6 +110,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
107110
.setCustomerId("tenant1")
108111
.setEvent(createEvent("event-5", "tenant1"))
109112
.build();
113+
114+
// create spans for trace-3 of tenant1
110115
RawSpan span6 =
111116
RawSpan.newBuilder()
112117
.setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
@@ -144,6 +149,57 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
144149
.setEvent(createEvent("event-11", "tenant1"))
145150
.build();
146151

152+
// create 8 spans for tenant-2 for trace-4
153+
String tenant2 = "tenant2";
154+
RawSpan span12 =
155+
RawSpan.newBuilder()
156+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
157+
.setCustomerId(tenant2)
158+
.setEvent(createEvent("event-12", tenant2))
159+
.build();
160+
RawSpan span13 =
161+
RawSpan.newBuilder()
162+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
163+
.setCustomerId(tenant2)
164+
.setEvent(createEvent("event-13", tenant2))
165+
.build();
166+
RawSpan span14 =
167+
RawSpan.newBuilder()
168+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
169+
.setCustomerId(tenant2)
170+
.setEvent(createEvent("event-14", tenant2))
171+
.build();
172+
RawSpan span15 =
173+
RawSpan.newBuilder()
174+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
175+
.setCustomerId(tenant2)
176+
.setEvent(createEvent("event-15", tenant2))
177+
.build();
178+
RawSpan span16 =
179+
RawSpan.newBuilder()
180+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
181+
.setCustomerId(tenant2)
182+
.setEvent(createEvent("event-16", tenant2))
183+
.build();
184+
RawSpan span17 =
185+
RawSpan.newBuilder()
186+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
187+
.setCustomerId(tenant2)
188+
.setEvent(createEvent("event-17", tenant2))
189+
.build();
190+
RawSpan span18 =
191+
RawSpan.newBuilder()
192+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
193+
.setCustomerId(tenant2)
194+
.setEvent(createEvent("event-18", tenant2))
195+
.build();
196+
RawSpan span19 =
197+
RawSpan.newBuilder()
198+
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
199+
.setCustomerId(tenant2)
200+
.setEvent(createEvent("event-19", tenant2))
201+
.build();
202+
147203
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1);
148204
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4);
149205
td.advanceWallClockTime(Duration.ofSeconds(1));
@@ -199,6 +255,20 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
199255
// trace should be truncated with 5 spans
200256
trace = (StructuredTrace) outputTopic.readValue();
201257
assertEquals(5, trace.getEventList().size());
258+
259+
// input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only
260+
// 6
261+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12);
262+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13);
263+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14);
264+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15);
265+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16);
266+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17);
267+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18);
268+
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19);
269+
td.advanceWallClockTime(Duration.ofSeconds(35));
270+
trace = (StructuredTrace) outputTopic.readValue();
271+
assertEquals(6, trace.getEventList().size());
202272
}
203273

204274
private Event createEvent(String eventId, String tenantId) {

raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ span.window.store.retention.time.mins = ${?SPAN_WINDOW_STORE_RETENTION_TIME_MINS
2828
span.window.store.segment.size.mins = 20
2929
span.window.store.segment.size.mins = ${?SPAN_WINDOW_STORE_SEGMENT_SIZE_MINS}
3030

31+
default.max.span.count = 6
3132
max.span.count = {
3233
tenant1 = 5
3334
}

0 commit comments

Comments
 (0)