diff --git a/processors/README.md b/processors/README.md index 3e5df3ca8..795fe61ce 100644 --- a/processors/README.md +++ b/processors/README.md @@ -28,6 +28,10 @@ logger_provider: - event_to_span_event_bridge: ``` +## Filtering Log Processor + +`FilteringLogRecordProcessor` is a `LogRecordProcessor` that only keep logs based on a predicate + ## Component owners - [Cesar Munoz](https://github.com/LikeTheSalad), Elastic diff --git a/processors/build.gradle.kts b/processors/build.gradle.kts index 2fc920cc2..a54c9c578 100644 --- a/processors/build.gradle.kts +++ b/processors/build.gradle.kts @@ -24,4 +24,5 @@ dependencies { testImplementation("io.opentelemetry:opentelemetry-sdk-testing") testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator") + testImplementation("io.opentelemetry:opentelemetry-exporter-logging") } diff --git a/processors/src/main/java/io/opentelemetry/contrib/filter/FilteringLogRecordProcessor.java b/processors/src/main/java/io/opentelemetry/contrib/filter/FilteringLogRecordProcessor.java new file mode 100644 index 000000000..a213ec21a --- /dev/null +++ b/processors/src/main/java/io/opentelemetry/contrib/filter/FilteringLogRecordProcessor.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.ReadWriteLogRecord; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.util.function.Predicate; + +public class FilteringLogRecordProcessor implements LogRecordProcessor { + + public final LogRecordProcessor delegate; + public final Predicate predicate; + + public FilteringLogRecordProcessor( + LogRecordProcessor delegate, Predicate predicate) { + this.delegate = delegate; + this.predicate = predicate; + } + + @Override + public void onEmit(Context context, ReadWriteLogRecord readWriteLogRecord) { + if (predicate.test(readWriteLogRecord.toLogRecordData())) { + delegate.onEmit(context, readWriteLogRecord); + } + } +} diff --git a/processors/src/test/java/io/opentelemetry/contrib/filter/FilteringLogRecordProcessorTest.java b/processors/src/test/java/io/opentelemetry/contrib/filter/FilteringLogRecordProcessorTest.java new file mode 100644 index 000000000..50405d454 --- /dev/null +++ b/processors/src/test/java/io/opentelemetry/contrib/filter/FilteringLogRecordProcessorTest.java @@ -0,0 +1,114 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; +import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class FilteringLogRecordProcessorTest { + + private final InMemoryLogRecordExporter memoryLogRecordExporter = + InMemoryLogRecordExporter.create(); + ; + private final LogRecordProcessor logRecordProcessor = + SimpleLogRecordProcessor.create(memoryLogRecordExporter); + ; + private final InMemorySpanExporter spansExporter = InMemorySpanExporter.create(); + private AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder; + private Logger logger; + + @BeforeEach + void setUp() { + sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder(); + sdkBuilder + .addPropertiesSupplier( + () -> { + Map configMap = new HashMap<>(); + configMap.put("otel.metrics.exporter", "none"); + configMap.put("otel.traces.exporter", "logging"); + configMap.put("otel.logs.exporter", "logging"); + return configMap; + }) + .addSpanExporterCustomizer((exporter, c) -> spansExporter) + .addLogRecordExporterCustomizer( + (logRecordExporter, configProperties) -> memoryLogRecordExporter) + .addLoggerProviderCustomizer( + new BiFunction() { + @Override + public SdkLoggerProviderBuilder apply( + SdkLoggerProviderBuilder sdkLoggerProviderBuilder, + ConfigProperties configProperties) { + return sdkLoggerProviderBuilder.addLogRecordProcessor( + new FilteringLogRecordProcessor( + logRecordProcessor, + logRecordData -> logRecordData.getSpanContext().isSampled())); + } + }); + + logger = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + new FilteringLogRecordProcessor( + logRecordProcessor, + logRecordData -> { + SpanContext spanContext = logRecordData.getSpanContext(); + return spanContext.isSampled(); + }) {}) + .build() + .get("TestScope"); + } + + @Test + void verifyLogFilteringExistSpanContext() { + + try (OpenTelemetrySdk sdk = sdkBuilder.build().getOpenTelemetrySdk()) { + Tracer tracer = sdk.getTracer("test"); + Span span = tracer.spanBuilder("test").startSpan(); + sdk.getLogsBridge().get("test").logRecordBuilder().setBody("One Log").emit(); + List finishedLogRecordItems = + memoryLogRecordExporter.getFinishedLogRecordItems(); + assertEquals(1, finishedLogRecordItems.size()); + try (Scope scope = span.makeCurrent()) { + + } finally { + span.end(); + } + List finishedSpans = spansExporter.getFinishedSpanItems(); + assertEquals(1, finishedSpans.size()); + } + } + + @Test + void verifyFilteringNotExitSpanContext() { + logger.logRecordBuilder().setBody("One Log").emit(); + List finishedLogRecordItems = + memoryLogRecordExporter.getFinishedLogRecordItems(); + assertEquals(0, finishedLogRecordItems.size()); + } +}