Skip to content

Commit 07dbde4

Browse files
committed
POC for severity_based log record processor
1 parent cfb959b commit 07dbde4

File tree

7 files changed

+652
-0
lines changed

7 files changed

+652
-0
lines changed

sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
import io.opentelemetry.sdk.OpenTelemetrySdk;
1616
import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper;
1717
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
18+
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.LogRecordProcessorModel;
1819
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.OpenTelemetryConfigurationModel;
1920
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.SamplerModel;
21+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
2022
import io.opentelemetry.sdk.trace.samplers.Sampler;
2123
import java.io.Closeable;
2224
import java.io.IOException;
@@ -210,6 +212,29 @@ public static Sampler createSampler(DeclarativeConfigProperties genericSamplerMo
210212
samplerModel);
211213
}
212214

215+
/**
216+
* Create a {@link LogRecordProcessor} from the {@code logRecordProcessorModel} representing the
217+
* log record processor config.
218+
*
219+
* <p>This is used when log record processors are composed, with one processor accepting one or
220+
* more additional processors as config properties. The {@link ComponentProvider} implementation
221+
* can call this to configure a delegate {@link LogRecordProcessor} from the {@link
222+
* DeclarativeConfigProperties} corresponding to a particular config property.
223+
*/
224+
public static LogRecordProcessor createLogRecordProcessor(
225+
DeclarativeConfigProperties genericLogRecordProcessorModel) {
226+
YamlDeclarativeConfigProperties yamlDeclarativeConfigProperties =
227+
requireYamlDeclarativeConfigProperties(genericLogRecordProcessorModel);
228+
LogRecordProcessorModel logRecordProcessorModel =
229+
MAPPER.convertValue(
230+
DeclarativeConfigProperties.toMap(yamlDeclarativeConfigProperties),
231+
LogRecordProcessorModel.class);
232+
return createAndMaybeCleanup(
233+
LogRecordProcessorFactory.getInstance(),
234+
SpiHelper.create(yamlDeclarativeConfigProperties.getComponentLoader()),
235+
logRecordProcessorModel);
236+
}
237+
213238
private static YamlDeclarativeConfigProperties requireYamlDeclarativeConfigProperties(
214239
DeclarativeConfigProperties declarativeConfigProperties) {
215240
if (!(declarativeConfigProperties instanceof YamlDeclarativeConfigProperties)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.fileconfig.internal;
7+
8+
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
9+
import io.opentelemetry.api.logs.Severity;
10+
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
11+
import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration;
12+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
13+
import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor;
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
17+
/**
18+
* ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration.
19+
*
20+
* <p>This provider creates a {@link SeverityBasedLogRecordProcessor} that filters log records
21+
* based on minimum severity level. Only log records with a severity level greater than or
22+
* equal to the configured minimum are forwarded to the configured downstream processors.
23+
*
24+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
25+
* at any time.
26+
*/
27+
public class SeverityBasedLogRecordProcessorComponentProvider
28+
implements ComponentProvider<LogRecordProcessor> {
29+
30+
@Override
31+
public Class<LogRecordProcessor> getType() {
32+
return LogRecordProcessor.class;
33+
}
34+
35+
@Override
36+
public String getName() {
37+
return "severity_based";
38+
}
39+
40+
@Override
41+
public LogRecordProcessor create(DeclarativeConfigProperties config) {
42+
String minimumSeverityStr = config.getString("minimum_severity");
43+
if (minimumSeverityStr == null) {
44+
throw new IllegalArgumentException(
45+
"minimum_severity is required for severity_based log processors");
46+
}
47+
48+
Severity minimumSeverity;
49+
try {
50+
minimumSeverity = Severity.valueOf(minimumSeverityStr);
51+
} catch (IllegalArgumentException e) {
52+
throw new IllegalArgumentException("Invalid severity value: " + minimumSeverityStr, e);
53+
}
54+
55+
List<DeclarativeConfigProperties> processorConfigs = config.getStructuredList("processors");
56+
if (processorConfigs == null || processorConfigs.isEmpty()) {
57+
throw new IllegalArgumentException(
58+
"At least one processor is required for severity_based log processors");
59+
}
60+
61+
List<LogRecordProcessor> processors = new ArrayList<>();
62+
for (DeclarativeConfigProperties processorConfig : processorConfigs) {
63+
LogRecordProcessor processor =
64+
DeclarativeConfiguration.createLogRecordProcessor(processorConfig);
65+
processors.add(processor);
66+
}
67+
68+
return SeverityBasedLogRecordProcessor.builder(minimumSeverity)
69+
.addProcessors(processors)
70+
.build();
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
io.opentelemetry.sdk.extension.incubator.fileconfig.ServiceResourceDetector
2+
io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.fileconfig;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
10+
11+
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
12+
import io.opentelemetry.common.ComponentLoader;
13+
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider;
14+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
15+
import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor;
16+
import java.io.ByteArrayInputStream;
17+
import java.nio.charset.StandardCharsets;
18+
import java.util.Collections;
19+
import org.junit.jupiter.api.Test;
20+
21+
class SeverityBasedLogRecordProcessorComponentProviderTest {
22+
23+
@Test
24+
void createSeverityBasedProcessor_DirectComponentProvider() {
25+
SeverityBasedLogRecordProcessorComponentProvider provider =
26+
new SeverityBasedLogRecordProcessorComponentProvider();
27+
28+
assertThat(provider.getType()).isEqualTo(LogRecordProcessor.class);
29+
assertThat(provider.getName()).isEqualTo("severity_based");
30+
}
31+
32+
@Test
33+
void createSeverityBasedProcessor_ValidConfig() {
34+
DeclarativeConfigProperties config =
35+
getConfig(
36+
"minimum_severity: \"WARN\"\n"
37+
+ "processors:\n"
38+
+ " - simple:\n"
39+
+ " exporter:\n"
40+
+ " console: {}\n");
41+
42+
SeverityBasedLogRecordProcessorComponentProvider provider =
43+
new SeverityBasedLogRecordProcessorComponentProvider();
44+
45+
LogRecordProcessor processor = provider.create(config);
46+
47+
assertThat(processor).isInstanceOf(SeverityBasedLogRecordProcessor.class);
48+
49+
assertThat(processor.toString())
50+
.contains("minimumSeverity=WARN")
51+
.contains("delegate=SimpleLogRecordProcessor")
52+
.contains("logRecordExporter=SystemOutLogRecordExporter");
53+
}
54+
55+
@Test
56+
void createSeverityBasedProcessor_MissingMinimumSeverity() {
57+
DeclarativeConfigProperties config =
58+
getConfig(
59+
"processors:\n" // this comment exists only to influence spotless formatting
60+
+ " - simple:\n"
61+
+ " exporter:\n"
62+
+ " console: {}\n");
63+
64+
SeverityBasedLogRecordProcessorComponentProvider provider =
65+
new SeverityBasedLogRecordProcessorComponentProvider();
66+
67+
assertThatThrownBy(() -> provider.create(config))
68+
.isInstanceOf(IllegalArgumentException.class)
69+
.hasMessage("minimum_severity is required for severity_based log processors");
70+
}
71+
72+
@Test
73+
void createSeverityBasedProcessor_InvalidSeverity() {
74+
75+
DeclarativeConfigProperties config =
76+
getConfig(
77+
"minimum_severity: \"INVALID\"\n"
78+
+ "processors:\n"
79+
+ " - simple:\n"
80+
+ " exporter:\n"
81+
+ " console: {}\n");
82+
83+
SeverityBasedLogRecordProcessorComponentProvider provider =
84+
new SeverityBasedLogRecordProcessorComponentProvider();
85+
86+
assertThatThrownBy(() -> provider.create(config))
87+
.isInstanceOf(IllegalArgumentException.class)
88+
.hasMessage("Invalid severity value: INVALID");
89+
}
90+
91+
@Test
92+
void createSeverityBasedProcessor_MissingProcessors() {
93+
DeclarativeConfigProperties config = getConfig("");
94+
95+
SeverityBasedLogRecordProcessorComponentProvider provider =
96+
new SeverityBasedLogRecordProcessorComponentProvider();
97+
98+
assertThatThrownBy(() -> provider.create(config))
99+
.isInstanceOf(IllegalArgumentException.class)
100+
.hasMessage("minimum_severity is required for severity_based log processors");
101+
}
102+
103+
@Test
104+
void createSeverityBasedProcessor_EmptyProcessors() {
105+
DeclarativeConfigProperties config = getConfig("minimum_severity: \"WARN\"\nprocessors: []\n");
106+
107+
SeverityBasedLogRecordProcessorComponentProvider provider =
108+
new SeverityBasedLogRecordProcessorComponentProvider();
109+
110+
assertThatThrownBy(() -> provider.create(config))
111+
.isInstanceOf(IllegalArgumentException.class)
112+
.hasMessage("At least one processor is required for severity_based log processors");
113+
}
114+
115+
@Test
116+
void createSeverityBasedProcessor_MultipleProcessors() {
117+
DeclarativeConfigProperties config =
118+
getConfig(
119+
"minimum_severity: \"INFO\"\n"
120+
+ "processors:\n"
121+
+ " - simple:\n"
122+
+ " exporter:\n"
123+
+ " console: {}\n"
124+
+ " - simple:\n"
125+
+ " exporter:\n"
126+
+ " console: {}\n");
127+
128+
SeverityBasedLogRecordProcessorComponentProvider provider =
129+
new SeverityBasedLogRecordProcessorComponentProvider();
130+
131+
LogRecordProcessor processor = provider.create(config);
132+
133+
assertThat(processor).isInstanceOf(SeverityBasedLogRecordProcessor.class);
134+
assertThat(processor.toString()).contains("SeverityBasedLogRecordProcessor");
135+
}
136+
137+
private static DeclarativeConfigProperties getConfig(String yaml) {
138+
Object yamlObj =
139+
DeclarativeConfiguration.loadYaml(
140+
new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)),
141+
Collections.emptyMap());
142+
143+
return DeclarativeConfiguration.toConfigProperties(
144+
yamlObj,
145+
ComponentLoader.forClassLoader(
146+
SeverityBasedLogRecordProcessorComponentProviderTest.class.getClassLoader()));
147+
}
148+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.logs;
7+
8+
import static java.util.Objects.requireNonNull;
9+
10+
import io.opentelemetry.api.logs.Severity;
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.sdk.common.CompletableResultCode;
13+
import java.util.List;
14+
15+
/**
16+
* Implementation of {@link LogRecordProcessor} that filters log records based on minimum severity
17+
* level and delegates to downstream processors.
18+
*
19+
* <p>This processor only forwards log records to downstream processors if the log record's severity
20+
* level is greater than or equal to the configured minimum severity level.
21+
*/
22+
public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor {
23+
24+
private final Severity minimumSeverity;
25+
private final LogRecordProcessor delegate;
26+
27+
SeverityBasedLogRecordProcessor(Severity minimumSeverity, List<LogRecordProcessor> processors) {
28+
this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity");
29+
requireNonNull(processors, "processors");
30+
this.delegate = LogRecordProcessor.composite(processors);
31+
}
32+
33+
/**
34+
* Returns a new {@link SeverityBasedLogRecordProcessorBuilder} to construct a {@link
35+
* SeverityBasedLogRecordProcessor}.
36+
*
37+
* @param minimumSeverity the minimum severity level required for processing
38+
* @return a new {@link SeverityBasedLogRecordProcessorBuilder}
39+
*/
40+
public static SeverityBasedLogRecordProcessorBuilder builder(Severity minimumSeverity) {
41+
return new SeverityBasedLogRecordProcessorBuilder(minimumSeverity);
42+
}
43+
44+
@Override
45+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
46+
if (logRecord.getSeverity().getSeverityNumber() >= minimumSeverity.getSeverityNumber()) {
47+
delegate.onEmit(context, logRecord);
48+
}
49+
}
50+
51+
@Override
52+
public CompletableResultCode shutdown() {
53+
return delegate.shutdown();
54+
}
55+
56+
@Override
57+
public CompletableResultCode forceFlush() {
58+
return delegate.forceFlush();
59+
}
60+
61+
@Override
62+
public String toString() {
63+
return "SeverityBasedLogRecordProcessor{"
64+
+ "minimumSeverity="
65+
+ minimumSeverity
66+
+ ", delegate="
67+
+ delegate
68+
+ '}';
69+
}
70+
}

0 commit comments

Comments
 (0)