Skip to content

Commit 18f118c

Browse files
author
Rishabh Singh
authored
feat: Drop logs based on config (#237)
* Drop logs based on config
1 parent 9cf73b6 commit 18f118c

File tree

4 files changed

+80
-35
lines changed

4 files changed

+80
-35
lines changed

span-normalizer/helm/templates/span-normalizer-config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ data:
6060
{{- if hasKey .Values.spanNormalizerConfig.processor "rootExitSpanDropCriterion" }}
6161
rootExitSpanDropCriterion = {{ .Values.spanNormalizerConfig.processor.rootExitSpanDropCriterion | toJson }}
6262
{{- end }}
63+
64+
{{- if hasKey .Values.spanNormalizerConfig.processor "excludeLogsTenantIds" }}
65+
excludeLogsTenantIds = {{ .Values.spanNormalizerConfig.processor.excludeLogsTenantIds | toJson }}
66+
{{- end }}
6367
}
6468
{{- end }}
6569

span-normalizer/span-normalizer/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,6 @@ dependencies {
6666
testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
6767
testImplementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
6868
testImplementation("org.junit-pioneer:junit-pioneer:1.3.8")
69+
testImplementation("org.mockito:mockito-core:3.8.0")
6970
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
7071
}

span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package org.hypertrace.core.spannormalizer.jaeger;
22

3+
import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG;
4+
35
import com.google.common.annotations.VisibleForTesting;
46
import com.google.protobuf.util.Timestamps;
7+
import com.typesafe.config.Config;
58
import io.jaegertracing.api_v2.JaegerSpanInternalModel;
69
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
710
import io.micrometer.core.instrument.Counter;
811
import java.nio.ByteBuffer;
12+
import java.util.Collections;
913
import java.util.List;
1014
import java.util.Map;
1115
import java.util.concurrent.ConcurrentHashMap;
@@ -27,24 +31,31 @@ public class JaegerSpanToLogRecordsTransformer
2731

2832
private static final Logger LOG =
2933
LoggerFactory.getLogger(JaegerSpanToLogRecordsTransformer.class);
34+
private static final String TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG = "processor.excludeLogsTenantIds";
3035

3136
private static final String VALID_SPAN_WITH_LOGS_RECEIVED_COUNT =
3237
"hypertrace.reported.span.with.logs.processed";
3338

3439
private static final ConcurrentMap<String, Counter> tenantToSpanWithLogsReceivedCount =
3540
new ConcurrentHashMap<>();
3641

42+
private List<String> tenantIdsToExclude;
43+
3744
@Override
3845
public void init(ProcessorContext context) {
39-
// no-op
46+
Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG);
47+
this.tenantIdsToExclude =
48+
jobConfig.hasPath(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG)
49+
? jobConfig.getStringList(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG)
50+
: Collections.emptyList();
4051
}
4152

