Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

final class QuickPulseDataCollector {
Expand All @@ -56,6 +58,9 @@ final class QuickPulseDataCollector {
private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator
= getCpuPerformanceCounterCalculator();

// used to prevent race condition between processing a telemetry item and reporting it to the Quick Pulse service
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private volatile QuickPulseStatus quickPulseStatus = QuickPulseStatus.QP_IS_OFF;

private volatile Supplier<String> instrumentationKeySupplier;
Expand Down Expand Up @@ -91,25 +96,35 @@ synchronized QuickPulseStatus getQuickPulseStatus() {
}

@Nullable
synchronized FinalCounters getAndRestart() {
FilteringConfiguration config = configuration.get();
Counters currentCounters
= counters.getAndSet(new Counters(config.getValidProjectionInitInfo(), config.getErrors()));
if (currentCounters != null) {
return new FinalCounters(currentCounters);
}
FinalCounters getAndRestart() {
lock.writeLock().lock();
try {
FilteringConfiguration config = configuration.get();
Counters currentCounters
= counters.getAndSet(new Counters(config.getValidProjectionInitInfo(), config.getErrors()));
if (currentCounters != null) {
return new FinalCounters(currentCounters);
}

return null;
return null;
} finally {
lock.writeLock().unlock();
}
}

// only used by tests
@Nullable
synchronized FinalCounters peek() {
Counters currentCounters = this.counters.get(); // this should be the only differece
if (currentCounters != null) {
return new FinalCounters(currentCounters);
FinalCounters peek() {
lock.readLock().lock();
try {
Counters currentCounters = this.counters.get(); // this should be the only differece
if (currentCounters != null) {
return new FinalCounters(currentCounters);
}
return null;
} finally {
lock.readLock().unlock();
}
return null;
}

void add(TelemetryItem telemetryItem) {
Expand All @@ -130,15 +145,39 @@ void add(TelemetryItem telemetryItem) {
int itemCount = sampleRate == null ? 1 : Math.round(100 / sampleRate);
FilteringConfiguration currentConfig = configuration.get();
MonitorDomain data = telemetryItem.getData().getBaseData();
if (data instanceof RequestData) {
RequestData requestTelemetry = (RequestData) data;
addRequest(requestTelemetry, itemCount, getOperationName(telemetryItem), currentConfig);
} else if (data instanceof RemoteDependencyData) {
addDependency((RemoteDependencyData) data, itemCount, currentConfig);
} else if (data instanceof TelemetryExceptionData) {
addException((TelemetryExceptionData) data, itemCount, currentConfig);
} else if (data instanceof MessageData) {
addTrace((MessageData) data, currentConfig);

if (!(data instanceof RequestData)
&& !(data instanceof RemoteDependencyData)
&& !(data instanceof TelemetryExceptionData)
&& !(data instanceof MessageData)) {
// optimization before acquiring lock
return;
}

Counters counters = this.counters.get();
if (counters == null) {
// optimization before acquiring lock
return;
}

lock.readLock().lock();
try {
counters = this.counters.get();
if (counters == null) {
return;
}
if (data instanceof RequestData) {
RequestData requestTelemetry = (RequestData) data;
addRequest(requestTelemetry, itemCount, getOperationName(telemetryItem), currentConfig, counters);
} else if (data instanceof RemoteDependencyData) {
addDependency((RemoteDependencyData) data, itemCount, currentConfig, counters);
} else if (data instanceof TelemetryExceptionData) {
addException((TelemetryExceptionData) data, itemCount, currentConfig, counters);
} else if (data instanceof MessageData) {
addTrace((MessageData) data, currentConfig, counters);
}
} finally {
lock.readLock().unlock();
}
}

Expand Down Expand Up @@ -188,11 +227,9 @@ private void applyMetricFilters(TelemetryColumns columns, TelemetryType telemetr
}
}

private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig) {
Counters counters = this.counters.get();
if (counters == null) {
return;
}
private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig,
Counters counters) {

long durationMillis = parseDurationToMillis(telemetry.getDuration());
counters.rddsAndDuations.addAndGet(Counters.encodeCountAndDuration(itemCount, durationMillis));
Boolean success = telemetry.isSuccess();
Expand Down Expand Up @@ -221,12 +258,8 @@ private void addDependency(RemoteDependencyData telemetry, int itemCount, Filter
}
}

private void addException(TelemetryExceptionData exceptionData, int itemCount,
FilteringConfiguration currentConfig) {
Counters counters = this.counters.get();
if (counters == null) {
return;
}
private void addException(TelemetryExceptionData exceptionData, int itemCount, FilteringConfiguration currentConfig,
Counters counters) {

counters.exceptions.addAndGet(itemCount);

Expand Down Expand Up @@ -256,11 +289,8 @@ private void addException(TelemetryExceptionData exceptionData, int itemCount,
}

private void addRequest(RequestData requestTelemetry, int itemCount, String operationName,
FilteringConfiguration currentConfig) {
Counters counters = this.counters.get();
if (counters == null) {
return;
}
FilteringConfiguration currentConfig, Counters counters) {

long durationMillis = parseDurationToMillis(requestTelemetry.getDuration());
counters.requestsAndDurations.addAndGet(Counters.encodeCountAndDuration(itemCount, durationMillis));
if (!requestTelemetry.isSuccess()) {
Expand Down Expand Up @@ -289,8 +319,8 @@ private void addRequest(RequestData requestTelemetry, int itemCount, String oper
}
}

private void addTrace(MessageData traceTelemetry, FilteringConfiguration currentConfig) {
Counters counters = this.counters.get();
private void addTrace(MessageData traceTelemetry, FilteringConfiguration currentConfig, Counters counters) {

TraceDataColumns columns = new TraceDataColumns(traceTelemetry);
applyMetricFilters(columns, TelemetryType.TRACE, currentConfig, counters);
List<String> documentStreamIds = new ArrayList<>();
Expand Down