From b838c12e950dddf37608e9bdc8070f2dbf6e4716 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Tue, 8 Jul 2025 13:06:01 -0700 Subject: [PATCH 1/2] Fix live metrics race condition --- .../quickpulse/QuickPulseDataCollector.java | 106 +++++++++++------- 1 file changed, 66 insertions(+), 40 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java b/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java index c11f574997dd..e1a6603c231a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java +++ b/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java @@ -44,6 +44,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; final class QuickPulseDataCollector { @@ -53,8 +55,10 @@ final class QuickPulseDataCollector { private static final OperatingSystemMXBean operatingSystemMxBean = ManagementFactory.getOperatingSystemMXBean(); private final AtomicReference counters = new AtomicReference<>(null); - private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator - = getCpuPerformanceCounterCalculator(); + private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator = getCpuPerformanceCounterCalculator(); + + // used to prevent race condition between processing a telemetry item and reporting it to the Quick Pulse service + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private volatile QuickPulseStatus quickPulseStatus = QuickPulseStatus.QP_IS_OFF; @@ -91,25 +95,35 @@ synchronized QuickPulseStatus getQuickPulseStatus() { } @Nullable - synchronized FinalCounters getAndRestart() { - FilteringConfiguration config = configuration.get(); + FinalCounters getAndRestart() { + lock.writeLock().lock(); + try { + FilteringConfiguration config = configuration.get(); Counters currentCounters = counters.getAndSet(new Counters(config.getValidProjectionInitInfo(), config.getErrors())); - if (currentCounters != null) { - return new FinalCounters(currentCounters); - } + if (currentCounters != null) { + return new FinalCounters(currentCounters); + } - return null; + return null; + } finally { + lock.writeLock().unlock(); + } } // only used by tests @Nullable - synchronized FinalCounters peek() { - Counters currentCounters = this.counters.get(); // this should be the only differece - if (currentCounters != null) { - return new FinalCounters(currentCounters); + FinalCounters peek() { + lock.readLock().lock(); + try { + Counters currentCounters = this.counters.get(); // this should be the only differece + if (currentCounters != null) { + return new FinalCounters(currentCounters); + } + return null; + } finally { + lock.readLock().unlock(); } - return null; } void add(TelemetryItem telemetryItem) { @@ -130,15 +144,37 @@ void add(TelemetryItem telemetryItem) { int itemCount = sampleRate == null ? 1 : Math.round(100 / sampleRate); FilteringConfiguration currentConfig = configuration.get(); MonitorDomain data = telemetryItem.getData().getBaseData(); - if (data instanceof RequestData) { - RequestData requestTelemetry = (RequestData) data; - addRequest(requestTelemetry, itemCount, getOperationName(telemetryItem), currentConfig); - } else if (data instanceof RemoteDependencyData) { - addDependency((RemoteDependencyData) data, itemCount, currentConfig); - } else if (data instanceof TelemetryExceptionData) { - addException((TelemetryExceptionData) data, itemCount, currentConfig); - } else if (data instanceof MessageData) { - addTrace((MessageData) data, currentConfig); + + if (!(data instanceof RequestData) && !(data instanceof RemoteDependencyData) + && !(data instanceof TelemetryExceptionData) && !(data instanceof MessageData)) { + // optimization before acquiring lock + return; + } + + Counters counters = this.counters.get(); + if (counters == null) { + // optimization before acquiring lock + return; + } + + lock.readLock().lock(); + try { + counters = this.counters.get(); + if (counters == null) { + return; + } + if (data instanceof RequestData) { + RequestData requestTelemetry = (RequestData) data; + addRequest(requestTelemetry, itemCount, getOperationName(telemetryItem), currentConfig, counters); + } else if (data instanceof RemoteDependencyData) { + addDependency((RemoteDependencyData) data, itemCount, currentConfig, counters); + } else if (data instanceof TelemetryExceptionData) { + addException((TelemetryExceptionData) data, itemCount, currentConfig, counters); + } else if (data instanceof MessageData) { + addTrace((MessageData) data, currentConfig, counters); + } + } finally { + lock.readLock().unlock(); } } @@ -188,11 +224,8 @@ private void applyMetricFilters(TelemetryColumns columns, TelemetryType telemetr } } - private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); - if (counters == null) { - return; - } + private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig, Counters counters) { + long durationMillis = parseDurationToMillis(telemetry.getDuration()); counters.rddsAndDuations.addAndGet(Counters.encodeCountAndDuration(itemCount, durationMillis)); Boolean success = telemetry.isSuccess(); @@ -222,12 +255,8 @@ private void addDependency(RemoteDependencyData telemetry, int itemCount, Filter } private void addException(TelemetryExceptionData exceptionData, int itemCount, - FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); - if (counters == null) { - return; - } - + FilteringConfiguration currentConfig, Counters counters) { + counters.exceptions.addAndGet(itemCount); ExceptionDataColumns columns = new ExceptionDataColumns(exceptionData); @@ -256,11 +285,8 @@ private void addException(TelemetryExceptionData exceptionData, int itemCount, } private void addRequest(RequestData requestTelemetry, int itemCount, String operationName, - FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); - if (counters == null) { - return; - } + FilteringConfiguration currentConfig, Counters counters) { + long durationMillis = parseDurationToMillis(requestTelemetry.getDuration()); counters.requestsAndDurations.addAndGet(Counters.encodeCountAndDuration(itemCount, durationMillis)); if (!requestTelemetry.isSuccess()) { @@ -289,8 +315,8 @@ private void addRequest(RequestData requestTelemetry, int itemCount, String oper } } - private void addTrace(MessageData traceTelemetry, FilteringConfiguration currentConfig) { - Counters counters = this.counters.get(); + private void addTrace(MessageData traceTelemetry, FilteringConfiguration currentConfig, Counters counters) { + TraceDataColumns columns = new TraceDataColumns(traceTelemetry); applyMetricFilters(columns, TelemetryType.TRACE, currentConfig, counters); List documentStreamIds = new ArrayList<>(); From 0a53a531b4e7962f3b4d59cf6f37cb72db1de9d3 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sun, 13 Jul 2025 14:04:21 -0700 Subject: [PATCH 2/2] format --- .../quickpulse/QuickPulseDataCollector.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java b/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java index e1a6603c231a..d252f2e4c7ee 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java +++ b/sdk/monitor/azure-monitor-opentelemetry-autoconfigure/src/main/java/com/azure/monitor/opentelemetry/autoconfigure/implementation/quickpulse/QuickPulseDataCollector.java @@ -55,7 +55,8 @@ final class QuickPulseDataCollector { private static final OperatingSystemMXBean operatingSystemMxBean = ManagementFactory.getOperatingSystemMXBean(); private final AtomicReference counters = new AtomicReference<>(null); - private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator = getCpuPerformanceCounterCalculator(); + private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator + = getCpuPerformanceCounterCalculator(); // used to prevent race condition between processing a telemetry item and reporting it to the Quick Pulse service private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -99,8 +100,8 @@ FinalCounters getAndRestart() { lock.writeLock().lock(); try { FilteringConfiguration config = configuration.get(); - Counters currentCounters - = counters.getAndSet(new Counters(config.getValidProjectionInitInfo(), config.getErrors())); + Counters currentCounters + = counters.getAndSet(new Counters(config.getValidProjectionInitInfo(), config.getErrors())); if (currentCounters != null) { return new FinalCounters(currentCounters); } @@ -145,8 +146,10 @@ void add(TelemetryItem telemetryItem) { FilteringConfiguration currentConfig = configuration.get(); MonitorDomain data = telemetryItem.getData().getBaseData(); - if (!(data instanceof RequestData) && !(data instanceof RemoteDependencyData) - && !(data instanceof TelemetryExceptionData) && !(data instanceof MessageData)) { + if (!(data instanceof RequestData) + && !(data instanceof RemoteDependencyData) + && !(data instanceof TelemetryExceptionData) + && !(data instanceof MessageData)) { // optimization before acquiring lock return; } @@ -224,7 +227,8 @@ private void applyMetricFilters(TelemetryColumns columns, TelemetryType telemetr } } - private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig, Counters counters) { + private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig, + Counters counters) { long durationMillis = parseDurationToMillis(telemetry.getDuration()); counters.rddsAndDuations.addAndGet(Counters.encodeCountAndDuration(itemCount, durationMillis)); @@ -254,9 +258,9 @@ private void addDependency(RemoteDependencyData telemetry, int itemCount, Filter } } - private void addException(TelemetryExceptionData exceptionData, int itemCount, - FilteringConfiguration currentConfig, Counters counters) { - + private void addException(TelemetryExceptionData exceptionData, int itemCount, FilteringConfiguration currentConfig, + Counters counters) { + counters.exceptions.addAndGet(itemCount); ExceptionDataColumns columns = new ExceptionDataColumns(exceptionData);