4253
@Override
4354
public KeyValue<String, LogEvents> transform(byte[] key, PreProcessedSpan preProcessedSpan) {
4455
try {
4556
Span value = preProcessedSpan.getSpan();
4657
String tenantId = preProcessedSpan.getTenantId();
47-
if (value.getLogsCount() == 0) {
58+
if (value.getLogsCount() == 0 || tenantIdsToExclude.contains(tenantId)) {
4859
return null;
4960
}
5061

span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,84 @@
22

33
import com.google.protobuf.ByteString;
44
import com.google.protobuf.Timestamp;
5+
import com.typesafe.config.ConfigFactory;
56
import io.jaegertracing.api_v2.JaegerSpanInternalModel;
67
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Log;
78
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
9+
import java.util.HashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import org.apache.kafka.streams.KeyValue;
13+
import org.apache.kafka.streams.processor.ProcessorContext;
814
import org.hypertrace.core.datamodel.LogEvents;
915
import org.junit.jupiter.api.Assertions;
1016
import org.junit.jupiter.api.Test;
17+
import org.mockito.Mockito;
1118

1219
public class JaegerSpanToLogRecordsTransformerTest {
1320

1421
@Test
1522
void testBuildLogEventRecords() {
16-
Span span =
17-
Span.newBuilder()
18-
.setSpanId(ByteString.copyFrom("1".getBytes()))
19-
.setTraceId(ByteString.copyFrom("trace-1".getBytes()))
20-
.addTags(
21-
JaegerSpanInternalModel.KeyValue.newBuilder()
22-
.setKey("jaeger.servicename")
23-
.setVStr("SERVICE_NAME")
24-
.build())
25-
.addLogs(
26-
Log.newBuilder()
27-
.setTimestamp(Timestamp.newBuilder().setSeconds(5).build())
28-
.addFields(
29-
JaegerSpanInternalModel.KeyValue.newBuilder()
30-
.setKey("e1")
31-
.setVStr("some event detail")
32-
.build())
33-
.addFields(
34-
JaegerSpanInternalModel.KeyValue.newBuilder()
35-
.setKey("e2")
36-
.setVStr("some event detail")
37-
.build()))
38-
.addLogs(
39-
Log.newBuilder()
40-
.setTimestamp(Timestamp.newBuilder().setSeconds(10).build())
41-
.addFields(
42-
JaegerSpanInternalModel.KeyValue.newBuilder()
43-
.setKey("z2")
44-
.setVStr("some event detail")
45-
.build()))
46-
.build();
47-
4823
LogEvents logEvents =
49-
new JaegerSpanToLogRecordsTransformer().buildLogEventRecords(span, "tenant");
24+
new JaegerSpanToLogRecordsTransformer().buildLogEventRecords(getTestSpan(), "tenant");
5025
Assertions.assertEquals(2, logEvents.getLogEvents().size());
5126
Assertions.assertEquals(
5227
2, logEvents.getLogEvents().get(0).getAttributes().getAttributeMap().size());
5328
Assertions.assertEquals(
5429
1, logEvents.getLogEvents().get(1).getAttributes().getAttributeMap().size());
5530
}
31+
32+
@Test
33+
void testDropLogEventRecords() {
34+
Map<String, Object> configs = new HashMap<>();
35+
configs.putAll(
36+
Map.of(
37+
"processor",
38+
Map.of("tenantIdTagKey", "tenant-key", "excludeLogsTenantIds", List.of("tenant-1"))));
39+
40+
ProcessorContext processorContext = Mockito.mock(ProcessorContext.class);
41+
Mockito.when(processorContext.appConfigs())
42+
.thenReturn(Map.of("span-normalizer-job-config", ConfigFactory.parseMap(configs)));
43+
JaegerSpanToLogRecordsTransformer jaegerSpanToLogRecordsTransformer =
44+
new JaegerSpanToLogRecordsTransformer();
45+
jaegerSpanToLogRecordsTransformer.init(processorContext);
46+
KeyValue<String, LogEvents> keyValue =
47+
jaegerSpanToLogRecordsTransformer.transform(
48+
null, new PreProcessedSpan("tenant-1", getTestSpan()));
49+
Assertions.assertNull(keyValue);
50+
}
51+
52+
private Span getTestSpan() {
53+
return Span.newBuilder()
54+
.setSpanId(ByteString.copyFrom("1".getBytes()))
55+
.setTraceId(ByteString.copyFrom("trace-1".getBytes()))
56+
.addTags(
57+
JaegerSpanInternalModel.KeyValue.newBuilder()
58+
.setKey("jaeger.servicename")
59+
.setVStr("SERVICE_NAME")
60+
.setKey("")
61+
.build())
62+
.addLogs(
63+
Log.newBuilder()
64+
.setTimestamp(Timestamp.newBuilder().setSeconds(5).build())
65+
.addFields(
66+
JaegerSpanInternalModel.KeyValue.newBuilder()
67+
.setKey("e1")
68+
.setVStr("some event detail")
69+
.build())
70+
.addFields(
71+
JaegerSpanInternalModel.KeyValue.newBuilder()
72+
.setKey("e2")
73+
.setVStr("some event detail")
74+
.build()))
75+
.addLogs(
76+
Log.newBuilder()
77+
.setTimestamp(Timestamp.newBuilder().setSeconds(10).build())
78+
.addFields(
79+
JaegerSpanInternalModel.KeyValue.newBuilder()
80+
.setKey("z2")
81+
.setVStr("some event detail")
82+
.build()))
83+
.build();
84+
}
5685
}

0 commit comments

Comments
 (0)