Skip to content

Commit c016afc

Browse files
committed
Improving threading safety for telemetry
1 parent 03ef036 commit c016afc

File tree

2 files changed

+41
-30
lines changed

2 files changed

+41
-30
lines changed

core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
1414
import java.util.concurrent.Executors
1515
import kotlin.math.min
1616
import kotlin.math.roundToInt
17+
import java.util.concurrent.atomic.AtomicBoolean
1718

1819
class MetricsRequestFactory : RequestFactory() {
1920
override fun upload(apiHost: String): HttpURLConnection {
@@ -76,7 +77,14 @@ object Telemetry: Subscriber {
7677
var host: String = Constants.DEFAULT_API_HOST
7778
// 1.0 is 100%, will get set by Segment setting before start()
7879
// Values are adjusted by the sampleRate on send
79-
var sampleRate: Double = 1.0
80+
@Volatile private var _sampleRate: Double = 1.0
81+
var sampleRate: Double
82+
get() = _sampleRate
83+
set(value) {
84+
synchronized(this) {
85+
_sampleRate = value
86+
}
87+
}
8088
var flushTimer: Int = 30 * 1000 // 30s
8189
var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory())
8290
var sendWriteKeyOnError: Boolean = true
@@ -93,9 +101,9 @@ object Telemetry: Subscriber {
93101

94102
private val queue = ConcurrentLinkedQueue<RemoteMetric>()
95103
private var queueBytes = 0
96-
private var started = false
104+
private var started = AtomicBoolean(false)
97105
private var rateLimitEndTime: Long = 0
98-
private var flushFirstError = true
106+
private var flushFirstError = AtomicBoolean(true)
99107
private val exceptionHandler = CoroutineExceptionHandler { _, t ->
100108
errorHandler?.let {
101109
it( Exception(
@@ -113,8 +121,8 @@ object Telemetry: Subscriber {
113121
* Called automatically when Telemetry.enable is set to true and when configuration data is received from Segment.
114122
*/
115123
fun start() {
116-
if (!enable || started || sampleRate == 0.0) return
117-
started = true
124+
if (!enable || started.get() || sampleRate == 0.0) return
125+
started.set(true)
118126

119127
// Everything queued was sampled at default 100%, downsample adjustment and send will adjust values
120128
if (Math.random() > sampleRate) {
@@ -124,7 +132,7 @@ object Telemetry: Subscriber {
124132
telemetryJob = telemetryScope.launch(telemetryDispatcher) {
125133
while (isActive) {
126134
if (!enable) {
127-
started = false
135+
started.set(false)
128136
return@launch
129137
}
130138
try {
@@ -148,7 +156,7 @@ object Telemetry: Subscriber {
148156
fun reset() {
149157
telemetryJob?.cancel()
150158
resetQueue()
151-
started = false
159+
started.set(false)
152160
rateLimitEndTime = 0
153161
}
154162

@@ -202,8 +210,8 @@ object Telemetry: Subscriber {
202210

203211
addRemoteMetric(metric, filteredTags, log=logData)
204212

205-
if(flushFirstError) {
206-
flushFirstError = false
213+
if(flushFirstError.get()) {
214+
flushFirstError.set(false)
207215
flush()
208216
}
209217
}
@@ -218,7 +226,6 @@ object Telemetry: Subscriber {
218226

219227
try {
220228
send()
221-
queueBytes = 0
222229
} catch (error: Throwable) {
223230
errorHandler?.invoke(error)
224231
sampleRate = 0.0
@@ -227,16 +234,14 @@ object Telemetry: Subscriber {
227234

228235
private fun send() {
229236
if (sampleRate == 0.0) return
230-
var queueCount = queue.size
231-
// Reset queue data size counter since all current queue items will be removed
232-
queueBytes = 0
233-
val sendQueue = mutableListOf<RemoteMetric>()
234-
while (queueCount-- > 0 && !queue.isEmpty()) {
235-
val m = queue.poll()
236-
if(m != null) {
237-
m.value = (m.value / sampleRate).roundToInt()
238-
sendQueue.add(m)
239-
}
237+
val sendQueue: MutableList<RemoteMetric>
238+
synchronized(queue) {
239+
sendQueue = queue.toMutableList()
240+
queue.clear()
241+
queueBytes = 0
242+
}
243+
sendQueue.forEach { m ->
244+
m.value = (m.value / sampleRate).roundToInt()
240245
}
241246
try {
242247
// Json.encodeToString by default does not include default values
@@ -309,9 +314,11 @@ object Telemetry: Subscriber {
309314
tags = fullTags
310315
)
311316
val newMetricSize = newMetric.toString().toByteArray().size
312-
if (queueBytes + newMetricSize <= maxQueueBytes) {
313-
queue.add(newMetric)
314-
queueBytes += newMetricSize
317+
synchronized(queue) {
318+
if (queueBytes + newMetricSize <= maxQueueBytes) {
319+
queue.add(newMetric)
320+
queueBytes += newMetricSize
321+
}
315322
}
316323
}
317324

@@ -338,7 +345,9 @@ object Telemetry: Subscriber {
338345
}
339346

340347
private fun resetQueue() {
341-
queue.clear()
342-
queueBytes = 0
348+
synchronized(queue) {
349+
queue.clear()
350+
queueBytes = 0
351+
}
343352
}
344353
}

core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ import java.util.concurrent.ConcurrentLinkedQueue
1010
import java.util.concurrent.CountDownLatch
1111
import java.util.concurrent.Executors
1212
import java.util.concurrent.TimeUnit
13+
import java.util.concurrent.atomic.AtomicBoolean
1314
import kotlin.random.Random
1415

1516
class TelemetryTest {
1617
fun TelemetryResetFlushFirstError() {
1718
val field: Field = Telemetry::class.java.getDeclaredField("flushFirstError")
1819
field.isAccessible = true
19-
field.set(true, true)
20+
val atomicBoolean = field.get(Telemetry) as AtomicBoolean
21+
atomicBoolean.set(true)
2022
}
2123
fun TelemetryQueueSize(): Int {
2224
val queueField: Field = Telemetry::class.java.getDeclaredField("queue")
@@ -29,11 +31,11 @@ class TelemetryTest {
2931
queueBytesField.isAccessible = true
3032
return queueBytesField.get(Telemetry) as Int
3133
}
32-
var TelemetryStarted: Boolean
34+
var TelemetryStarted: AtomicBoolean
3335
get() {
3436
val startedField: Field = Telemetry::class.java.getDeclaredField("started")
3537
startedField.isAccessible = true
36-
return startedField.get(Telemetry) as Boolean
38+
return startedField.get(Telemetry) as AtomicBoolean
3739
}
3840
set(value) {
3941
val startedField: Field = Telemetry::class.java.getDeclaredField("started")
@@ -78,11 +80,11 @@ class TelemetryTest {
7880
Telemetry.sampleRate = 0.0
7981
Telemetry.enable = true
8082
Telemetry.start()
81-
assertEquals(false, TelemetryStarted)
83+
assertEquals(false, TelemetryStarted.get())
8284

8385
Telemetry.sampleRate = 1.0
8486
Telemetry.start()
85-
assertEquals(true, TelemetryStarted)
87+
assertEquals(true, TelemetryStarted.get())
8688
assertEquals(0,errors.size)
8789
}
8890

0 commit comments

Comments
 (0)