Skip to content

Pre-sampled telemetry for Live Metrics #4139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class AgentLogExporter implements LogRecordExporter {
private final SamplingOverrides exceptionSamplingOverrides;
private final LogDataMapper mapper;
private final Consumer<TelemetryItem> telemetryItemConsumer;
private final QuickPulse quickPulse;

public AgentLogExporter(
int severityThreshold,
Expand All @@ -55,14 +56,12 @@ public AgentLogExporter(
@Nullable QuickPulse quickPulse,
BatchItemProcessor batchItemProcessor) {
this.severityThreshold = severityThreshold;
this.logSamplingOverrides = new SamplingOverrides(logSamplingOverrides);
this.exceptionSamplingOverrides = new SamplingOverrides(exceptionSamplingOverrides);
this.logSamplingOverrides = new SamplingOverrides(logSamplingOverrides, quickPulse);
this.exceptionSamplingOverrides = new SamplingOverrides(exceptionSamplingOverrides, quickPulse);
this.quickPulse = quickPulse;
this.mapper = mapper;
telemetryItemConsumer =
telemetryItem -> {
if (quickPulse != null) {
quickPulse.add(telemetryItem);
}
TelemetryObservers.INSTANCE
.getObservers()
.forEach(consumer -> consumer.accept(telemetryItem));
Expand Down Expand Up @@ -120,6 +119,13 @@ private void internalExport(LogRecordData log) {
SpanContext spanContext = log.getSpanContext();
Double parentSpanSampleRate = log.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE);

TelemetryItem telemetryItem = null;
if (quickPulse != null && quickPulse.isEnabled()) {
telemetryItem = mapper.map(log, stack, parentSpanSampleRate);
logger.debug("adding log to quick pulse: {}", telemetryItem.toJsonString());
quickPulse.add(telemetryItem);
}

AiFixedPercentageSampler sampler = samplingOverrides.getOverride(log.getAttributes());

boolean hasSamplingOverride = sampler != null;
Expand All @@ -136,6 +142,7 @@ private void internalExport(LogRecordData log) {
if (hasSamplingOverride) {
SamplingResult samplingResult = sampler.shouldSampleLog(spanContext, parentSpanSampleRate);
if (samplingResult.getDecision() != SamplingDecision.RECORD_AND_SAMPLE) {
logger.info("Sampling out log for Breeze: {}", log.getBodyValue().asString());
return;
}
sampleRate = samplingResult.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE);
Expand All @@ -148,7 +155,9 @@ private void internalExport(LogRecordData log) {
logger.debug("exporting log: {}", log);

// TODO (trask) no longer need to check AiSemanticAttributes.SAMPLE_RATE in map() method
TelemetryItem telemetryItem = mapper.map(log, stack, sampleRate);
if (telemetryItem == null) {
telemetryItem = mapper.map(log, stack, sampleRate);
}
telemetryItemConsumer.accept(telemetryItem);

exportingLogLogger.recordSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.azure.monitor.opentelemetry.autoconfigure.implementation.SpanDataMapper;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.logging.OperationLogger;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.QuickPulse;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.utils.Strings;
import com.microsoft.applicationinsights.agent.internal.telemetry.BatchItemProcessor;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
Expand All @@ -18,7 +17,6 @@
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,14 +32,10 @@ public final class AgentSpanExporter implements SpanExporter {

public AgentSpanExporter(
SpanDataMapper mapper,
@Nullable QuickPulse quickPulse,
BatchItemProcessor batchItemProcessor) {
this.mapper = mapper;
telemetryItemConsumer =
telemetryItem -> {
if (quickPulse != null) {
quickPulse.add(telemetryItem);
}
TelemetryObservers.INSTANCE
.getObservers()
.forEach(consumer -> consumer.accept(telemetryItem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import ch.qos.logback.classic.LoggerContext;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.heartbeat.HeartbeatExporter;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.QuickPulse;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.utils.Strings;
import com.microsoft.applicationinsights.agent.internal.classicsdk.BytecodeUtilImpl;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void apply(RuntimeConfiguration runtimeConfig) {
|| !Objects.equals(runtimeConfig.sampling.percentage, currentConfig.sampling.percentage)
|| !Objects.equals(
runtimeConfig.sampling.requestsPerSecond, currentConfig.sampling.requestsPerSecond)) {
updateSampling(enabled, runtimeConfig.sampling, runtimeConfig.samplingPreview);
updateSampling(enabled, runtimeConfig.sampling, runtimeConfig.samplingPreview, telemetryClient.getQuickPulse());
}

// initialize Profiler
Expand Down Expand Up @@ -198,15 +199,17 @@ static void updatePropagation(
static void updateSampling(
boolean enabled,
Configuration.Sampling sampling,
Configuration.SamplingPreview samplingPreview) {
Configuration.SamplingPreview samplingPreview,
QuickPulse quickPulse) {

if (!enabled) {
DelegatingSampler.getInstance().reset();
BytecodeUtilImpl.samplingPercentage = 0;
return;
}

DelegatingSampler.getInstance().setDelegate(Samplers.getSampler(sampling, samplingPreview));
DelegatingSampler.getInstance().setDelegate(Samplers.getSampler(sampling, samplingPreview, quickPulse));
// call setQuickPulse method here in delegate
if (sampling.percentage != null) {
BytecodeUtilImpl.samplingPercentage = sampling.percentage.floatValue();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.monitor.opentelemetry.autoconfigure.implementation.AzureMonitorLogRecordExporterProvider;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.AzureMonitorMetricExporterProvider;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.AzureMonitorSpanExporterProvider;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.LiveMetricsSpanProcessor;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.LogDataMapper;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.MetricDataMapper;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.SpanDataMapper;
Expand Down Expand Up @@ -166,6 +167,28 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {

TelemetryClient.setActive(telemetryClient);

QuickPulse quickPulse;
if (configuration.preview.liveMetrics.enabled) {
quickPulse =
QuickPulse.create(
LazyHttpClient.newHttpPipeLineWithDefaultRedirect(configuration.authentication),
() -> {
ConnectionString connectionString = telemetryClient.getConnectionString();
return connectionString == null ? null : connectionString.getLiveEndpoint();
},
telemetryClient::getInstrumentationKey,
telemetryClient.getRoleName(),
telemetryClient.getRoleInstance(),
FirstEntryPoint.getAgentVersion());
} else {
quickPulse = null;
}
// quickPulse needs to be set before the runtimeConfigurator is created, so that when
// the telemetry client is passed to the runtime configurator, the runtime configurator can
// use it to pass quickPulse to the RuntimeConfigurator.updateSampling method. Sampling may
// use quickPulse.isEnabled to determine if telemetry should be dropped or record only.
telemetryClient.setQuickPulse(quickPulse);

// TODO (heya) remove duplicate code in both RuntimeConfigurator and SecondEntryPoint
RuntimeConfigurator runtimeConfigurator =
new RuntimeConfigurator(
Expand Down Expand Up @@ -220,26 +243,15 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
// TODO (trask) add this method to AutoConfigurationCustomizer upstream?
((AutoConfiguredOpenTelemetrySdkBuilder) autoConfiguration).disableShutdownHook();

QuickPulse quickPulse;
if (configuration.preview.liveMetrics.enabled) {
quickPulse =
QuickPulse.create(
LazyHttpClient.newHttpPipeLineWithDefaultRedirect(configuration.authentication),
() -> {
ConnectionString connectionString = telemetryClient.getConnectionString();
return connectionString == null ? null : connectionString.getLiveEndpoint();
},
telemetryClient::getInstrumentationKey,
telemetryClient.getRoleName(),
telemetryClient.getRoleInstance(),
FirstEntryPoint.getAgentVersion());
} else {
quickPulse = null;
}
telemetryClient.setQuickPulse(quickPulse);

AtomicBoolean firstLogRecordProcessor = new AtomicBoolean(true);


List<Configuration.SamplingOverride> exceptionSamplingOverrides =
configuration.preview.sampling.overrides.stream()
.filter(override -> override.telemetryType == SamplingTelemetryType.EXCEPTION)
.collect(Collectors.toList());
SpanDataMapper mapper = createSpanDataMapper(telemetryClient, configuration.preview.captureHttpServer4xxAsError, new SamplingOverrides(exceptionSamplingOverrides, quickPulse));

autoConfiguration
.addPropertiesSupplier(
() -> {
Expand Down Expand Up @@ -279,7 +291,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
.addSpanExporterCustomizer(
(spanExporter, configProperties) -> {
if (spanExporter instanceof AzureMonitorSpanExporterProvider.MarkerSpanExporter) {
return buildTraceExporter(configuration, telemetryClient, quickPulse);
return buildTraceExporter(configuration, telemetryClient, mapper);
}
return wrapSpanExporter(spanExporter, configuration);
})
Expand All @@ -302,7 +314,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
}
})
.addTracerProviderCustomizer(
(builder, otelConfig) -> configureTracing(builder, configuration))
(builder, otelConfig) -> configureTracing(builder, configuration, quickPulse, mapper))
.addMeterProviderCustomizer(
(builder, otelConfig) -> configureMetrics(builder, configuration));

Expand All @@ -320,17 +332,12 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
}

private static SpanExporter buildTraceExporter(
Configuration configuration, TelemetryClient telemetryClient, QuickPulse quickPulse) {
List<Configuration.SamplingOverride> exceptionSamplingOverrides =
configuration.preview.sampling.overrides.stream()
.filter(override -> override.telemetryType == SamplingTelemetryType.EXCEPTION)
.collect(Collectors.toList());
Configuration configuration, TelemetryClient telemetryClient, SpanDataMapper mapper) {
startupLogger.info("calling createSpanExporter");
SpanExporter spanExporter =
createSpanExporter(
telemetryClient,
quickPulse,
configuration.preview.captureHttpServer4xxAsError,
new SamplingOverrides(exceptionSamplingOverrides));
mapper);

return wrapSpanExporter(spanExporter, configuration);
}
Expand Down Expand Up @@ -519,15 +526,15 @@ private static Set<Feature> initStatsbeatFeatureSet(Configuration config) {
}

private static SdkTracerProviderBuilder configureTracing(
SdkTracerProviderBuilder tracerProvider, Configuration configuration) {
SdkTracerProviderBuilder tracerProvider, Configuration configuration, QuickPulse quickPulse, SpanDataMapper mapper) {

boolean enabled = !Strings.isNullOrEmpty(configuration.connectionString);
RuntimeConfigurator.updatePropagation(
!configuration.preview.disablePropagation && enabled,
configuration.preview.additionalPropagators,
configuration.preview.legacyRequestIdPropagation.enabled);
RuntimeConfigurator.updateSampling(
enabled, configuration.sampling, configuration.preview.sampling);
enabled, configuration.sampling, configuration.preview.sampling, quickPulse);

tracerProvider.addSpanProcessor(new AzureMonitorSpanProcessor());
if (!configuration.preview.inheritedAttributes.isEmpty()) {
Expand All @@ -551,51 +558,60 @@ private static SdkTracerProviderBuilder configureTracing(
tracerProvider.addSpanProcessor(new AiLegacyHeaderSpanProcessor());
}

if (quickPulse != null) {
tracerProvider.addSpanProcessor(new LiveMetricsSpanProcessor(quickPulse, mapper));
}

return tracerProvider;
}

private static SpanExporter createSpanExporter(
private static SpanDataMapper createSpanDataMapper(
TelemetryClient telemetryClient,
@Nullable QuickPulse quickPulse,
boolean captureHttpServer4xxAsError,
SamplingOverrides exceptionSamplingOverrides) {
return new SpanDataMapper(
captureHttpServer4xxAsError,
telemetryClient::populateDefaults,
(event, instrumentationName) -> {
boolean lettuce51 = instrumentationName.equals("io.opentelemetry.lettuce-5.1");
if (lettuce51 && event.getName().startsWith("redis.encode.")) {
// special case as these are noisy and come from the underlying library itself
return true;
}
boolean grpc16 = instrumentationName.equals("io.opentelemetry.grpc-1.6");
if (grpc16 && event.getName().equals("message")) {
// OpenTelemetry semantic conventions define semi-noisy grpc events
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md#events
//
// we want to suppress these (at least by default)
return true;
}
return false;
},
(span, event) -> {
AiFixedPercentageSampler sampler =
exceptionSamplingOverrides.getOverride(event.getAttributes());
startupLogger.info("Calling span sampling function with span id {}, event name {}, is sampler null {}, sampler decision {}",
span.getSpanContext().getSpanId(), event.getName(), sampler != null);
return sampler != null
&& sampler
.shouldSampleLog(
span.getSpanContext(),
span.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE))
.getDecision()
== SamplingDecision.DROP;
});
}

SpanDataMapper mapper =
new SpanDataMapper(
captureHttpServer4xxAsError,
telemetryClient::populateDefaults,
(event, instrumentationName) -> {
boolean lettuce51 = instrumentationName.equals("io.opentelemetry.lettuce-5.1");
if (lettuce51 && event.getName().startsWith("redis.encode.")) {
// special case as these are noisy and come from the underlying library itself
return true;
}
boolean grpc16 = instrumentationName.equals("io.opentelemetry.grpc-1.6");
if (grpc16 && event.getName().equals("message")) {
// OpenTelemetry semantic conventions define semi-noisy grpc events
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md#events
//
// we want to suppress these (at least by default)
return true;
}
return false;
},
(span, event) -> {
AiFixedPercentageSampler sampler =
exceptionSamplingOverrides.getOverride(event.getAttributes());
return sampler != null
&& sampler
.shouldSampleLog(
span.getSpanContext(),
span.getAttributes().get(AiSemanticAttributes.SAMPLE_RATE))
.getDecision()
== SamplingDecision.DROP;
});

BatchItemProcessor batchItemProcessor = telemetryClient.getGeneralBatchItemProcessor();
private static SpanExporter createSpanExporter(
TelemetryClient telemetryClient,
SpanDataMapper mapper) {

BatchItemProcessor batchItemProcessor = telemetryClient.getGeneralBatchItemProcessor();
startupLogger.info("Create agentSpanExporter for statsbeat");
return new StatsbeatSpanExporter(
new AgentSpanExporter(mapper, quickPulse, batchItemProcessor),
new AgentSpanExporter(mapper, batchItemProcessor),
telemetryClient.getStatsbeatModule());
}

Expand All @@ -611,9 +627,11 @@ private static SpanExporter wrapSpanExporter(
for (ProcessorConfig processorConfig : processorConfigs) {
switch (processorConfig.type) {
case ATTRIBUTE:
startupLogger.info("Adding attribute processor to span exporter");
spanExporter = new SpanExporterWithAttributeProcessor(processorConfig, spanExporter);
break;
case SPAN:
startupLogger.info("Adding span processor to span exporter");
spanExporter = new ExporterWithSpanProcessor(processorConfig, spanExporter);
break;
default:
Expand Down
Loading
Loading