Skip to content

Commit 3a9a5ec

Browse files
committed
Fix bad sample rate adjustment, add comments, simplify error logic
1 parent f2562d6 commit 3a9a5ec

File tree

3 files changed

+41
-33
lines changed

3 files changed

+41
-33
lines changed

core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ object Telemetry: Subscriber {
7575

7676
var host: String = Constants.DEFAULT_API_HOST
7777
// 1.0 is 100%, will get set by Segment setting before start()
78-
var sampleRate: Double = 0.1
78+
// Values are adjusted by the sampleRate on send
79+
var sampleRate: Double = 1.0
7980
var flushTimer: Int = 30 * 1000 // 30s
8081
var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory())
8182
var sendWriteKeyOnError: Boolean = true
@@ -96,6 +97,7 @@ object Telemetry: Subscriber {
9697
private val seenErrors = mutableMapOf<String, Int>()
9798
private var started = false
9899
private var rateLimitEndTime: Long = 0
100+
private var flushFirstError = true
99101
private val exceptionHandler = CoroutineExceptionHandler { _, t ->
100102
errorHandler?.let {
101103
it( Exception(
@@ -116,13 +118,9 @@ object Telemetry: Subscriber {
116118
if (!enable || started || sampleRate == 0.0) return
117119
started = true
118120

119-
// Assume sampleRate is now set and everything in the queue hasn't had it applied
121+
// Everything queued was sampled at default 100%, downsample adjustment and send will adjust values
120122
if (Math.random() > sampleRate) {
121123
resetQueue()
122-
} else {
123-
queue.forEach {
124-
it.value = (it.value / sampleRate).roundToInt()
125-
}
126124
}
127125

128126
telemetryJob = telemetryScope.launch(telemetryDispatcher) {
@@ -191,9 +189,13 @@ object Telemetry: Subscriber {
191189
if (!metric.startsWith(METRICS_BASE_TAG)) return
192190
if (tags.isEmpty()) return
193191
if (queue.size >= maxQueueSize) return
192+
if (Math.random() > sampleRate) return
194193

195-
var filteredTags = tags.toMap()
196-
if (!sendWriteKeyOnError) filteredTags = tags.filterKeys { it.lowercase() != "writekey" }
194+
var filteredTags = if(sendWriteKeyOnError) {
195+
tags.toMap()
196+
} else {
197+
tags.filterKeys { it.lowercase() != "writekey" }
198+
}
197199
var logData: String? = null
198200
if (sendErrorLogData) {
199201
logData = if (log.length > errorLogSizeMax) {
@@ -203,23 +205,11 @@ object Telemetry: Subscriber {
203205
}
204206
}
205207

206-
val errorKey = tags["error"]
207-
if (errorKey != null) {
208-
if (seenErrors.containsKey(errorKey)) {
209-
seenErrors[errorKey] = seenErrors[errorKey]!! + 1
210-
if (Math.random() > sampleRate) return
211-
// Adjust how many we've seen after the first since we know for sure.
212-
addRemoteMetric(metric, filteredTags, log=logData,
213-
value = (seenErrors[errorKey]!! * sampleRate).toInt())
214-
seenErrors[errorKey] = 0
215-
} else {
216-
addRemoteMetric(metric, filteredTags, log=logData)
217-
flush()
218-
seenErrors[errorKey] = 0 // Zero because it's already been sent.
219-
}
220-
}
221-
else {
222-
addRemoteMetric(metric, filteredTags, log=logData)
208+
addRemoteMetric(metric, filteredTags, log=logData)
209+
210+
if(flushFirstError) {
211+
flushFirstError = false
212+
flush()
223213
}
224214
}
225215

@@ -343,12 +333,12 @@ object Telemetry: Subscriber {
343333
system.settings?.let { settings ->
344334
settings.metrics["sampleRate"]?.jsonPrimitive?.double?.let {
345335
sampleRate = it
336+
// We don't want to start telemetry until two conditions are met:
337+
// Telemetry.enable is set to true
338+
// Settings from the server have adjusted the sampleRate
339+
// start is called in both places
340+
start()
346341
}
347-
// We don't want to start telemetry until two conditions are met:
348-
// Telemetry.enable is set to true
349-
// Settings from the server have adjusted the sampleRate
350-
// start is called in both places
351-
start()
352342
}
353343
}
354344

core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,22 @@ internal class Timeline {
7373
"Caught Exception while setting up plugin $plugin",
7474
Telemetry.INTEGRATION_ERROR_METRIC, t.stackTraceToString()) {
7575
it["error"] = t.toString()
76-
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
76+
if (plugin is DestinationPlugin && plugin.key != "") {
77+
it["plugin"] = "${plugin.type}-${plugin.key}"
78+
} else {
79+
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
80+
}
7781
it["writekey"] = analytics.configuration.writeKey
7882
it["message"] = "Exception executing plugin"
7983
}
8084
}
8185
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
8286
it["message"] = "added"
83-
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
87+
if (plugin is DestinationPlugin && plugin.key != "") {
88+
it["plugin"] = "${plugin.type}-${plugin.key}"
89+
} else {
90+
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
91+
}
8492
}
8593
plugins[plugin.type]?.add(plugin)
8694
with(analytics) {
@@ -109,7 +117,11 @@ internal class Timeline {
109117
list.remove(plugin)
110118
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
111119
it["message"] = "removed"
112-
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
120+
if (plugin is DestinationPlugin && plugin.key != "") {
121+
it["plugin"] = "${plugin.type}-${plugin.key}"
122+
} else {
123+
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
124+
}
113125
}
114126
}
115127
}

core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ import java.net.HttpURLConnection
99
import java.util.concurrent.ConcurrentLinkedQueue
1010

1111
class TelemetryTest {
12+
fun TelemetryResetFlushFirstError() {
13+
val field: Field = Telemetry::class.java.getDeclaredField("flushFirstError")
14+
field.isAccessible = true
15+
field.set(true, true)
16+
}
1217
fun TelemetryQueueSize(): Int {
1318
val queueField: Field = Telemetry::class.java.getDeclaredField("queue")
1419
queueField.isAccessible = true
@@ -163,6 +168,7 @@ class TelemetryTest {
163168
@Test
164169
fun `Test HTTP Exception`() {
165170
mockTelemetryHTTPClient(shouldThrow = true)
171+
TelemetryResetFlushFirstError()
166172
Telemetry.enable = true
167173
Telemetry.start()
168174
Telemetry.error(Telemetry.INVOKE_METRIC,"log") { it["error"] = "test" }

0 commit comments

Comments
 (0)