From f2562d64a882dba5be104413ec3a3ceda192e3b4 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 20 Nov 2024 06:43:59 -0500 Subject: [PATCH 1/4] Fix startup metric scaling and plugin names --- .../main/java/com/segment/analytics/kotlin/core/Telemetry.kt | 4 ++++ .../com/segment/analytics/kotlin/core/platform/Mediator.kt | 2 +- .../com/segment/analytics/kotlin/core/platform/Timeline.kt | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt index 23d06146..5400b3e2 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt @@ -119,6 +119,10 @@ object Telemetry: Subscriber { // Assume sampleRate is now set and everything in the queue hasn't had it applied if (Math.random() > sampleRate) { resetQueue() + } else { + queue.forEach { + it.value = (it.value / sampleRate).roundToInt() + } } telemetryJob = telemetryScope.launch(telemetryDispatcher) { diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt index f897deda..a93cba7a 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt @@ -37,7 +37,7 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop try { Telemetry.increment(Telemetry.INTEGRATION_METRIC) { it["message"] = "event-${event.type}" - "plugin" to "${plugin.type}-${plugin.javaClass}" + it["plugin"] = "${plugin.type}-${plugin.javaClass}" } when (plugin) { is DestinationPlugin -> { diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt index 02d37e1d..0f7cfcdb 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt @@ -80,7 +80,7 @@ internal class Timeline { } Telemetry.increment(Telemetry.INTEGRATION_METRIC) { it["message"] = "added" - it["plugin"] = "${plugin.type.toString()}-${plugin.javaClass.toString()}" + it["plugin"] = "${plugin.type}-${plugin.javaClass}" } plugins[plugin.type]?.add(plugin) with(analytics) { @@ -109,7 +109,7 @@ internal class Timeline { list.remove(plugin) Telemetry.increment(Telemetry.INTEGRATION_METRIC) { it["message"] = "removed" - it["plugin"] = "${plugin.type.toString()}-${plugin.javaClass.toString()}" + it["plugin"] = "${plugin.type}-${plugin.javaClass}" } } } From 3a9a5ec0ed974bab6207b91a139b1f7ec0e9e378 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 21 Nov 2024 21:02:20 -0500 Subject: [PATCH 2/4] Fix bad sample rate adjustment, add comments, simplify error logic --- .../analytics/kotlin/core/Telemetry.kt | 50 ++++++++----------- .../kotlin/core/platform/Timeline.kt | 18 +++++-- .../analytics/kotlin/core/TelemetryTest.kt | 6 +++ 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt index 5400b3e2..3c70842b 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt @@ -75,7 +75,8 @@ object Telemetry: Subscriber { var host: String = Constants.DEFAULT_API_HOST // 1.0 is 100%, will get set by Segment setting before start() - var sampleRate: Double = 0.1 + // Values are adjusted by the sampleRate on send + var sampleRate: Double = 1.0 var flushTimer: Int = 30 * 1000 // 30s var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory()) var sendWriteKeyOnError: Boolean = true @@ -96,6 +97,7 @@ object Telemetry: Subscriber { private val seenErrors = mutableMapOf() private var started = false private var rateLimitEndTime: Long = 0 + private var flushFirstError = true private val exceptionHandler = CoroutineExceptionHandler { _, t -> errorHandler?.let { it( Exception( @@ -116,13 +118,9 @@ object Telemetry: Subscriber { if (!enable || started || sampleRate == 0.0) return started = true - // Assume sampleRate is now set and everything in the queue hasn't had it applied + // Everything queued was sampled at default 100%, downsample adjustment and send will adjust values if (Math.random() > sampleRate) { resetQueue() - } else { - queue.forEach { - it.value = (it.value / sampleRate).roundToInt() - } } telemetryJob = telemetryScope.launch(telemetryDispatcher) { @@ -191,9 +189,13 @@ object Telemetry: Subscriber { if (!metric.startsWith(METRICS_BASE_TAG)) return if (tags.isEmpty()) return if (queue.size >= maxQueueSize) return + if (Math.random() > sampleRate) return - var filteredTags = tags.toMap() - if (!sendWriteKeyOnError) filteredTags = tags.filterKeys { it.lowercase() != "writekey" } + var filteredTags = if(sendWriteKeyOnError) { + tags.toMap() + } else { + tags.filterKeys { it.lowercase() != "writekey" } + } var logData: String? = null if (sendErrorLogData) { logData = if (log.length > errorLogSizeMax) { @@ -203,23 +205,11 @@ object Telemetry: Subscriber { } } - val errorKey = tags["error"] - if (errorKey != null) { - if (seenErrors.containsKey(errorKey)) { - seenErrors[errorKey] = seenErrors[errorKey]!! + 1 - if (Math.random() > sampleRate) return - // Adjust how many we've seen after the first since we know for sure. - addRemoteMetric(metric, filteredTags, log=logData, - value = (seenErrors[errorKey]!! * sampleRate).toInt()) - seenErrors[errorKey] = 0 - } else { - addRemoteMetric(metric, filteredTags, log=logData) - flush() - seenErrors[errorKey] = 0 // Zero because it's already been sent. - } - } - else { - addRemoteMetric(metric, filteredTags, log=logData) + addRemoteMetric(metric, filteredTags, log=logData) + + if(flushFirstError) { + flushFirstError = false + flush() } } @@ -343,12 +333,12 @@ object Telemetry: Subscriber { system.settings?.let { settings -> settings.metrics["sampleRate"]?.jsonPrimitive?.double?.let { sampleRate = it + // We don't want to start telemetry until two conditions are met: + // Telemetry.enable is set to true + // Settings from the server have adjusted the sampleRate + // start is called in both places + start() } - // We don't want to start telemetry until two conditions are met: - // Telemetry.enable is set to true - // Settings from the server have adjusted the sampleRate - // start is called in both places - start() } } diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt index 0f7cfcdb..57238e6d 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt @@ -73,14 +73,22 @@ internal class Timeline { "Caught Exception while setting up plugin $plugin", Telemetry.INTEGRATION_ERROR_METRIC, t.stackTraceToString()) { it["error"] = t.toString() - it["plugin"] = "${plugin.type}-${plugin.javaClass}" + if (plugin is DestinationPlugin && plugin.key != "") { + it["plugin"] = "${plugin.type}-${plugin.key}" + } else { + it["plugin"] = "${plugin.type}-${plugin.javaClass}" + } it["writekey"] = analytics.configuration.writeKey it["message"] = "Exception executing plugin" } } Telemetry.increment(Telemetry.INTEGRATION_METRIC) { it["message"] = "added" - it["plugin"] = "${plugin.type}-${plugin.javaClass}" + if (plugin is DestinationPlugin && plugin.key != "") { + it["plugin"] = "${plugin.type}-${plugin.key}" + } else { + it["plugin"] = "${plugin.type}-${plugin.javaClass}" + } } plugins[plugin.type]?.add(plugin) with(analytics) { @@ -109,7 +117,11 @@ internal class Timeline { list.remove(plugin) Telemetry.increment(Telemetry.INTEGRATION_METRIC) { it["message"] = "removed" - it["plugin"] = "${plugin.type}-${plugin.javaClass}" + if (plugin is DestinationPlugin && plugin.key != "") { + it["plugin"] = "${plugin.type}-${plugin.key}" + } else { + it["plugin"] = "${plugin.type}-${plugin.javaClass}" + } } } } diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt index d3a31c75..e92ea6d9 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt @@ -9,6 +9,11 @@ import java.net.HttpURLConnection import java.util.concurrent.ConcurrentLinkedQueue class TelemetryTest { + fun TelemetryResetFlushFirstError() { + val field: Field = Telemetry::class.java.getDeclaredField("flushFirstError") + field.isAccessible = true + field.set(true, true) + } fun TelemetryQueueSize(): Int { val queueField: Field = Telemetry::class.java.getDeclaredField("queue") queueField.isAccessible = true @@ -163,6 +168,7 @@ class TelemetryTest { @Test fun `Test HTTP Exception`() { mockTelemetryHTTPClient(shouldThrow = true) + TelemetryResetFlushFirstError() Telemetry.enable = true Telemetry.start() Telemetry.error(Telemetry.INVOKE_METRIC,"log") { it["error"] = "test" } From eb2860a2a22118f4afb61c0e97f70cb0a0b45ddb Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 21 Nov 2024 22:25:24 -0500 Subject: [PATCH 3/4] Adding destination plugin name update to mediator --- .../analytics/kotlin/core/platform/Mediator.kt | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt index a93cba7a..7d15139f 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt @@ -37,7 +37,11 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop try { Telemetry.increment(Telemetry.INTEGRATION_METRIC) { it["message"] = "event-${event.type}" - it["plugin"] = "${plugin.type}-${plugin.javaClass}" + if (plugin is DestinationPlugin && plugin.key != "") { + it["plugin"] = "${plugin.type}-${plugin.key}" + } else { + it["plugin"] = "${plugin.type}-${plugin.javaClass}" + } } when (plugin) { is DestinationPlugin -> { @@ -52,7 +56,11 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop reportErrorWithMetrics(null, t,"Caught Exception in plugin", Telemetry.INTEGRATION_ERROR_METRIC, t.stackTraceToString()) { it["error"] = t.toString() - it["plugin"] = "${plugin.type}-${plugin.javaClass}" + if (plugin is DestinationPlugin && plugin.key != "") { + it["plugin"] = "${plugin.type}-${plugin.key}" + } else { + it["plugin"] = "${plugin.type}-${plugin.javaClass}" + } it["writekey"] = plugin.analytics.configuration.writeKey it["message"] ="Exception executing plugin" } @@ -72,7 +80,11 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop "Caught Exception applying closure to plugin: $plugin", Telemetry.INTEGRATION_ERROR_METRIC, t.stackTraceToString()) { it["error"] = t.toString() - it["plugin"] = "${plugin.type}-${plugin.javaClass}" + if (plugin is DestinationPlugin && plugin.key != "") { + it["plugin"] = "${plugin.type}-${plugin.key}" + } else { + it["plugin"] = "${plugin.type}-${plugin.javaClass}" + } it["writekey"] = plugin.analytics.configuration.writeKey it["message"] = "Exception executing plugin" } From ef812202f52a10c4aace7b19164ad8d5358dbae0 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Fri, 22 Nov 2024 13:23:08 -0500 Subject: [PATCH 4/4] adding caller to error metrics --- .../main/java/com/segment/analytics/kotlin/core/Analytics.kt | 1 + .../main/java/com/segment/analytics/kotlin/core/HTTPClient.kt | 1 + .../main/java/com/segment/analytics/kotlin/core/Settings.kt | 1 + .../com/segment/analytics/kotlin/core/platform/Mediator.kt | 4 +++- 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt index d9812078..a1ffb124 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt @@ -98,6 +98,7 @@ open class Analytics protected constructor( Telemetry.INVOKE_ERROR_METRIC, t.stackTraceToString()) { it["error"] = t.toString() it["message"] = "Exception in Analytics Scope" + it["caller"] = t.stackTrace[0].toString() } } override val analyticsScope = CoroutineScope(SupervisorJob() + exceptionHandler) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt b/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt index 753a5672..84b99b92 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt @@ -39,6 +39,7 @@ class HTTPClient( it["error"] = e.toString() it["writekey"] = writeKey it["message"] = "Malformed url" + it["caller"] = e.stackTrace[0].toString() } throw error } diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt index 197dafa4..2cd2ddfc 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt @@ -124,6 +124,7 @@ internal fun Analytics.fetchSettings( it["error"] = ex.toString() it["writekey"] = writeKey it["message"] = "Error retrieving settings" + it["caller"] = ex.stackTrace[0].toString() } configuration.defaultSettings } \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt index 7d15139f..eaf4d6ec 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt @@ -62,7 +62,8 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop it["plugin"] = "${plugin.type}-${plugin.javaClass}" } it["writekey"] = plugin.analytics.configuration.writeKey - it["message"] ="Exception executing plugin" + it["message"] = "Exception executing plugin" + it["caller"] = t.stackTrace[0].toString() } } } @@ -87,6 +88,7 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop } it["writekey"] = plugin.analytics.configuration.writeKey it["message"] = "Exception executing plugin" + it["caller"] = t.stackTrace[0].toString() } } }