Skip to content

Commit 3473acd

Browse files
authored
Merge pull request #87 from cisco-open/feature/master/allow-filtering-resources-for-logs
Added support for filtering reporting resources for logs
2 parents a94f656 + f0dee06 commit 3473acd

File tree

9 files changed

+123
-17
lines changed

9 files changed

+123
-17
lines changed

example-definitions/json/log-definition.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@
4545
"event.reason": "roundRobin([\"Pulling\", \"Pulled\", \"Created\", \"Pulling\", \"Backoff\"])",
4646
"event.message": "roundRobin([\"Pulling image k8s.gcr.io/echoserver:1.8\", \"Image downloaded\", \"Created container\", \"Pulling image cjknsjc/ccsdc:fff\", \"Error: ImagePullBackoff\"])"
4747
}
48+
},
49+
{
50+
"severityOrderFunction": "severityDistributionCount([\"ERROR\", \"WARN\", \"DEBUG\"], [1, 1, 4])",
51+
"payloadFrequencySeconds": 20,
52+
"payloadCount": 2,
53+
"copyCount": 200,
54+
"filteredReportingResources": {
55+
"node": ["k8s.cluster.name=cluster-skyrim"]
56+
}
4857
}
4958
]
5059
}

example-definitions/qa/log-definition.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,9 @@ logs:
4747
event.domain: 'roundRobin(["k8s"])'
4848
event.reason: 'roundRobin(["Pulling", "Pulled", "Created", "Pulling", "Backoff"])'
4949
event.message: 'roundRobin(["Pulling image k8s.gcr.io/echoserver:1.8", "Image downloaded", "Created container", "Pulling image cjknsjc/ccsdc:fff", "Error: ImagePullBackoff"])'
50+
- severityOrderFunction: 'severityDistributionCount(["ERROR", "WARN", "DEBUG"], [1, 1, 4])'
51+
payloadFrequencySeconds: 20
52+
payloadCount: 2
53+
copyCount: 200
54+
filteredReportingResources:
55+
node: ["k8s.cluster.name=cluster-skyrim"]

src/main/java/io/opentelemetry/contrib/generator/telemetry/logs/LogGeneratorThread.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import jakarta.el.ELProcessor;
3535
import lombok.Getter;
3636
import lombok.extern.slf4j.Slf4j;
37+
import org.apache.commons.collections4.MapUtils;
3738

3839
import static io.opentelemetry.contrib.generator.telemetry.misc.GeneratorUtils.*;
3940

