55
66import com .azure .monitor .opentelemetry .autoconfigure .implementation .AiSemanticAttributes ;
77import com .microsoft .applicationinsights .agent .internal .configuration .Configuration ;
8+ import com .microsoft .applicationinsights .agent .internal .configuration .Configuration .ProcessorConfig ;
9+ import com .microsoft .applicationinsights .agent .internal .configuration .Configuration .ProcessorType ;
810import com .microsoft .applicationinsights .agent .internal .sampling .AiFixedPercentageSampler ;
911import com .microsoft .applicationinsights .agent .internal .sampling .SamplingOverrides ;
12+ import com .microsoft .applicationinsights .agent .internal .processors .AgentProcessor ;
13+ import com .microsoft .applicationinsights .agent .internal .processors .AttributeProcessor ;
14+ import com .microsoft .applicationinsights .agent .internal .processors .LogProcessor ;
1015import io .opentelemetry .api .trace .Span ;
1116import io .opentelemetry .api .trace .SpanContext ;
1217import io .opentelemetry .context .Context ;
1318import io .opentelemetry .instrumentation .api .instrumenter .LocalRootSpan ;
1419import io .opentelemetry .sdk .common .CompletableResultCode ;
1520import io .opentelemetry .sdk .logs .LogRecordProcessor ;
1621import io .opentelemetry .sdk .logs .ReadWriteLogRecord ;
22+ import io .opentelemetry .sdk .logs .data .LogRecordData ;
1723import io .opentelemetry .sdk .trace .ReadableSpan ;
1824import io .opentelemetry .sdk .trace .samplers .SamplingDecision ;
1925import io .opentelemetry .sdk .trace .samplers .SamplingResult ;
2026import io .opentelemetry .semconv .ExceptionAttributes ;
27+ import java .util .ArrayList ;
28+ import java .util .Collections ;
2129import java .util .List ;
2230
2331public class AzureMonitorLogFilteringProcessor implements LogRecordProcessor {
2432
2533 private final SamplingOverrides logSamplingOverrides ;
2634 private final SamplingOverrides exceptionSamplingOverrides ;
2735 private final LogRecordProcessor batchLogRecordProcessor ;
36+ private final List <ProcessorApplier > previewProcessorAppliers ;
37+ private final boolean hasPreviewProcessors ;
2838
2939 private volatile int severityThreshold ;
3040
3141 public AzureMonitorLogFilteringProcessor (
3242 List <Configuration .SamplingOverride > logSamplingOverrides ,
3343 List <Configuration .SamplingOverride > exceptionSamplingOverrides ,
3444 LogRecordProcessor batchLogRecordProcessor ,
35- int severityThreshold ) {
45+ int severityThreshold ,
46+ List <ProcessorConfig > processorConfigs ) {
3647
3748 this .severityThreshold = severityThreshold ;
3849 this .logSamplingOverrides = new SamplingOverrides (logSamplingOverrides );
3950 this .exceptionSamplingOverrides = new SamplingOverrides (exceptionSamplingOverrides );
4051 this .batchLogRecordProcessor = batchLogRecordProcessor ;
41- this .severityThreshold = severityThreshold ;
52+ this .previewProcessorAppliers = buildProcessorAppliers (processorConfigs );
53+ this .hasPreviewProcessors = !previewProcessorAppliers .isEmpty ();
4254 }
4355
4456 public void setSeverityThreshold (int severityThreshold ) {
@@ -54,6 +66,9 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) {
5466 return ;
5567 }
5668
69+ LogRecordData processedLogRecord =
70+ hasPreviewProcessors ? applyPreviewProcessors (logRecord ) : logRecord .toLogRecordData ();
71+
5772 Double parentSpanSampleRate = null ;
5873 Span currentSpan = Span .fromContext (context );
5974 if (currentSpan instanceof ReadableSpan ) {
@@ -64,14 +79,17 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) {
6479 // deal with sampling synchronously so that we only call setAttributeExceptionLogged()
6580 // when we know we are emitting the exception (span sampling happens synchronously as well)
6681
67- String stack = logRecord .getAttribute (ExceptionAttributes .EXCEPTION_STACKTRACE );
82+ String stack =
83+ processedLogRecord
84+ .getAttributes ()
85+ .get (ExceptionAttributes .EXCEPTION_STACKTRACE );
6886
6987 SamplingOverrides samplingOverrides =
7088 stack != null ? exceptionSamplingOverrides : logSamplingOverrides ;
7189
7290 SpanContext spanContext = logRecord .getSpanContext ();
7391
74- AiFixedPercentageSampler sampler = samplingOverrides .getOverride (logRecord .getAttributes ());
92+ AiFixedPercentageSampler sampler = samplingOverrides .getOverride (processedLogRecord .getAttributes ());
7593
7694 boolean hasSamplingOverride = sampler != null ;
7795
@@ -98,7 +116,7 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) {
98116 logRecord .setAttribute (AiSemanticAttributes .SAMPLE_RATE , sampleRate );
99117 }
100118
101- setAttributeExceptionLogged (LocalRootSpan .fromContext (context ), logRecord );
119+ setAttributeExceptionLogged (LocalRootSpan .fromContext (context ), stack );
102120
103121 batchLogRecordProcessor .onEmit (context , logRecord );
104122 }
@@ -118,10 +136,94 @@ public void close() {
118136 batchLogRecordProcessor .close ();
119137 }
120138
121- private static void setAttributeExceptionLogged (Span span , ReadWriteLogRecord logRecord ) {
122- String stacktrace = logRecord .getAttribute (ExceptionAttributes .EXCEPTION_STACKTRACE );
139+ private LogRecordData applyPreviewProcessors (ReadWriteLogRecord logRecord ) {
140+ LogRecordData processed = logRecord .toLogRecordData ();
141+ for (ProcessorApplier processorApplier : previewProcessorAppliers ) {
142+ processed = processorApplier .apply (processed );
143+ }
144+ return processed ;
145+ }
146+
147+ private static List <ProcessorApplier > buildProcessorAppliers (
148+ List <ProcessorConfig > processorConfigs ) {
149+ if (processorConfigs .isEmpty ()) {
150+ return Collections .emptyList ();
151+ }
152+ List <ProcessorApplier > appliers = new ArrayList <>(processorConfigs .size ());
153+ for (ProcessorConfig processorConfig : processorConfigs ) {
154+ appliers .add (ProcessorApplier .fromConfig (processorConfig ));
155+ }
156+ return Collections .unmodifiableList (appliers );
157+ }
158+
159+ private static void setAttributeExceptionLogged (Span span , String stacktrace ) {
123160 if (stacktrace != null ) {
124161 span .setAttribute (AiSemanticAttributes .LOGGED_EXCEPTION , stacktrace );
125162 }
126163 }
164+
165+ private interface ProcessorApplier {
166+ LogRecordData apply (LogRecordData logRecordData );
167+
168+ static ProcessorApplier fromConfig (ProcessorConfig config ) {
169+ config .validate ();
170+ if (config .type == ProcessorType .ATTRIBUTE ) {
171+ AttributeProcessor attributeProcessor = AttributeProcessor .create (config , true );
172+ return new AttributeProcessorApplier (attributeProcessor );
173+ }
174+ if (config .type == ProcessorType .LOG ) {
175+ LogProcessor logProcessor = LogProcessor .create (config );
176+ return new LogBodyProcessorApplier (logProcessor );
177+ }
178+ throw new IllegalStateException ("Unsupported processor type: " + config .type );
179+ }
180+ }
181+
182+ private static final class AttributeProcessorApplier implements ProcessorApplier {
183+ private final AttributeProcessor attributeProcessor ;
184+
185+ private AttributeProcessorApplier (AttributeProcessor attributeProcessor ) {
186+ this .attributeProcessor = attributeProcessor ;
187+ }
188+
189+ @ Override
190+ public LogRecordData apply (LogRecordData logRecordData ) {
191+ AgentProcessor .IncludeExclude include = attributeProcessor .getInclude ();
192+ if (include != null
193+ && !include .isMatch (logRecordData .getAttributes (), logRecordData .getBody ().asString ())) {
194+ return logRecordData ;
195+ }
196+ AgentProcessor .IncludeExclude exclude = attributeProcessor .getExclude ();
197+ if (exclude != null
198+ && exclude .isMatch (logRecordData .getAttributes (), logRecordData .getBody ().asString ())) {
199+ return logRecordData ;
200+ }
201+ return attributeProcessor .processActions (logRecordData );
202+ }
203+ }
204+
205+ private static final class LogBodyProcessorApplier implements ProcessorApplier {
206+ private final LogProcessor logProcessor ;
207+
208+ private LogBodyProcessorApplier (LogProcessor logProcessor ) {
209+ this .logProcessor = logProcessor ;
210+ }
211+
212+ @ Override
213+ public LogRecordData apply (LogRecordData logRecordData ) {
214+ AgentProcessor .IncludeExclude include = logProcessor .getInclude ();
215+ if (include != null
216+ && !include .isMatch (logRecordData .getAttributes (), logRecordData .getBody ().asString ())) {
217+ return logRecordData ;
218+ }
219+ AgentProcessor .IncludeExclude exclude = logProcessor .getExclude ();
220+ if (exclude != null
221+ && exclude .isMatch (logRecordData .getAttributes (), logRecordData .getBody ().asString ())) {
222+ return logRecordData ;
223+ }
224+
225+ LogRecordData updatedLog = logProcessor .processFromAttributes (logRecordData );
226+ return logProcessor .processToAttributes (updatedLog );
227+ }
228+ }
127229}
0 commit comments