Skip to content

Commit c03f7a0

Browse files
committed
Another attempt without synchronized, tests need work
1 parent c016afc commit c03f7a0

File tree

2 files changed

+77
-45
lines changed

2 files changed

+77
-45
lines changed

core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import java.util.concurrent.Executors
1515
import kotlin.math.min
1616
import kotlin.math.roundToInt
1717
import java.util.concurrent.atomic.AtomicBoolean
18+
import java.util.concurrent.atomic.AtomicInteger
19+
import java.util.concurrent.atomic.AtomicReference
20+
import kotlinx.coroutines.channels.Channel
1821

1922
class MetricsRequestFactory : RequestFactory() {
2023
override fun upload(apiHost: String): HttpURLConnection {
@@ -77,21 +80,14 @@ object Telemetry: Subscriber {
7780
var host: String = Constants.DEFAULT_API_HOST
7881
// 1.0 is 100%, will get set by Segment setting before start()
7982
// Values are adjusted by the sampleRate on send
80-
@Volatile private var _sampleRate: Double = 1.0
81-
var sampleRate: Double
82-
get() = _sampleRate
83-
set(value) {
84-
synchronized(this) {
85-
_sampleRate = value
86-
}
87-
}
88-
var flushTimer: Int = 30 * 1000 // 30s
83+
private var sampleRate = AtomicReference<Double>(1.0)
84+
private var flushTimer: Int = 30 * 1000 // 30s
8985
var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory())
9086
var sendWriteKeyOnError: Boolean = true
9187
var sendErrorLogData: Boolean = false
9288
var errorHandler: ((Throwable) -> Unit)? = ::logError
93-
var maxQueueSize: Int = 20
94-
var errorLogSizeMax: Int = 4000
89+
private var maxQueueSize: Int = 20
90+
private var errorLogSizeMax: Int = 4000
9591

9692
private const val MAX_QUEUE_BYTES = 28000
9793
var maxQueueBytes: Int = MAX_QUEUE_BYTES
@@ -100,7 +96,7 @@ object Telemetry: Subscriber {
10096
}
10197

10298
private val queue = ConcurrentLinkedQueue<RemoteMetric>()
103-
private var queueBytes = 0
99+
private var queueBytes = AtomicInteger(0)
104100
private var started = AtomicBoolean(false)
105101
private var rateLimitEndTime: Long = 0
106102
private var flushFirstError = AtomicBoolean(true)
@@ -116,16 +112,27 @@ object Telemetry: Subscriber {
116112
private var telemetryDispatcher: ExecutorCoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
117113
private var telemetryJob: Job? = null
118114

115+
private val flushChannel = Channel<Unit>(Channel.UNLIMITED)
116+
117+
// Start a coroutine to process flush requests
118+
init {
119+
telemetryScope.launch(telemetryDispatcher) {
120+
for (event in flushChannel) {
121+
performFlush()
122+
}
123+
}
124+
}
125+
119126
/**
120127
* Starts the telemetry if it is enabled and not already started, and the sample rate is greater than 0.
121128
* Called automatically when Telemetry.enable is set to true and when configuration data is received from Segment.
122129
*/
123130
fun start() {
124-
if (!enable || started.get() || sampleRate == 0.0) return
131+
if (!enable || started.get() || sampleRate.get() == 0.0) return
125132
started.set(true)
126133

127134
// Everything queued was sampled at default 100%, downsample adjustment and send will adjust values
128-
if (Math.random() > sampleRate) {
135+
if (Math.random() > sampleRate.get()) {
129136
resetQueue()
130137
}
131138

@@ -170,10 +177,10 @@ object Telemetry: Subscriber {
170177
val tags = mutableMapOf<String, String>()
171178
buildTags(tags)
172179

173-
if (!enable || sampleRate == 0.0) return
180+
if (!enable || sampleRate.get() == 0.0) return
174181
if (!metric.startsWith(METRICS_BASE_TAG)) return
175182
if (tags.isEmpty()) return
176-
if (Math.random() > sampleRate) return
183+
if (Math.random() > sampleRate.get()) return
177184

178185
addRemoteMetric(metric, tags)
179186
}
@@ -189,10 +196,10 @@ object Telemetry: Subscriber {
189196
val tags = mutableMapOf<String, String>()
190197
buildTags(tags)
191198

192-
if (!enable || sampleRate == 0.0) return
199+
if (!enable || sampleRate.get() == 0.0) return
193200
if (!metric.startsWith(METRICS_BASE_TAG)) return
194201
if (tags.isEmpty()) return
195-
if (Math.random() > sampleRate) return
202+
if (Math.random() > sampleRate.get()) return
196203

197204
var filteredTags = if(sendWriteKeyOnError) {
198205
tags.toMap()
@@ -216,33 +223,41 @@ object Telemetry: Subscriber {
216223
}
217224
}
218225

219-
@Synchronized
220226
fun flush() {
227+
if (!enable) return
228+
flushChannel.trySend(Unit)
229+
}
230+
231+
private fun performFlush() {
221232
if (!enable || queue.isEmpty()) return
222233
if (rateLimitEndTime > (System.currentTimeMillis() / 1000).toInt()) {
223234
return
224235
}
225236
rateLimitEndTime = 0
226-
237+
flushFirstError.set(false)
227238
try {
228239
send()
229240
} catch (error: Throwable) {
230241
errorHandler?.invoke(error)
231-
sampleRate = 0.0
242+
sampleRate.set(0.0)
232243
}
233244
}
234245

235246
private fun send() {
236-
if (sampleRate == 0.0) return
237-
val sendQueue: MutableList<RemoteMetric>
238-
synchronized(queue) {
239-
sendQueue = queue.toMutableList()
240-
queue.clear()
241-
queueBytes = 0
242-
}
243-
sendQueue.forEach { m ->
244-
m.value = (m.value / sampleRate).roundToInt()
247+
if (sampleRate.get() == 0.0) return
248+
val sendQueue = mutableListOf<RemoteMetric>()
249+
// Reset queue data size counter since all current queue items will be removed
250+
queueBytes.set(0)
251+
var queueCount = queue.size
252+
while(queueCount > 0 && !queue.isEmpty()) {
253+
--queueCount
254+
val m = queue.poll()
255+
if(m != null) {
256+
m.value = (m.value / sampleRate.get()).roundToInt()
257+
sendQueue.add(m)
258+
}
245259
}
260+
assert(queue.size == 0)
246261
try {
247262
// Json.encodeToString by default does not include default values
248263
// We're using this to leave off the 'log' parameter if unset.
@@ -314,10 +329,12 @@ object Telemetry: Subscriber {
314329
tags = fullTags
315330
)
316331
val newMetricSize = newMetric.toString().toByteArray().size
317-
synchronized(queue) {
318-
if (queueBytes + newMetricSize <= maxQueueBytes) {
319-
queue.add(newMetric)
320-
queueBytes += newMetricSize
332+
// Avoid synchronization issue by adding the size before checking.
333+
if (queueBytes.addAndGet(newMetricSize) <= maxQueueBytes) {
334+
queue.add(newMetric)
335+
} else {
336+
if(queueBytes.addAndGet(-newMetricSize) < 0) {
337+
queueBytes.set(0)
321338
}
322339
}
323340
}
@@ -334,7 +351,7 @@ object Telemetry: Subscriber {
334351
private suspend fun systemUpdate(system: com.segment.analytics.kotlin.core.System) {
335352
system.settings?.let { settings ->
336353
settings.metrics["sampleRate"]?.jsonPrimitive?.double?.let {
337-
sampleRate = it
354+
sampleRate.set(it)
338355
// We don't want to start telemetry until two conditions are met:
339356
// Telemetry.enable is set to true
340357
// Settings from the server have adjusted the sampleRate
@@ -345,9 +362,7 @@ object Telemetry: Subscriber {
345362
}
346363

347364
private fun resetQueue() {
348-
synchronized(queue) {
349-
queue.clear()
350-
queueBytes = 0
351-
}
365+
queue.clear()
366+
queueBytes.set(0)
352367
}
353368
}

core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import java.util.concurrent.CountDownLatch
1111
import java.util.concurrent.Executors
1212
import java.util.concurrent.TimeUnit
1313
import java.util.concurrent.atomic.AtomicBoolean
14+
import java.util.concurrent.atomic.AtomicReference
1415
import kotlin.random.Random
1516

1617
class TelemetryTest {
@@ -31,6 +32,22 @@ class TelemetryTest {
3132
queueBytesField.isAccessible = true
3233
return queueBytesField.get(Telemetry) as Int
3334
}
35+
fun TelemetryMaxQueueSize(): Int {
36+
val maxQueueSizeField: Field = Telemetry::class.java.getDeclaredField("maxQueueSize")
37+
maxQueueSizeField.isAccessible = true
38+
return maxQueueSizeField.get(Telemetry) as Int
39+
}
40+
var TelemetrySampleRate: Double
41+
get() {
42+
val sampleRateField: Field = Telemetry::class.java.getDeclaredField("sampleRate")
43+
sampleRateField.isAccessible = true
44+
return (sampleRateField.get(Telemetry) as AtomicReference<Double>).get()
45+
}
46+
set(value) {
47+
val sampleRateField: Field = Telemetry::class.java.getDeclaredField("sampleRate")
48+
sampleRateField.isAccessible = true
49+
(sampleRateField.get(Telemetry) as AtomicReference<Double>).set(value)
50+
}
3451
var TelemetryStarted: AtomicBoolean
3552
get() {
3653
val startedField: Field = Telemetry::class.java.getDeclaredField("started")
@@ -69,20 +86,20 @@ class TelemetryTest {
6986
Telemetry.reset()
7087
Telemetry.errorHandler = ::errorHandler
7188
errors.clear()
72-
Telemetry.sampleRate = 1.0
89+
TelemetrySampleRate = 1.0
7390
MockKAnnotations.init(this)
7491
mockTelemetryHTTPClient()
7592
// Telemetry.enable = true <- this will call start(), so don't do it here
7693
}
7794

7895
@Test
7996
fun `Test telemetry start`() {
80-
Telemetry.sampleRate = 0.0
97+
TelemetrySampleRate = 0.0
8198
Telemetry.enable = true
8299
Telemetry.start()
83100
assertEquals(false, TelemetryStarted.get())
84101

85-
Telemetry.sampleRate = 1.0
102+
TelemetrySampleRate = 1.0
86103
Telemetry.start()
87104
assertEquals(true, TelemetryStarted.get())
88105
assertEquals(0,errors.size)
@@ -186,11 +203,11 @@ class TelemetryTest {
186203
fun `Test increment and error methods when queue is full`() {
187204
Telemetry.enable = true
188205
Telemetry.start()
189-
for (i in 1..Telemetry.maxQueueSize + 1) {
206+
for (i in 1..TelemetryMaxQueueSize() + 1) {
190207
Telemetry.increment(Telemetry.INVOKE_METRIC) { it["test"] = "test" + i }
191208
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["error"] = "test" + i }
192209
}
193-
assertEquals(Telemetry.maxQueueSize, TelemetryQueueSize())
210+
assertEquals(TelemetryMaxQueueSize(), TelemetryQueueSize())
194211
}
195212

196213
@Test
@@ -239,6 +256,6 @@ class TelemetryTest {
239256
} finally {
240257
executor.shutdown()
241258
}
242-
assertTrue(TelemetryQueueSize() == Telemetry.maxQueueSize)
259+
assertTrue(TelemetryQueueSize() == TelemetryMaxQueueSize())
243260
}
244261
}

0 commit comments

Comments
 (0)