@@ -72,10 +73,21 @@ public void run() {
7273
List<ResourceLogs> resourceLogsList = new ArrayList<>();
7374
ResourceLogs resourceLog;
7475
LogRecord.Builder partialLogRecord = getLog(logDefinition);
75-
for (Map.Entry<String, Integer> reportingResource : logDefinition.getReportingResourcesCounts().entrySet()) {
76-
List<Resource> postToResources = getResourceSubsetByPostCount(reportingResource.getKey(), reportingResource.getValue());
77-
log.debug(requestID + ": Preparing " + postToResources.size() + " resource logs packets for " + reportingResource);
78-
for (Resource eachResource: postToResources) {
76+
Map<String, List<Resource>> reportingResourcesByType = new HashMap<>();
77+
for (Map.Entry<String, Integer> resourceTypeWithCount:
78+
MapUtils.emptyIfNull(logDefinition.getReportingResourcesCounts()).entrySet()) {
79+
reportingResourcesByType.put(resourceTypeWithCount.getKey(),
80+
getResourceSubsetByPostCount(resourceTypeWithCount.getKey(), resourceTypeWithCount.getValue()));
81+
}
82+
for (Map.Entry<String, Map<String, String>> resourceTypeWithFilter:
83+
MapUtils.emptyIfNull(logDefinition.getParsedFilteredReportingResources()).entrySet()) {
84+
reportingResourcesByType.put(resourceTypeWithFilter.getKey(),
85+
getFilteredResources(resourceTypeWithFilter.getKey(), resourceTypeWithFilter.getValue()));
86+
}
87+
for (Map.Entry<String, List<Resource>> reportingResourceByType : reportingResourcesByType.entrySet()) {
88+
log.debug(requestID + ": Preparing " + reportingResourceByType.getValue().size() +
89+
" resource logs packets for " + reportingResourceByType.getKey());
90+
for (Resource eachResource: reportingResourceByType.getValue()) {
7991
LogRecord logRecord = partialLogRecord.clone().addAllAttributes(getResourceAttributes(logDefinition
8092
.getCopyResourceAttributes(), eachResource)).build();
8193
List<LogRecord> otelLogs = Collections.nCopies(logDefinition.getCopyCount(), logRecord);
@@ -91,13 +103,15 @@ public void run() {
91103
.build();
92104
resourceLogsList.add(resourceLog);
93105
}
94-
log.info(requestID + ": Sending payload for: " + reportingResource);
106+
log.info(requestID + ": Sending payload for: " + reportingResourceByType.getKey());
95107
ExportLogsServiceRequest resourceLogs = ExportLogsServiceRequest.newBuilder().addAllResourceLogs(resourceLogsList).build();
96108
boolean responseStatus = payloadHandler.postPayload(resourceLogs);
97109
if (logGeneratorState.getTransportStorage() != null) {
98-
logGeneratorState.getTransportStorage().store(logDefinition.getId(), reportingResource.getKey(), resourceLogs, responseStatus);
110+
logGeneratorState.getTransportStorage().store(logDefinition.getId(),
111+
reportingResourceByType.getKey(), resourceLogs, responseStatus);
99112
}
100-
log.debug(requestID + ": Complete payload for resource: " + reportingResource + " in log Definition" + logDefinition.getId() + ": " + resourceLogs);
113+
log.debug(requestID + ": Complete payload for resource: " + reportingResourceByType.getKey() +
114+
" in log Definition" + logDefinition.getId() + ": " + resourceLogs);
101115
resourceLogsList.clear();
102116
}
103117
currentPayloadCount++;
@@ -131,4 +145,18 @@ private List<Resource> getResourceSubsetByPostCount(String resourceName, int res
131145
return resourcesInResourceModel.subList(resourceStartIndex, resourceEndIndex)
132146
.stream().map(GeneratorResource::getOTelResource).collect(Collectors.toList());
133147
}
148+
149+
private List<Resource> getFilteredResources(String resourceName, Map<String, String> filters) {
150+
List<Resource> filteredResources = new ArrayList<>();
151+
List<GeneratorResource> allResources = ResourceModelProvider.getResourceModel(requestID)
152+
.get(resourceName).stream()
153+
.filter(GeneratorResource::isActive)
154+
.toList();
155+
for (GeneratorResource eachResource: allResources) {
156+
if (eachResource.getEvaluatedAttributes().entrySet().containsAll(filters.entrySet())) {
157+
filteredResources.add(eachResource.getOTelResource());
158+
}
159+
}
160+
return filteredResources;
161+
}
134162
}

src/main/java/io/opentelemetry/contrib/generator/telemetry/logs/dto/LogDefinition.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,16 @@ public class LogDefinition {
3333

3434
private String severityOrderFunction;
3535
private Map<String, Integer> reportingResourcesCounts;
36+
private Map<String, Set<String>> filteredReportingResources;
3637
private Integer payloadFrequencySeconds;
3738
private Integer payloadCount;
3839
private Integer copyCount;
3940
private Set<String> copyResourceAttributes;
4041
private Map<String, Object> attributes;
4142
@JsonIgnore
4243
private String id;
44+
@JsonIgnore
45+
private Map<String, Map<String, String>> parsedFilteredReportingResources;
4346

4447
public long validate(String requestID, Set<String> allResourceTypes, Integer globalPayloadFrequencySeconds, int logIndex) {
4548
id = "log_by_ttg_" + logIndex;
@@ -50,7 +53,8 @@ public long validate(String requestID, Set<String> allResourceTypes, Integer glo
5053
copyResourceAttributes = new HashSet<>();
5154
}
5255
validateMandatoryFields();
53-
validateResourceTypesCount(allResourceTypes);
56+
validateResourceTypes(allResourceTypes);
57+
parseFilteredReportingResources();
5458
addRequestIDAndLogNameToValueFunction(requestID);
5559
attributes = GeneratorUtils.addArgsToAttributeExpressions(requestID, "log", id, attributes);
5660
return validatePayloadFrequency(globalPayloadFrequencySeconds);
@@ -60,8 +64,10 @@ private void validateMandatoryFields() {
6064
if (payloadCount == null || payloadCount < 1) {
6165
throw new GeneratorException("Payload count cannot be less than 1. Update the value in log " + this);
6266
}
63-
if (MapUtils.emptyIfNull(reportingResourcesCounts).isEmpty()) {
64-
throw new GeneratorException("Mandatory field 'reportingResourcesCount' not provided in log definition YAML for log " + this);
67+
if (MapUtils.emptyIfNull(reportingResourcesCounts).isEmpty() &&
68+
MapUtils.emptyIfNull(filteredReportingResources).isEmpty()) {
69+
throw new GeneratorException("At least one resource type must be specified in either reportingResources" +
70+
" or filteredReportingResources for" + this);
6571
}
6672
if (StringUtils.defaultString(severityOrderFunction).isBlank()) {
6773
throw new GeneratorException("Mandatory field 'severityFrequency' not provided in log definition YAML for log " + this);
@@ -107,7 +113,13 @@ private void validateAttributes() {
107113
attributes = GeneratorUtils.validateAttributes(attributes);
108114
}
109115

110-
private void validateResourceTypesCount(Set<String> allResourceTypes) {
116+
private void validateResourceTypes(Set<String> allResourceTypes) {
117+
Set<String> resourcesInCounts = MapUtils.emptyIfNull(reportingResourcesCounts).keySet();
118+
Set<String> resourcesInFilteredResources = MapUtils.emptyIfNull(filteredReportingResources).keySet();
119+
if (!Collections.disjoint(resourcesInCounts, resourcesInFilteredResources)) {
120+
throw new GeneratorException("'reportingResourcesCounts' and 'filteredReportingResources' cannot have " +
121+
"the same resource type for log " + this);
122+
}
111123
Map<String, Integer> resourceCount = new HashMap<>();
112124
for (Map.Entry<String, Integer> eachResource : MapUtils.emptyIfNull(reportingResourcesCounts).entrySet()) {
113125
if (eachResource.getKey().trim().length() == 0) {
@@ -124,6 +136,32 @@ private void validateResourceTypesCount(Set<String> allResourceTypes) {
124136
else resourceCount.put(eachResource.getKey().trim(), eachResource.getValue());
125137
}
126138
reportingResourcesCounts = resourceCount;
139+
for (String resourceType: MapUtils.emptyIfNull(filteredReportingResources).keySet()) {
140+
if (!allResourceTypes.contains(resourceType)) {
141+
throw new GeneratorException("Invalid resource type (" + resourceType + ") found in " +
142+
"'filteredReportingResources' for log " + this);
143+
}
144+
}
145+
}
146+
147+
private void parseFilteredReportingResources() {
148+
if (!MapUtils.emptyIfNull(filteredReportingResources).isEmpty()) {
149+
parsedFilteredReportingResources = new HashMap<>();
150+
for (Map.Entry<String, Set<String>> eachFilteredReportingResource: filteredReportingResources.entrySet()) {
151+
String resourceType = eachFilteredReportingResource.getKey();
152+
parsedFilteredReportingResources.put(resourceType, new HashMap<>());
153+
for (String attributeFilter : eachFilteredReportingResource.getValue()) {
154+
String[] filterTokens = attributeFilter.split("=");
155+
if (filterTokens.length != 2) {
156+
log.warn("Attribute filter " + attributeFilter + " provided for resource type " +
157+
resourceType + " in the log definition YAML for log " + this + " is not valid. " +
158+
"Must contain a single '='");
159+
continue;
160+
}
161+
parsedFilteredReportingResources.get(resourceType).put(filterTokens[0], filterTokens[1]);
162+
}
163+
}
164+
}
127165
}
128166

129167
private void addRequestIDAndLogNameToValueFunction(String requestID) {

src/main/java/io/opentelemetry/contrib/generator/telemetry/metrics/dto/MetricDefinition.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.opentelemetry.contrib.generator.telemetry.metrics.dto;
1818

19+
import com.fasterxml.jackson.annotation.JsonIgnore;
1920
import io.opentelemetry.contrib.generator.core.exception.GeneratorException;
2021
import io.opentelemetry.contrib.generator.telemetry.misc.Constants;
2122
import io.opentelemetry.contrib.generator.telemetry.misc.GeneratorUtils;
@@ -46,6 +47,7 @@ public class MetricDefinition implements Cloneable {
4647
private Map<String, Set<String>> filteredReportingResources;
4748
private Set<String> copyResourceAttributes;
4849
private Map<String, Object> attributes;
50+
@JsonIgnore
4951
private Map<String, Map<String, String>> parsedFilteredReportingResources;
5052

5153
public void validate(String requestID, Set<String> allResourceTypes, Integer globalPayloadFrequency,

src/test/java/io/opentelemetry/contrib/generator/telemetry/TestAllGeneratorsWithJSONInput.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,14 @@ public void validatePacketCounts() {
5555
int METRIC_REPORTING_RESOURCES_COUNT = NETWORK_INTERFACE_COUNT + CONTAINER_COUNT + MACHINE_COUNT + NODE_COUNT +
5656
POD_COUNT + DISK_COUNT + AWS_EBS_COUNT + AWS_RDS_COUNT;
5757
int LOG_REPORTING_RESOURCES_COUNT = CONTAINER_COUNT + NODE_COUNT + 2 * POD_COUNT + MACHINE_COUNT;
58+
int LOG_FILTERED_RESOURCES = 5;
59+
int LOG_FILTERED_PAYLOADS = 20;
5860
int metricPayloadCount = 10;
5961
int logsPayloadCount = 20;
6062
int POD_EVENTS_COUNT = 30 * 5;
6163
int expectedMetricPackets = METRIC_REPORTING_RESOURCES_COUNT * metricPayloadCount;
62-
int expectedLogsPackets = LOG_REPORTING_RESOURCES_COUNT * logsPayloadCount + POD_EVENTS_COUNT;
64+
int expectedLogsPackets = LOG_REPORTING_RESOURCES_COUNT * logsPayloadCount + POD_EVENTS_COUNT +
65+
(LOG_FILTERED_RESOURCES * LOG_FILTERED_PAYLOADS);
6366
int expectedSpanPackets = 11518;
6467
Assert.assertEquals(testStore.getMetricsPacketCount(), expectedMetricPackets, "Mismatch in expected metric packets count");
6568
Assert.assertEquals(testStore.getLogsPacketCount(), expectedLogsPackets, "Mismatch in expected log packets count");
@@ -70,7 +73,7 @@ public void validatePacketCounts() {
7073
public void validateStorageCounts() {
7174
Assert.assertEquals(transportStorage.getStoredMetricsPayloads().size(), 8,
7275
"Mismatch in resource type counts for metric payloads");
73-
Assert.assertEquals(transportStorage.getStoredLogsPayloads().size(), 4,
76+
Assert.assertEquals(transportStorage.getStoredLogsPayloads().size(), 5,
7477
"Mismatch in resource type counts for log payloads");
7578
Assert.assertEquals(transportStorage.getStoredLogsPayloads().get("log_by_ttg_0").size(), 3,
7679
"Mismatch in resource type counts for log payloads");
@@ -80,7 +83,7 @@ public void validateStorageCounts() {
8083
"Mismatch in resource type counts for trace payloads");
8184
Assert.assertEquals(transportStorage.getMetricsResponses().size(), 8,
8285
"Mismatch in resource type counts for metric response statuses");
83-
Assert.assertEquals(transportStorage.getLogsResponses().size(), 4,
86+
Assert.assertEquals(transportStorage.getLogsResponses().size(), 5,
8487
"Mismatch in resource type counts for log response statuses");
8588
Assert.assertEquals(transportStorage.getTracesResponses().size(), 8,
8689
"Mismatch in resource type counts for trace response statuses");

src/test/java/io/opentelemetry/contrib/generator/telemetry/TestLogsGenerator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ public class TestLogsGenerator {
3838
private final int CONTAINER_COUNT_LOG = 150;
3939
private final int MACHINE_COUNT = 4;
4040
private final int NODE_COUNT = 25;
41+
private final int FILTERED_NODE_COUNT = 5;
4142
private final int POD_COUNT = 10;
4243
private final int POD_EVENT_COUNT = 30;
4344
private final int POST_COUNT_K8S_LOG = 10;
4445
private final int POST_COUNT_LOG_LOG_1 = 2;
4546
private final int POST_COUNT_LOG_LOG_2 = 5;
4647
private final int POST_COUNT_K8S_EVENT = 5;
48+
private final int FILTERED_NODE_PAYLOAD_COUNT = 2;
4749

4850
@BeforeClass
4951
public void generateData() {
@@ -57,11 +59,13 @@ public void generateData() {
5759
@Test
5860
public void testPayloadAndPacketCounts() {
5961
//Check payload count = Summation of all post counts per log definition
60-
int expectedPayloadCount = 2 * POST_COUNT_K8S_LOG + POST_COUNT_LOG_LOG_1 + 2 * POST_COUNT_LOG_LOG_2 + POST_COUNT_K8S_EVENT;
62+
int expectedPayloadCount = 2 * POST_COUNT_K8S_LOG + POST_COUNT_LOG_LOG_1 + 2 * POST_COUNT_LOG_LOG_2 +
63+
POST_COUNT_K8S_EVENT + FILTERED_NODE_PAYLOAD_COUNT;
6164
Assert.assertEquals(testStore.getLogsPayloads().size(), expectedPayloadCount, "Mismatch in payload count");
6265
//Check packet count = Summation (payload count * number of resources) for every log
6366
int expectedPacketCount = (CONTAINER_COUNT_K8S + POD_COUNT) * POST_COUNT_K8S_LOG + NODE_COUNT * POST_COUNT_LOG_LOG_1
64-
+ (MACHINE_COUNT + CONTAINER_COUNT_LOG) * POST_COUNT_LOG_LOG_2 + POD_EVENT_COUNT * POST_COUNT_K8S_EVENT;
67+
+ (MACHINE_COUNT + CONTAINER_COUNT_LOG) * POST_COUNT_LOG_LOG_2 +
68+
POD_EVENT_COUNT * POST_COUNT_K8S_EVENT + FILTERED_NODE_COUNT * FILTERED_NODE_PAYLOAD_COUNT;
6569
Assert.assertEquals(testStore.getLogsPacketCount(), expectedPacketCount, "Mismatch in resource logs packet count");
6670
//Check log count for each log = number of reporting resources * number of payloads defined per log definition
6771
}
@@ -73,7 +77,9 @@ public void testLogsCounts() {
7377
int log1_Count = NODE_COUNT * POST_COUNT_LOG_LOG_1;
7478
int log2_Count = (MACHINE_COUNT + CONTAINER_COUNT_LOG) * POST_COUNT_LOG_LOG_2;
7579
int k8sEventsCount = POD_EVENT_COUNT * POST_COUNT_K8S_EVENT;
76-
Assert.assertEquals(testStore.getLogsPacketCount(), k8sLogs_Count + log1_Count + log2_Count + k8sEventsCount,
80+
int filteredNodeEvents = FILTERED_NODE_COUNT * FILTERED_NODE_PAYLOAD_COUNT;
81+
Assert.assertEquals(testStore.getLogsPacketCount(), k8sLogs_Count + log1_Count + log2_Count +
82+
k8sEventsCount + filteredNodeEvents,
7783
"Mismatch in logs count");
7884
}
7985
}

src/test/resources/test-definitions/logs-test-combined.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@
4141
"event.reason": "roundRobin([\"Pulling\", \"Pulled\", \"Created\", \"Pulling\", \"Backoff\"])",
4242
"event.message": "roundRobin([\"Pulling image k8s.gcr.io/echoserver:1.8\", \"Image downloaded\", \"Created container\", \"Pulling image cjknsjc/ccsdc:fff\", \"Error: ImagePullBackoff\"])"
4343
}
44+
},
45+
{
46+
"payloadCount": 20,
47+
"severityOrderFunction": "severityDistributionCount([\"INFO\", \"ERROR\", \"WARN\", \"DEBUG\"], [5, 1, 1, 4])",
48+
"filteredReportingResources": {
49+
"node": ["k8s.cluster.name=cluster-aukus"]
50+
}
4451
}
4552
]
4653
}

src/test/resources/test-definitions/logs-test.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,10 @@ logs:
5151
event.domain: 'roundRobin(["k8s"])'
5252
event.reason: 'roundRobin(["Pulling", "Pulled", "Created", "Pulling", "Backoff"])'
5353
event.message: 'roundRobin(["Pulling image k8s.gcr.io/echoserver:1.8", "Image downloaded", "Created container", "Pulling image cjknsjc/ccsdc:fff", "Error: ImagePullBackoff"])'
54+
- severityOrderFunction: 'severityDistributionCount(["ERROR", "WARN", "DEBUG"], [1, 1, 4])'
55+
payloadFrequencySeconds: 20
56+
payloadCount: 2
57+
copyCount: 10
58+
copyResourceAttributes: ["k8s.pod.ip", "k8s.node.ip.internal"]
59+
filteredReportingResources:
60+
node: ["k8s.cluster.name=cluster-aukus"]

0 commit comments

Comments
 (0)