diff --git a/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java b/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java index c11f574997dd..d252f2e4c7ee 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java +++ b/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java @@ -44,6 +44,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; final class QuickPulseDataCollector { @@ -56,6 +58,9 @@ final class QuickPulseDataCollector { private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator = getCpuPerformanceCounterCalculator(); + // used to prevent race condition between processing a telemetry item and reporting it to the Quick Pulse service + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private volatile QuickPulseStatus quickPulseStatus = QuickPulseStatus.QP_IS_OFF; private volatile Supplier instrumentationKeySupplier; @@ -91,25 +96,35 @@ synchronized QuickPulseStatus getQuickPulseStatus() { } @Nullable - synchronized FinalCounters getAndRestart() { - FilteringConfiguration config = configuration.get(); - Counters currentCounters - = counters.getAndSet(new Counters(config.getValidProjectionInitInfo(), config.getErrors())); - if (currentCounters != null) { - return new FinalCounters(currentCounters); - } + FinalCounters getAndRestart() { + lock.writeLock().lock(); + try { + FilteringConfiguration config = configuration.get(); + Counters currentCounters + = counters.getAndSet(new Counters(config.getValidProjectionInitInfo(), config.getErrors())); + if (currentCounters != null) { + return new FinalCounters(currentCounters); + } - return null; + return null; + } finally { + lock.writeLock().unlock(); + } } // only used by tests @Nullable - synchronized FinalCounters peek() { - Counters currentCounters = this.counters.get(); // this should be the only differece - if (currentCounters != null) { - return new FinalCounters(currentCounters); + FinalCounters peek() { + lock.readLock().lock(); + try { + Counters currentCounters = this.counters.get(); // this should be the only differece + if (currentCounters != null) { + return new FinalCounters(currentCounters); + } + return null; + } finally { + lock.readLock().unlock(); } - return null; } void add(TelemetryItem telemetryItem) { @@ -130,15 +145,39 @@ void add(TelemetryItem telemetryItem) { int itemCount = sampleRate == null ? 1 : Math.round(100 / sampleRate); FilteringConfiguration currentConfig = configuration.get(); MonitorDomain data = telemetryItem.getData().getBaseData(); - if (data instanceof RequestData) { - RequestData requestTelemetry = (RequestData) data; - addRequest(requestTelemetry, itemCount, getOperationName(telemetryItem), currentConfig); - } else if (data instanceof RemoteDependencyData) { - addDependency((RemoteDependencyData) data, itemCount, currentConfig); - } else if (data instanceof TelemetryExceptionData) { - addException((TelemetryExceptionData) data, itemCount, currentConfig); - } else if (data instanceof MessageData) { - addTrace((MessageData) data, currentConfig); + + if (!(data instanceof RequestData) + && !(data instanceof RemoteDependencyData) + && !(data instanceof TelemetryExceptionData) + && !(data instanceof MessageData)) { + // optimization before acquiring lock + return; + } + + Counters counters = this.counters.get(); + if (counters == null) { + // optimization before acquiring lock + return; + } + + lock.readLock().lock(); + try { + counters = this.counters.get(); + if (counters == null) { + return; + } + if (data instanceof RequestData) { + RequestData requestTelemetry = (RequestData) data; + addRequest(requestTelemetry, itemCount, getOperationName(telemetryItem), currentConfig, counters); + } else if (data instanceof RemoteDependencyData) { + addDependency((RemoteDependencyData) data, itemCount, currentConfig, counters); + } else if (data instanceof TelemetryExceptionData) { + addException((TelemetryExceptionData) data, itemCount, currentConfig, counters); + } else if (data instanceof MessageData) { + addTrace((MessageData) data, currentConfig, counters); + } + } finally { + lock.readLock().unlock(); } } @@ -188,11 +227,9 @@ private void applyMetricFilters(TelemetryColumns columns, TelemetryType telemetr } } - private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); - if (counters == null) { - return; - } + private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig, + Counters counters) { + long durationMillis = parseDurationToMillis(telemetry.getDuration()); counters.rddsAndDuations.addAndGet(Counters.encodeCountAndDuration(itemCount, durationMillis)); Boolean success = telemetry.isSuccess(); @@ -221,12 +258,8 @@ private void addDependency(RemoteDependencyData telemetry, int itemCount, Filter } } - private void addException(TelemetryExceptionData exceptionData, int itemCount, - FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); - if (counters == null) { - return; - } + private void addException(TelemetryExceptionData exceptionData, int itemCount, FilteringConfiguration currentConfig, + Counters counters) { counters.exceptions.addAndGet(itemCount); @@ -256,11 +289,8 @@ private void addException(TelemetryExceptionData exceptionData, int itemCount, } private void addRequest(RequestData requestTelemetry, int itemCount, String operationName, - FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); - if (counters == null) { - return; - } + FilteringConfiguration currentConfig, Counters counters) { + long durationMillis = parseDurationToMillis(requestTelemetry.getDuration()); counters.requestsAndDurations.addAndGet(Counters.encodeCountAndDuration(itemCount, durationMillis)); if (!requestTelemetry.isSuccess()) { @@ -289,8 +319,8 @@ private void addRequest(RequestData requestTelemetry, int itemCount, String oper } } - private void addTrace(MessageData traceTelemetry, FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); + private void addTrace(MessageData traceTelemetry, FilteringConfiguration currentConfig, Counters counters) { + TraceDataColumns columns = new TraceDataColumns(traceTelemetry); applyMetricFilters(columns, TelemetryType.TRACE, currentConfig, counters); List documentStreamIds = new ArrayList<>();