44
44
import java .util .concurrent .atomic .AtomicInteger ;
45
45
import java .util .concurrent .atomic .AtomicLong ;
46
46
import java .util .concurrent .atomic .AtomicReference ;
47
+ import java .util .concurrent .locks .ReadWriteLock ;
48
+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
47
49
import java .util .function .Supplier ;
48
50
49
51
final class QuickPulseDataCollector {
@@ -56,6 +58,9 @@ final class QuickPulseDataCollector {
56
58
private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator
57
59
= getCpuPerformanceCounterCalculator ();
58
60
61
+ // used to prevent race condition between processing a telemetry item and reporting it to the Quick Pulse service
62
+ private final ReadWriteLock lock = new ReentrantReadWriteLock ();
63
+
59
64
private volatile QuickPulseStatus quickPulseStatus = QuickPulseStatus .QP_IS_OFF ;
60
65
61
66
private volatile Supplier <String > instrumentationKeySupplier ;
@@ -91,25 +96,35 @@ synchronized QuickPulseStatus getQuickPulseStatus() {
91
96
}
92
97
93
98
@ Nullable
94
- synchronized FinalCounters getAndRestart () {
95
- FilteringConfiguration config = configuration .get ();
96
- Counters currentCounters
97
- = counters .getAndSet (new Counters (config .getValidProjectionInitInfo (), config .getErrors ()));
98
- if (currentCounters != null ) {
99
- return new FinalCounters (currentCounters );
100
- }
99
+ FinalCounters getAndRestart () {
100
+ lock .writeLock ().lock ();
101
+ try {
102
+ FilteringConfiguration config = configuration .get ();
103
+ Counters currentCounters
104
+ = counters .getAndSet (new Counters (config .getValidProjectionInitInfo (), config .getErrors ()));
105
+ if (currentCounters != null ) {
106
+ return new FinalCounters (currentCounters );
107
+ }
101
108
102
- return null ;
109
+ return null ;
110
+ } finally {
111
+ lock .writeLock ().unlock ();
112
+ }
103
113
}
104
114
105
115
// only used by tests
106
116
@ Nullable
107
- synchronized FinalCounters peek () {
108
- Counters currentCounters = this .counters .get (); // this should be the only differece
109
- if (currentCounters != null ) {
110
- return new FinalCounters (currentCounters );
117
+ FinalCounters peek () {
118
+ lock .readLock ().lock ();
119
+ try {
120
+ Counters currentCounters = this .counters .get (); // this should be the only differece
121
+ if (currentCounters != null ) {
122
+ return new FinalCounters (currentCounters );
123
+ }
124
+ return null ;
125
+ } finally {
126
+ lock .readLock ().unlock ();
111
127
}
112
- return null ;
113
128
}
114
129
115
130
void add (TelemetryItem telemetryItem ) {
@@ -130,15 +145,39 @@ void add(TelemetryItem telemetryItem) {
130
145
int itemCount = sampleRate == null ? 1 : Math .round (100 / sampleRate );
131
146
FilteringConfiguration currentConfig = configuration .get ();
132
147
MonitorDomain data = telemetryItem .getData ().getBaseData ();
133
- if (data instanceof RequestData ) {
134
- RequestData requestTelemetry = (RequestData ) data ;
135
- addRequest (requestTelemetry , itemCount , getOperationName (telemetryItem ), currentConfig );
136
- } else if (data instanceof RemoteDependencyData ) {
137
- addDependency ((RemoteDependencyData ) data , itemCount , currentConfig );
138
- } else if (data instanceof TelemetryExceptionData ) {
139
- addException ((TelemetryExceptionData ) data , itemCount , currentConfig );
140
- } else if (data instanceof MessageData ) {
141
- addTrace ((MessageData ) data , currentConfig );
148
+
149
+ if (!(data instanceof RequestData )
150
+ && !(data instanceof RemoteDependencyData )
151
+ && !(data instanceof TelemetryExceptionData )
152
+ && !(data instanceof MessageData )) {
153
+ // optimization before acquiring lock
154
+ return ;
155
+ }
156
+
157
+ Counters counters = this .counters .get ();
158
+ if (counters == null ) {
159
+ // optimization before acquiring lock
160
+ return ;
161
+ }
162
+
163
+ lock .readLock ().lock ();
164
+ try {
165
+ counters = this .counters .get ();
166
+ if (counters == null ) {
167
+ return ;
168
+ }
169
+ if (data instanceof RequestData ) {
170
+ RequestData requestTelemetry = (RequestData ) data ;
171
+ addRequest (requestTelemetry , itemCount , getOperationName (telemetryItem ), currentConfig , counters );
172
+ } else if (data instanceof RemoteDependencyData ) {
173
+ addDependency ((RemoteDependencyData ) data , itemCount , currentConfig , counters );
174
+ } else if (data instanceof TelemetryExceptionData ) {
175
+ addException ((TelemetryExceptionData ) data , itemCount , currentConfig , counters );
176
+ } else if (data instanceof MessageData ) {
177
+ addTrace ((MessageData ) data , currentConfig , counters );
178
+ }
179
+ } finally {
180
+ lock .readLock ().unlock ();
142
181
}
143
182
}
144
183
@@ -188,11 +227,9 @@ private void applyMetricFilters(TelemetryColumns columns, TelemetryType telemetr
188
227
}
189
228
}
190
229
191
- private void addDependency (RemoteDependencyData telemetry , int itemCount , FilteringConfiguration currentConfig ) {
192
- Counters counters = this .counters .get ();
193
- if (counters == null ) {
194
- return ;
195
- }
230
+ private void addDependency (RemoteDependencyData telemetry , int itemCount , FilteringConfiguration currentConfig ,
231
+ Counters counters ) {
232
+
196
233
long durationMillis = parseDurationToMillis (telemetry .getDuration ());
197
234
counters .rddsAndDuations .addAndGet (Counters .encodeCountAndDuration (itemCount , durationMillis ));
198
235
Boolean success = telemetry .isSuccess ();
@@ -221,12 +258,8 @@ private void addDependency(RemoteDependencyData telemetry, int itemCount, Filter
221
258
}
222
259
}
223
260
224
- private void addException (TelemetryExceptionData exceptionData , int itemCount ,
225
- FilteringConfiguration currentConfig ) {
226
- Counters counters = this .counters .get ();
227
- if (counters == null ) {
228
- return ;
229
- }
261
+ private void addException (TelemetryExceptionData exceptionData , int itemCount , FilteringConfiguration currentConfig ,
262
+ Counters counters ) {
230
263
231
264
counters .exceptions .addAndGet (itemCount );
232
265
@@ -256,11 +289,8 @@ private void addException(TelemetryExceptionData exceptionData, int itemCount,
256
289
}
257
290
258
291
private void addRequest (RequestData requestTelemetry , int itemCount , String operationName ,
259
- FilteringConfiguration currentConfig ) {
260
- Counters counters = this .counters .get ();
261
- if (counters == null ) {
262
- return ;
263
- }
292
+ FilteringConfiguration currentConfig , Counters counters ) {
293
+
264
294
long durationMillis = parseDurationToMillis (requestTelemetry .getDuration ());
265
295
counters .requestsAndDurations .addAndGet (Counters .encodeCountAndDuration (itemCount , durationMillis ));
266
296
if (!requestTelemetry .isSuccess ()) {
@@ -289,8 +319,8 @@ private void addRequest(RequestData requestTelemetry, int itemCount, String oper
289
319
}
290
320
}
291
321
292
- private void addTrace (MessageData traceTelemetry , FilteringConfiguration currentConfig ) {
293
- Counters counters = this . counters . get ();
322
+ private void addTrace (MessageData traceTelemetry , FilteringConfiguration currentConfig , Counters counters ) {
323
+
294
324
TraceDataColumns columns = new TraceDataColumns (traceTelemetry );
295
325
applyMetricFilters (columns , TelemetryType .TRACE , currentConfig , counters );
296
326
List <String > documentStreamIds = new ArrayList <>();
0 commit comments