Skip to content

Commit c4b2214

Browse files
committed
POC for severity_based log record processor
1 parent cfb959b commit c4b2214

File tree

7 files changed

+604
-0
lines changed

7 files changed

+604
-0
lines changed

sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
import io.opentelemetry.sdk.OpenTelemetrySdk;
1616
import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper;
1717
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
18+
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.LogRecordProcessorModel;
1819
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.OpenTelemetryConfigurationModel;
1920
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.SamplerModel;
21+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
2022
import io.opentelemetry.sdk.trace.samplers.Sampler;
2123
import java.io.Closeable;
2224
import java.io.IOException;
@@ -210,6 +212,29 @@ public static Sampler createSampler(DeclarativeConfigProperties genericSamplerMo
210212
samplerModel);
211213
}
212214

215+
/**
216+
* Create a {@link LogRecordProcessor} from the {@code logRecordProcessorModel} representing the
217+
* log record processor config.
218+
*
219+
* <p>This is used when log record processors are composed, with one processor accepting one or
220+
* more additional processors as config properties. The {@link ComponentProvider} implementation
221+
* can call this to configure a delegate {@link LogRecordProcessor} from the {@link
222+
* DeclarativeConfigProperties} corresponding to a particular config property.
223+
*/
224+
public static LogRecordProcessor createLogRecordProcessor(
225+
DeclarativeConfigProperties genericLogRecordProcessorModel) {
226+
YamlDeclarativeConfigProperties yamlDeclarativeConfigProperties =
227+
requireYamlDeclarativeConfigProperties(genericLogRecordProcessorModel);
228+
LogRecordProcessorModel logRecordProcessorModel =
229+
MAPPER.convertValue(
230+
DeclarativeConfigProperties.toMap(yamlDeclarativeConfigProperties),
231+
LogRecordProcessorModel.class);
232+
return createAndMaybeCleanup(
233+
LogRecordProcessorFactory.getInstance(),
234+
SpiHelper.create(yamlDeclarativeConfigProperties.getComponentLoader()),
235+
logRecordProcessorModel);
236+
}
237+
213238
private static YamlDeclarativeConfigProperties requireYamlDeclarativeConfigProperties(
214239
DeclarativeConfigProperties declarativeConfigProperties) {
215240
if (!(declarativeConfigProperties instanceof YamlDeclarativeConfigProperties)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.fileconfig.internal;
7+
8+
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
9+
import io.opentelemetry.api.logs.Severity;
10+
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
11+
import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration;
12+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
13+
import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor;
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
17+
/**
18+
* ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration.
19+
*
20+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
21+
* at any time.
22+
*/
23+
public class SeverityBasedLogRecordProcessorComponentProvider
24+
implements ComponentProvider<LogRecordProcessor> {
25+
26+
@Override
27+
public Class<LogRecordProcessor> getType() {
28+
return LogRecordProcessor.class;
29+
}
30+
31+
@Override
32+
public String getName() {
33+
return "severity_based";
34+
}
35+
36+
@Override
37+
public LogRecordProcessor create(DeclarativeConfigProperties config) {
38+
String minimumSeverityStr = config.getString("minimum_severity");
39+
if (minimumSeverityStr == null) {
40+
throw new IllegalArgumentException(
41+
"minimum_severity is required for severity_based log processors");
42+
}
43+
44+
Severity minimumSeverity;
45+
try {
46+
minimumSeverity = Severity.valueOf(minimumSeverityStr);
47+
} catch (IllegalArgumentException e) {
48+
throw new IllegalArgumentException("Invalid severity value: " + minimumSeverityStr, e);
49+
}
50+
51+
List<DeclarativeConfigProperties> processorConfigs = config.getStructuredList("processors");
52+
if (processorConfigs == null || processorConfigs.isEmpty()) {
53+
throw new IllegalArgumentException(
54+
"At least one processor is required for severity_based log processors");
55+
}
56+
57+
List<LogRecordProcessor> processors = new ArrayList<>();
58+
for (DeclarativeConfigProperties processorConfig : processorConfigs) {
59+
LogRecordProcessor processor =
60+
DeclarativeConfiguration.createLogRecordProcessor(processorConfig);
61+
processors.add(processor);
62+
}
63+
64+
return SeverityBasedLogRecordProcessor.builder(minimumSeverity)
65+
.addProcessors(processors)
66+
.build();
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
io.opentelemetry.sdk.extension.incubator.fileconfig.ServiceResourceDetector
2+
io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.fileconfig;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
10+
11+
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
12+
import io.opentelemetry.common.ComponentLoader;
13+
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider;
14+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
15+
import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor;
16+
import java.io.ByteArrayInputStream;
17+
import java.nio.charset.StandardCharsets;
18+
import java.util.Collections;
19+
import org.junit.jupiter.api.Test;
20+
21+
class SeverityBasedLogRecordProcessorComponentProviderTest {
22+
23+
@Test
24+
void createSeverityBasedProcessor_DirectComponentProvider() {
25+
SeverityBasedLogRecordProcessorComponentProvider provider =
26+
new SeverityBasedLogRecordProcessorComponentProvider();
27+
28+
assertThat(provider.getType()).isEqualTo(LogRecordProcessor.class);
29+
assertThat(provider.getName()).isEqualTo("severity_based");
30+
}
31+
32+
@Test
33+
void createSeverityBasedProcessor_ValidConfig() {
34+
DeclarativeConfigProperties config =
35+
getConfig(
36+
"minimum_severity: \"WARN\"\n"
37+
+ "processors:\n"
38+
+ " - simple:\n"
39+
+ " exporter:\n"
40+
+ " console: {}\n");
41+
42+
SeverityBasedLogRecordProcessorComponentProvider provider =
43+
new SeverityBasedLogRecordProcessorComponentProvider();
44+
45+
LogRecordProcessor processor = provider.create(config);
46+
47+
assertThat(processor).isInstanceOf(SeverityBasedLogRecordProcessor.class);
48+
49+
assertThat(processor.toString())
50+
.contains("minimumSeverity=WARN")
51+
.contains("delegate=SimpleLogRecordProcessor")
52+
.contains("logRecordExporter=SystemOutLogRecordExporter");
53+
}
54+
55+
@Test
56+
void createSeverityBasedProcessor_MissingMinimumSeverity() {
57+
DeclarativeConfigProperties config =
58+
getConfig(
59+
"processors:\n" // this comment exists only to influence spotless formatting
60+
+ " - simple:\n"
61+
+ " exporter:\n"
62+
+ " console: {}\n");
63+
64+
SeverityBasedLogRecordProcessorComponentProvider provider =
65+
new SeverityBasedLogRecordProcessorComponentProvider();
66+
67+
assertThatThrownBy(() -> provider.create(config))
68+
.isInstanceOf(IllegalArgumentException.class)
69+
.hasMessage("minimum_severity is required for severity_based log processors");
70+
}
71+
72+
@Test
73+
void createSeverityBasedProcessor_InvalidSeverity() {
74+
75+
DeclarativeConfigProperties config =
76+
getConfig(
77+
"minimum_severity: \"INVALID\"\n"
78+
+ "processors:\n"
79+
+ " - simple:\n"
80+
+ " exporter:\n"
81+
+ " console: {}\n");
82+
83+
SeverityBasedLogRecordProcessorComponentProvider provider =
84+
new SeverityBasedLogRecordProcessorComponentProvider();
85+
86+
assertThatThrownBy(() -> provider.create(config))
87+
.isInstanceOf(IllegalArgumentException.class)
88+
.hasMessage("Invalid severity value: INVALID");
89+
}
90+
91+
private static DeclarativeConfigProperties getConfig(String yaml) {
92+
Object yamlObj =
93+
DeclarativeConfiguration.loadYaml(
94+
new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)),
95+
Collections.emptyMap());
96+
97+
return DeclarativeConfiguration.toConfigProperties(
98+
yamlObj,
99+
ComponentLoader.forClassLoader(
100+
SeverityBasedLogRecordProcessorComponentProviderTest.class.getClassLoader()));
101+
}
102+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.logs;
7+
8+
import static java.util.Objects.requireNonNull;
9+
10+
import io.opentelemetry.api.logs.Severity;
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.sdk.common.CompletableResultCode;
13+
import java.util.List;
14+
15+
/**
16+
* Implementation of {@link LogRecordProcessor} that filters log records based on minimum severity
17+
* level and delegates to downstream processors.
18+
*
19+
* <p>This processor only forwards log records to downstream processors if the log record's severity
20+
* level is greater than or equal to the configured minimum severity level.
21+
*/
22+
public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor {
23+
24+
private final Severity minimumSeverity;
25+
private final LogRecordProcessor delegate;
26+
27+
SeverityBasedLogRecordProcessor(Severity minimumSeverity, List<LogRecordProcessor> processors) {
28+
this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity");
29+
requireNonNull(processors, "processors");
30+
this.delegate = LogRecordProcessor.composite(processors);
31+
}
32+
33+
/**
34+
* Returns a new {@link SeverityBasedLogRecordProcessorBuilder} to construct a {@link
35+
* SeverityBasedLogRecordProcessor}.
36+
*
37+
* @param minimumSeverity the minimum severity level required for processing
38+
* @return a new {@link SeverityBasedLogRecordProcessorBuilder}
39+
*/
40+
public static SeverityBasedLogRecordProcessorBuilder builder(Severity minimumSeverity) {
41+
return new SeverityBasedLogRecordProcessorBuilder(minimumSeverity);
42+
}
43+
44+
@Override
45+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
46+
if (logRecord.getSeverity().getSeverityNumber() >= minimumSeverity.getSeverityNumber()) {
47+
delegate.onEmit(context, logRecord);
48+
}
49+
}
50+
51+
@Override
52+
public CompletableResultCode shutdown() {
53+
return delegate.shutdown();
54+
}
55+
56+
@Override
57+
public CompletableResultCode forceFlush() {
58+
return delegate.forceFlush();
59+
}
60+
61+
@Override
62+
public String toString() {
63+
return "SeverityBasedLogRecordProcessor{"
64+
+ "minimumSeverity="
65+
+ minimumSeverity
66+
+ ", delegate="
67+
+ delegate
68+
+ '}';
69+
}
70+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.logs;
7+
8+
import static java.util.Objects.requireNonNull;
9+
10+
import io.opentelemetry.api.logs.Severity;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
/** Builder class for {@link SeverityBasedLogRecordProcessor}. */
15+
public final class SeverityBasedLogRecordProcessorBuilder {
16+
17+
private final Severity minimumSeverity;
18+
private final List<LogRecordProcessor> processors = new ArrayList<>();
19+
20+
SeverityBasedLogRecordProcessorBuilder(Severity minimumSeverity) {
21+
this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity");
22+
}
23+
24+
/**
25+
* Adds multiple {@link LogRecordProcessor}s to the list of downstream processors.
26+
*
27+
* @param processors the processors to add
28+
* @return this builder
29+
*/
30+
public SeverityBasedLogRecordProcessorBuilder addProcessors(LogRecordProcessor... processors) {
31+
requireNonNull(processors, "processors");
32+
for (LogRecordProcessor processor : processors) {
33+
requireNonNull(processor, "processor");
34+
this.processors.add(processor);
35+
}
36+
return this;
37+
}
38+
39+
/**
40+
* Adds multiple {@link LogRecordProcessor}s to the list of downstream processors.
41+
*
42+
* @param processors the processors to add
43+
* @return this builder
44+
*/
45+
public SeverityBasedLogRecordProcessorBuilder addProcessors(
46+
Iterable<LogRecordProcessor> processors) {
47+
48+
requireNonNull(processors, "processors");
49+
for (LogRecordProcessor processor : processors) {
50+
requireNonNull(processor, "processor");
51+
this.processors.add(processor);
52+
}
53+
return this;
54+
}
55+
56+
/**
57+
* Returns a new {@link SeverityBasedLogRecordProcessor} with the configuration of this builder.
58+
*
59+
* @return a new {@link SeverityBasedLogRecordProcessor}
60+
* @throws IllegalArgumentException if no processors have been added
61+
*/
62+
public SeverityBasedLogRecordProcessor build() {
63+
if (processors.isEmpty()) {
64+
throw new IllegalArgumentException("At least one processor must be added");
65+
}
66+
return new SeverityBasedLogRecordProcessor(minimumSeverity, processors);
67+
}
68+
}

0 commit comments

Comments
 (0)