Skip to content

Commit 1bf8bed

Browse files
committed
Added support to copy attributes from resource
1 parent 049282b commit 1bf8bed

File tree

15 files changed

+616
-48
lines changed

15 files changed

+616
-48
lines changed

src/main/java/io/opentelemetry/contrib/generator/telemetry/logs/LogGeneratorThread.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.opentelemetry.contrib.generator.telemetry.dto.GeneratorState;
2323
import io.opentelemetry.contrib.generator.telemetry.jel.JELProvider;
2424
import io.opentelemetry.contrib.generator.telemetry.logs.dto.LogDefinition;
25-
import io.opentelemetry.contrib.generator.telemetry.misc.GeneratorUtils;
2625
import io.opentelemetry.contrib.generator.telemetry.transport.PayloadHandler;
2726
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
2827
import io.opentelemetry.proto.common.v1.AnyValue;
@@ -35,6 +34,8 @@
3534
import lombok.Getter;
3635
import lombok.extern.slf4j.Slf4j;
3736

37+
import static io.opentelemetry.contrib.generator.telemetry.misc.GeneratorUtils.*;
38+
3839
import java.util.*;
3940
import java.util.concurrent.TimeUnit;
4041
import java.util.stream.Collectors;
@@ -69,12 +70,14 @@ public void run() {
6970
logGeneratorState.getThreadPayloadCounts().get(logDefinition.getId()) < logDefinition.getPayloadCount()) {
7071
List<ResourceLogs> resourceLogsList = new ArrayList<>();
7172
ResourceLogs resourceLog;
72-
LogRecord logRecord = getLog(logDefinition);
73-
List<LogRecord> otelLogs = Collections.nCopies(logDefinition.getCopyCount(), logRecord);
73+
LogRecord.Builder partialLogRecord = getLog(logDefinition);
7474
for (Map.Entry<String, Integer> reportingResource : logDefinition.getReportingResourcesCounts().entrySet()) {
7575
List<Resource> postToResources = getResourceSubsetByPostCount(reportingResource.getKey(), reportingResource.getValue());
7676
log.debug(requestID + ": Preparing " + postToResources.size() + " resource logs packets for " + reportingResource);
7777
for (Resource eachResource: postToResources) {
78+
LogRecord logRecord = partialLogRecord.clone().addAllAttributes(getResourceAttributes(logDefinition
79+
.getCopyResourceAttributes(), eachResource)).build();
80+
List<LogRecord> otelLogs = Collections.nCopies(logDefinition.getCopyCount(), logRecord);
7881
resourceLog = ResourceLogs.newBuilder()
7982
.setResource(eachResource)
8083
.addScopeLogs(ScopeLogs.newBuilder()
@@ -101,16 +104,15 @@ public void run() {
101104
}
102105
}
103106

104-
private LogRecord getLog(LogDefinition logDefinition) {
107+
private LogRecord.Builder getLog(LogDefinition logDefinition) {
105108
long nanoTime = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
106109
String severity = jelProcessor.eval(logDefinition.getSeverityOrderFunction()).toString();
107110
return LogRecord.newBuilder()
108111
.setTimeUnixNano(nanoTime)
109112
.setObservedTimeUnixNano(nanoTime)
110113
.setSeverityText(severity)
111-
.addAllAttributes(GeneratorUtils.getEvaluatedAttributes(jelProcessor, logDefinition.getAttributes()))
112-
.setBody(AnyValue.newBuilder().setStringValue(LogMessageProvider.getLogMessage(severity)).build())
113-
.build();
114+
.addAllAttributes(getEvaluatedAttributes(jelProcessor, logDefinition.getAttributes()))
115+
.setBody(AnyValue.newBuilder().setStringValue(LogMessageProvider.getLogMessage(severity)).build());
114116
}
115117

116118
private List<Resource> getResourceSubsetByPostCount(String resourceName, int resourceCount) {

src/main/java/io/opentelemetry/contrib/generator/telemetry/logs/dto/LogDefinition.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class LogDefinition {
3636
private Integer payloadFrequencySeconds;
3737
private Integer payloadCount;
3838
private Integer copyCount;
39+
private Set<String> copyResourceAttributes;
3940
private Map<String, Object> attributes;
4041
@JsonIgnore
4142
private String id;
@@ -45,6 +46,9 @@ public long validate(String requestID, Set<String> allResourceTypes, Integer glo
4546
if (copyCount == null || copyCount < 1) {
4647
copyCount = 1;
4748
}
49+
if (copyResourceAttributes == null) {
50+
copyResourceAttributes = new HashSet<>();
51+
}
4852
validateMandatoryFields();
4953
validateResourceTypesCount(allResourceTypes);
5054
addRequestIDAndLogNameToValueFunction(requestID);

src/main/java/io/opentelemetry/contrib/generator/telemetry/metrics/GaugeGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@ public GaugeGenerator(ELProcessor jelProcessor) {
3636
this.jelProcessor = jelProcessor;
3737
}
3838

39-
public Metric getOTelMetric(MetricDefinition metricDefinition) {
39+
public Metric.Builder getOTelMetric(MetricDefinition metricDefinition) {
4040
Metric.Builder partialMetric = Metric.newBuilder().setName(metricDefinition.getName())
4141
.setUnit(metricDefinition.getUnit());
4242
return metricDefinition.getIsDouble() ?
43-
partialMetric.setGauge(getDoubleGaugeDataPoint(metricDefinition)).build() :
44-
partialMetric.setGauge(getIntGaugeDataPoint(metricDefinition)).build();
43+
partialMetric.setGauge(getDoubleGaugeDataPoint(metricDefinition)) :
44+
partialMetric.setGauge(getIntGaugeDataPoint(metricDefinition));
4545
}
4646

4747
private Gauge getDoubleGaugeDataPoint(MetricDefinition metricDefinition) {

src/main/java/io/opentelemetry/contrib/generator/telemetry/metrics/MetricGeneratorThread.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,22 @@
1919
import io.opentelemetry.contrib.generator.core.dto.GeneratorResource;
2020
import io.opentelemetry.contrib.generator.telemetry.GeneratorsStateProvider;
2121
import io.opentelemetry.contrib.generator.telemetry.dto.GeneratorState;
22+
import io.opentelemetry.contrib.generator.telemetry.misc.GeneratorUtils;
2223
import io.opentelemetry.contrib.generator.telemetry.transport.PayloadHandler;
2324
import io.opentelemetry.contrib.generator.telemetry.ResourceModelProvider;
2425
import io.opentelemetry.contrib.generator.telemetry.jel.JELProvider;
2526
import io.opentelemetry.contrib.generator.telemetry.misc.Constants;
2627
import io.opentelemetry.contrib.generator.telemetry.metrics.dto.MetricDefinition;
2728
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
2829
import io.opentelemetry.proto.common.v1.InstrumentationScope;
29-
import io.opentelemetry.proto.metrics.v1.Metric;
30-
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
31-
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
30+
import io.opentelemetry.proto.common.v1.KeyValue;
31+
import io.opentelemetry.proto.metrics.v1.*;
3232
import io.opentelemetry.proto.resource.v1.Resource;
3333
import jakarta.el.ELProcessor;
3434
import lombok.extern.slf4j.Slf4j;
3535

36-
import java.util.ArrayList;
37-
import java.util.List;
36+
import java.util.*;
37+
import java.util.function.Function;
3838
import java.util.stream.Collectors;
3939

4040
/**
@@ -45,7 +45,7 @@ public class MetricGeneratorThread implements Runnable {
4545

4646
private final String groupKey;
4747
private final String requestID;
48-
private final List<MetricDefinition> metrics;
48+
private final Map<String, MetricDefinition> metrics;
4949
private final PayloadHandler payloadHandler;
5050
private final GeneratorState<MetricGeneratorThread> metricGeneratorState;
5151
private final GaugeGenerator gaugeGenerator;
@@ -57,7 +57,7 @@ public MetricGeneratorThread(String groupKey, List<MetricDefinition> metrics, Pa
5757
String requestID) {
5858
this.groupKey = groupKey;
5959
this.requestID = requestID;
60-
this.metrics = metrics;
60+
this.metrics = metrics.stream().collect(Collectors.toMap(MetricDefinition::getName, Function.identity()));
6161
this.payloadHandler = payloadHandler;
6262
this.metricGeneratorState = GeneratorsStateProvider.getMetricGeneratorState(requestID);
6363
ELProcessor jelProcessor = JELProvider.getJelProcessor();
@@ -70,18 +70,26 @@ public MetricGeneratorThread(String groupKey, List<MetricDefinition> metrics, Pa
7070
@Override
7171
public void run() {
7272
log.debug(requestID + ": Metric generator thread invoked for resource type: " + groupKey + " with metrics: " +
73-
metrics.stream().map(MetricDefinition::getName).collect(Collectors.toList()));
74-
if (metricGeneratorState.isGenerateData() && currentCount < metrics.get(0).getPayloadCount()) {
73+
metrics.values().stream().map(MetricDefinition::getName).collect(Collectors.toList()));
74+
int payloadCount = metrics.values().stream().findFirst().get().getPayloadCount();
75+
if (metricGeneratorState.isGenerateData() && currentCount < payloadCount) {
7576
List<ResourceMetrics> resourceMetricsList = new ArrayList<>();
7677
ResourceMetrics resourceMetric;
77-
List<Metric> otelMetrics = metrics.stream()
78+
List<Metric.Builder> partialOTelMetrics = metrics.values().stream()
7879
.map(this::getMetric).collect(Collectors.toList());
79-
List<Resource> resources = ResourceModelProvider.getResourceModel(requestID).get(groupKey.split("::")[0]).stream()
80+
List<Resource> resources = ResourceModelProvider.getResourceModel(requestID)
81+
.get(groupKey.split("::")[0]).stream()
8082
.filter(GeneratorResource::isActive)
8183
.map(GeneratorResource::getOTelResource)
8284
.collect(Collectors.toList());
8385
log.debug(requestID + ": Preparing " + resources.size() + " resource metric packets for " + groupKey);
8486
for (Resource eachResource: resources) {
87+
List<Metric> otelMetrics = new ArrayList<>();
88+
for (Metric.Builder eachPartialMetric: partialOTelMetrics) {
89+
List<KeyValue> resourceAttrs = GeneratorUtils.getResourceAttributes(metrics.get(eachPartialMetric.getName())
90+
.getCopyResourceAttributes(), eachResource);
91+
otelMetrics.add(getMetricWithResourceAttributes(eachPartialMetric, resourceAttrs));
92+
}
8593
resourceMetric = ResourceMetrics.newBuilder()
8694
.setResource(eachResource)
8795
.addScopeMetrics(ScopeMetrics.newBuilder()
@@ -106,7 +114,7 @@ public void run() {
106114
}
107115
}
108116

109-
private Metric getMetric(MetricDefinition metricDefinition) {
117+
private Metric.Builder getMetric(MetricDefinition metricDefinition) {
110118
switch (metricDefinition.getOtelType()) {
111119
case Constants.GAUGE:
112120
return gaugeGenerator.getOTelMetric(metricDefinition);
@@ -116,4 +124,29 @@ private Metric getMetric(MetricDefinition metricDefinition) {
116124
return summaryGenerator.getOTelMetric(metricDefinition);
117125
}
118126
}
127+
128+
private Metric getMetricWithResourceAttributes(Metric.Builder partialMetric, List<KeyValue> resourceAttributes) {
129+
Metric.DataCase metricType = partialMetric.getDataCase();
130+
if (metricType.equals(Metric.DataCase.GAUGE)) {
131+
List<NumberDataPoint> dataPoints = partialMetric.getGauge().getDataPointsList();
132+
List<NumberDataPoint> dataPointsWAttrs = dataPoints.stream().map(NumberDataPoint::toBuilder)
133+
.map(bdp -> bdp.addAllAttributes(resourceAttributes).build()).collect(Collectors.toList());
134+
Gauge newGauge = partialMetric.getGauge().toBuilder().clearDataPoints().addAllDataPoints(dataPointsWAttrs).build();
135+
return Metric.newBuilder().setName(partialMetric.getName()).setUnit(partialMetric.getUnit()).setGauge(newGauge).build();
136+
} else if (metricType.equals(Metric.DataCase.SUM)) {
137+
List<NumberDataPoint> dataPoints = partialMetric.getSum().getDataPointsList();
138+
List<NumberDataPoint> dataPointsWAttrs = dataPoints.stream().map(NumberDataPoint::toBuilder)
139+
.map(bdp -> bdp.addAllAttributes(resourceAttributes).build()).collect(Collectors.toList());
140+
partialMetric.getSum().toBuilder().clearDataPoints().addAllDataPoints(dataPointsWAttrs).build();
141+
Sum newSum = partialMetric.getSum().toBuilder().clearDataPoints().addAllDataPoints(dataPointsWAttrs).build();
142+
return Metric.newBuilder().setName(partialMetric.getName()).setUnit(partialMetric.getUnit()).setSum(newSum).build();
143+
} else {
144+
List<SummaryDataPoint> dataPoints = partialMetric.getSummary().getDataPointsList();
145+
List<SummaryDataPoint> dataPointsWAttrs = dataPoints.stream().map(SummaryDataPoint::toBuilder)
146+
.map(bdp -> bdp.addAllAttributes(resourceAttributes).build()).collect(Collectors.toList());
147+
Summary newSummary = partialMetric.getSummary().toBuilder().clearDataPoints().addAllDataPoints(dataPointsWAttrs).build();
148+
return Metric.newBuilder().setName(partialMetric.getName()).setUnit(partialMetric.getUnit()).setSummary(newSummary).build();
149+
}
150+
}
151+
119152
}

src/main/java/io/opentelemetry/contrib/generator/telemetry/metrics/SumGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ public SumGenerator(String requestID, ELProcessor jelProcessor) {
4040
this.jelProcessor = jelProcessor;
4141
}
4242

43-
public Metric getOTelMetric(MetricDefinition metricDefinition) {
43+
public Metric.Builder getOTelMetric(MetricDefinition metricDefinition) {
4444
Metric.Builder partialMetric = Metric.newBuilder().setName(metricDefinition.getName())
4545
.setUnit(metricDefinition.getUnit());
4646
return metricDefinition.getIsDouble() ?
47-
partialMetric.setSum(getDoubleSumDataPoint(metricDefinition)).build() :
48-
partialMetric.setSum(getIntSumDataPoint(metricDefinition)).build();
47+
partialMetric.setSum(getDoubleSumDataPoint(metricDefinition)) :
48+
partialMetric.setSum(getIntSumDataPoint(metricDefinition));
4949
}
5050

5151
private Sum getDoubleSumDataPoint(MetricDefinition metricDefinition) {

src/main/java/io/opentelemetry/contrib/generator/telemetry/metrics/SummaryGenerator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,11 @@ public SummaryGenerator(ELProcessor jelProcessor) {
4242
this.jelProcessor = jelProcessor;
4343
}
4444

45-
public Metric getOTelMetric(MetricDefinition metricDefinition) {
45+
public Metric.Builder getOTelMetric(MetricDefinition metricDefinition) {
4646
return Metric.newBuilder()
4747
.setName(metricDefinition.getName())
4848
.setUnit(metricDefinition.getUnit())
49-
.setSummary(getDoubleSummaryDataPoint(metricDefinition))
50-
.build();
49+
.setSummary(getDoubleSummaryDataPoint(metricDefinition));
5150
}
5251

5352
private Summary getDoubleSummaryDataPoint(MetricDefinition metricDefinition) {

src/main/java/io/opentelemetry/contrib/generator/telemetry/metrics/dto/MetricDefinition.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class MetricDefinition implements Cloneable {
4040
private Integer payloadFrequencySeconds;
4141
private Integer payloadCount;
4242
private Set<String> reportingResources;
43+
private Set<String> copyResourceAttributes;
4344
private Map<String, Object> attributes;
4445

4546
public void validate(String requestID, Set<String> allResourceTypes, Integer globalPayloadFrequency, Integer globalPayloadCount) {
@@ -53,6 +54,9 @@ public void validate(String requestID, Set<String> allResourceTypes, Integer glo
5354
if (isMonotonic == null) {
5455
isMonotonic = false;
5556
}
57+
if (copyResourceAttributes == null) {
58+
copyResourceAttributes = new HashSet<>();
59+
}
5660
}
5761

5862
private void validateMandatoryFields() {

src/main/java/io/opentelemetry/contrib/generator/telemetry/misc/GeneratorUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import io.opentelemetry.contrib.generator.core.exception.GeneratorException;
2020
import io.opentelemetry.contrib.generator.core.utils.CommonUtils;
2121

22+
import io.opentelemetry.proto.common.v1.AnyValue;
2223
import io.opentelemetry.proto.common.v1.KeyValue;
24+
import io.opentelemetry.proto.resource.v1.Resource;
2325
import jakarta.el.ELProcessor;
2426
import lombok.extern.slf4j.Slf4j;
2527
import org.apache.commons.collections4.MapUtils;
@@ -97,4 +99,18 @@ public static List<KeyValue> getEvaluatedAttributes(ELProcessor jelProcessor, Ma
9799
return attributes;
98100
}
99101

102+
public static List<KeyValue> getResourceAttributes(Set<String> attributes, Resource resource) {
103+
List<KeyValue> selectedRsrcAttrs = new ArrayList<>();
104+
for (String eachAttr: attributes) {
105+
Optional<KeyValue> optionalKV = resource.getAttributesList().stream().filter(kv -> kv.getKey().equals(eachAttr))
106+
.findFirst();
107+
selectedRsrcAttrs.add(optionalKV.orElseGet(() -> getEmptyKV(eachAttr)));
108+
}
109+
return selectedRsrcAttrs;
110+
}
111+
112+
public static KeyValue getEmptyKV(String key) {
113+
return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue("").build()).build();
114+
}
115+
100116
}

0 commit comments

Comments
 (0)