diff --git a/e2e/android/app/src/main/java/com/example/androidobservability/BaseApplication.kt b/e2e/android/app/src/main/java/com/example/androidobservability/BaseApplication.kt index 9512726b3..3db8e0c90 100644 --- a/e2e/android/app/src/main/java/com/example/androidobservability/BaseApplication.kt +++ b/e2e/android/app/src/main/java/com/example/androidobservability/BaseApplication.kt @@ -9,6 +9,7 @@ import com.launchdarkly.sdk.android.Components import com.launchdarkly.sdk.android.LDClient import com.launchdarkly.sdk.android.LDConfig import com.launchdarkly.observability.plugin.Observability +import com.launchdarkly.observability.replay.ReplayInstrumentation import com.launchdarkly.sdk.android.LDAndroidLogging import com.launchdarkly.sdk.android.integrations.Plugin import io.opentelemetry.api.common.AttributeKey @@ -29,6 +30,10 @@ open class BaseApplication : Application() { ), debug = true, logAdapter = LDAndroidLogging.adapter(), + // TODO: consider these being factories so that the obs plugin can pass instantiation data, log adapter + instrumentations = listOf( + ReplayInstrumentation() + ), ) var telemetryInspector: TelemetryInspector? = null diff --git a/e2e/android/app/src/test/java/com/example/androidobservability/DisablingConfigOptionsE2ETest.kt b/e2e/android/app/src/test/java/com/example/androidobservability/DisablingConfigOptionsE2ETest.kt index bcb04bead..82d5d59fc 100644 --- a/e2e/android/app/src/test/java/com/example/androidobservability/DisablingConfigOptionsE2ETest.kt +++ b/e2e/android/app/src/test/java/com/example/androidobservability/DisablingConfigOptionsE2ETest.kt @@ -2,18 +2,17 @@ package com.example.androidobservability import android.app.Application import androidx.test.core.app.ApplicationProvider +import com.example.androidobservability.TestUtils.TelemetryType import com.example.androidobservability.TestUtils.waitForTelemetryData +import com.launchdarkly.observability.api.Options import com.launchdarkly.observability.interfaces.Metric import com.launchdarkly.observability.sdk.LDObserve import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.logs.Severity -import com.example.androidobservability.TestUtils.TelemetryType -import com.launchdarkly.observability.api.Options import junit.framework.TestCase.assertEquals import junit.framework.TestCase.assertFalse import junit.framework.TestCase.assertNotNull -import junit.framework.TestCase.assertNull import junit.framework.TestCase.assertTrue import org.junit.Test import org.junit.runner.RunWith @@ -96,7 +95,6 @@ class DisablingConfigOptionsE2ETest { LDObserve.flush() waitForTelemetryData(telemetryInspector = application.telemetryInspector, telemetryType = TelemetryType.METRICS) - assertNull(application.telemetryInspector?.metricExporter) assertFalse(requestsContainsUrl(metricsUrl)) } @@ -110,7 +108,6 @@ class DisablingConfigOptionsE2ETest { LDObserve.flush() waitForTelemetryData(telemetryInspector = application.telemetryInspector, telemetryType = TelemetryType.METRICS) - assertNotNull(application.telemetryInspector?.metricExporter) assertTrue(requestsContainsUrl(metricsUrl)) } diff --git a/sdk/@launchdarkly/observability-android/lib/build.gradle.kts b/sdk/@launchdarkly/observability-android/lib/build.gradle.kts index aea5e6407..8d7042e87 100644 --- a/sdk/@launchdarkly/observability-android/lib/build.gradle.kts +++ b/sdk/@launchdarkly/observability-android/lib/build.gradle.kts @@ -55,6 +55,11 @@ dependencies { // Android crash instrumentation 
implementation("io.opentelemetry.android.instrumentation:crash:0.11.0-alpha") + // TODO: O11Y-626 - move replay instrumentation and associated compose dependencies into dedicated package + // Compose dependencies for capture functionality + implementation("androidx.compose.ui:ui:1.7.5") + implementation("androidx.compose.ui:ui-tooling:1.7.5") + // Use JUnit Jupiter for testing. testImplementation(platform("org.junit:junit-bom:5.13.4")) testImplementation("org.junit.jupiter:junit-jupiter") diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/api/Options.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/api/Options.kt index af001910c..8f19f400e 100644 --- a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/api/Options.kt +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/api/Options.kt @@ -2,13 +2,15 @@ package com.launchdarkly.observability.api import com.launchdarkly.logging.LDLogAdapter import com.launchdarkly.observability.BuildConfig +import com.launchdarkly.observability.interfaces.LDExtendedInstrumentation import com.launchdarkly.sdk.android.LDTimberLogging import io.opentelemetry.api.common.Attributes import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes -private const val DEFAULT_OTLP_ENDPOINT = "https://otel.observability.app.launchdarkly.com:4318" -private const val DEFAULT_BACKEND_URL = "https://pub.observability.app.launchdarkly.com" +const val DEFAULT_SERVICE_NAME = "observability-android" +const val DEFAULT_OTLP_ENDPOINT = "https://otel.observability.app.launchdarkly.com:4318" +const val DEFAULT_BACKEND_URL = "https://pub.observability.app.launchdarkly.com" /** * Configuration options for the Observability plugin. @@ -27,9 +29,10 @@ private const val DEFAULT_BACKEND_URL = "https://pub.observability.app.launchdar * @property disableMetrics Disables metrics if true. Defaults to false. * @property logAdapter The log adapter to use. Defaults to using the LaunchDarkly SDK's LDTimberLogging.adapter(). Use LDAndroidLogging.adapter() to use the Android logging adapter. * @property loggerName The name of the logger to use. Defaults to "LaunchDarklyObservabilityPlugin". 
+ * @property instrumentations List of additional instrumentations to use. Defaults to an empty list.
  */
 data class Options(
-    val serviceName: String = "observability-android",
+    val serviceName: String = DEFAULT_SERVICE_NAME,
     val serviceVersion: String = BuildConfig.OBSERVABILITY_SDK_VERSION,
     val otlpEndpoint: String = DEFAULT_OTLP_ENDPOINT,
     val backendUrl: String = DEFAULT_BACKEND_URL,
@@ -37,11 +40,12 @@ data class Options(
     val customHeaders: Map<String, String> = emptyMap(),
     val sessionBackgroundTimeout: Duration = 15.minutes,
     val debug: Boolean = false,
-    // TODO O11Y-398: implement disable config options after all other instrumentations are implemented
     val disableErrorTracking: Boolean = false,
     val disableLogs: Boolean = false,
     val disableTraces: Boolean = false,
     val disableMetrics: Boolean = false,
     val logAdapter: LDLogAdapter = LDTimberLogging.adapter(), // this follows the LaunchDarkly SDK's default log adapter
-    val loggerName: String = "LaunchDarklyObservabilityPlugin"
+    val loggerName: String = "LaunchDarklyObservabilityPlugin",
+    // TODO: update this to provide a list of factories instead of instances
+    val instrumentations: List<LDExtendedInstrumentation> = emptyList()
 )
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/InstrumentationManager.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/InstrumentationManager.kt
index 4aa75e2da..45aa6c5bd 100644
--- a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/InstrumentationManager.kt
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/InstrumentationManager.kt
@@ -7,6 +7,7 @@ import com.launchdarkly.observability.interfaces.Metric
 import com.launchdarkly.observability.network.GraphQLClient
 import com.launchdarkly.observability.network.SamplingApiService
 import com.launchdarkly.observability.sampling.CustomSampler
+import com.launchdarkly.observability.sampling.ExportSampler
 import com.launchdarkly.observability.sampling.SamplingConfig
 import com.launchdarkly.observability.sampling.SamplingLogExporter
 import com.launchdarkly.observability.sampling.SamplingTraceExporter
@@ -29,6 +30,7 @@ import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter
 import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter
 import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter
 import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.logs.LogRecordProcessor
 import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder
 import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor
 import io.opentelemetry.sdk.logs.export.LogRecordExporter
@@ -37,9 +39,6 @@ import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector
 import io.opentelemetry.sdk.metrics.export.MetricExporter
 import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader
 import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter
-import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter
-import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
 import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder
 import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
 import io.opentelemetry.sdk.trace.export.SpanExporter
@@ -66,20 +65,6 @@ class InstrumentationManager(
     private val logger: LDLogger,
     private val options: Options,
 ) {
-    companion object {
-        private const val METRICS_PATH = 
"/v1/metrics" - private const val LOGS_PATH = "/v1/logs" - private const val TRACES_PATH = "/v1/traces" - private const val INSTRUMENTATION_SCOPE_NAME = "com.launchdarkly.observability" - const val ERROR_SPAN_NAME = "highlight.error" - private const val BATCH_MAX_QUEUE_SIZE = 100 - private const val BATCH_SCHEDULE_DELAY_MS = 1000L - private const val BATCH_EXPORTER_TIMEOUT_MS = 5000L - private const val BATCH_MAX_EXPORT_SIZE = 10 - private const val METRICS_EXPORT_INTERVAL_MS = 10_000L - private const val FLUSH_TIMEOUT_SECONDS = 5L - } - private val otelRUM: OpenTelemetryRum private var otelMeter: Meter private var otelLogger: Logger @@ -87,12 +72,9 @@ class InstrumentationManager( private var customSampler = CustomSampler() private val graphqlClient = GraphQLClient(options.backendUrl) private val samplingApiService = SamplingApiService(graphqlClient) - private var inMemorySpanExporter: InMemorySpanExporter? = null - private var inMemoryLogExporter: InMemoryLogRecordExporter? = null - private var inMemoryMetricExporter: InMemoryMetricExporter? = null private var telemetryInspector: TelemetryInspector? = null private var spanProcessor: BatchSpanProcessor? = null - private var logProcessor: BatchLogRecordProcessor? = null + private var logProcessor: LogRecordProcessor? = null private var metricsReader: PeriodicMetricReader? = null private var launchTimeInstrumentation: LaunchTimeInstrumentation? = null private val gaugeCache = ConcurrentHashMap() @@ -104,14 +86,26 @@ class InstrumentationManager( private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) init { + initializeTelemetryInspector() val otelRumConfig = createOtelRumConfig() val rumBuilder = OpenTelemetryRum.builder(application, otelRumConfig) .addLoggerProviderCustomizer { sdkLoggerProviderBuilder, _ -> + // TODO: O11Y-627 - need to refactor this so that the disableLogs option is specific to core logging functionality. 
when logs are disabled, session replay logs should not be blocked return@addLoggerProviderCustomizer if (options.disableLogs && options.disableErrorTracking) { sdkLoggerProviderBuilder } else { - configureLoggerProvider(sdkLoggerProviderBuilder) + val processor = createLoggerProcessor( + sdkLoggerProviderBuilder, + customSampler, + sdkKey, + resources, + logger, + telemetryInspector, + options + ) + logProcessor = processor + sdkLoggerProviderBuilder.addLogRecordProcessor(processor) } } .addTracerProviderCustomizer { sdkTracerProviderBuilder, _ -> @@ -129,6 +123,10 @@ class InstrumentationManager( } } + for (instrumentation in options.instrumentations) { + rumBuilder.addInstrumentation(instrumentation) + } + if (!options.disableMetrics) { launchTimeInstrumentation = LaunchTimeInstrumentation( application = application, @@ -139,8 +137,6 @@ class InstrumentationManager( } otelRUM = rumBuilder.build() - - initializeTelemetryInspector() loadSamplingConfigAsync() otelMeter = otelRUM.openTelemetry.meterProvider.get(INSTRUMENTATION_SCOPE_NAME) @@ -171,17 +167,6 @@ class InstrumentationManager( return !options.disableLogs || !options.disableTraces || !options.disableMetrics || !options.disableErrorTracking } - private fun configureLoggerProvider(sdkLoggerProviderBuilder: SdkLoggerProviderBuilder): SdkLoggerProviderBuilder { - val primaryLogExporter = createOtlpLogExporter() - sdkLoggerProviderBuilder.setResource(resources) - - val finalExporter = createLogExporter(primaryLogExporter) - val processor = createBatchLogRecordProcessor(finalExporter) - - logProcessor = processor - return sdkLoggerProviderBuilder.addLogRecordProcessor(processor) - } - private fun configureTracerProvider(sdkTracerProviderBuilder: SdkTracerProviderBuilder): SdkTracerProviderBuilder { val primarySpanExporter = createOtlpSpanExporter() sdkTracerProviderBuilder.setResource(resources) @@ -205,13 +190,6 @@ class InstrumentationManager( .registerMetricReader(metricReader) } - private fun createOtlpLogExporter(): LogRecordExporter { - return OtlpHttpLogRecordExporter.builder() - .setEndpoint(options.otlpEndpoint + LOGS_PATH) - .setHeaders { options.customHeaders } - .build() - } - private fun createOtlpSpanExporter(): SpanExporter { return OtlpHttpSpanExporter.builder() .setEndpoint(options.otlpEndpoint + TRACES_PATH) @@ -227,35 +205,13 @@ class InstrumentationManager( .build() } - private fun createLogExporter(primaryExporter: LogRecordExporter): LogRecordExporter { - val baseExporter = if (options.debug) { - LogRecordExporter.composite( - buildList { - add(primaryExporter) - add(DebugLogExporter(logger)) - add(InMemoryLogRecordExporter.create().also { inMemoryLogExporter = it }) - } - ) - } else { - primaryExporter - } - - val conditionalExporter = ConditionalLogRecordExporter( - delegate = baseExporter, - allowNormalLogs = !options.disableLogs, - allowCrashes = !options.disableErrorTracking - ) - - return SamplingLogExporter(conditionalExporter, customSampler) - } - private fun createSpanExporter(primaryExporter: SpanExporter): SpanExporter { val baseExporter = if (options.debug) { SpanExporter.composite( buildList { add(primaryExporter) add(DebugSpanExporter(logger)) - add(InMemorySpanExporter.create().also { inMemorySpanExporter = it }) + telemetryInspector?.let { add(it.spanExporter) } } ) } else { @@ -277,7 +233,7 @@ class InstrumentationManager( buildList { add(primaryExporter) add(DebugMetricExporter(logger)) - add(InMemoryMetricExporter.create().also { inMemoryMetricExporter = it }) + telemetryInspector?.let { 
add(it.metricExporter) } } ) } else { @@ -294,7 +250,7 @@ class InstrumentationManager( private fun initializeTelemetryInspector() { if (options.debug) { - telemetryInspector = TelemetryInspector(inMemorySpanExporter, inMemoryLogExporter, inMemoryMetricExporter) + telemetryInspector = TelemetryInspector() } } @@ -308,15 +264,6 @@ class InstrumentationManager( } } - private fun createBatchLogRecordProcessor(logRecordExporter: LogRecordExporter): BatchLogRecordProcessor { - return BatchLogRecordProcessor.builder(logRecordExporter) - .setMaxQueueSize(BATCH_MAX_QUEUE_SIZE) - .setScheduleDelay(BATCH_SCHEDULE_DELAY_MS, TimeUnit.MILLISECONDS) - .setExporterTimeout(BATCH_EXPORTER_TIMEOUT_MS, TimeUnit.MILLISECONDS) - .setMaxExportBatchSize(BATCH_MAX_EXPORT_SIZE) - .build() - } - private fun createBatchSpanProcessor(spanExporter: SpanExporter): BatchSpanProcessor { return BatchSpanProcessor.builder(spanExporter) .setMaxQueueSize(BATCH_MAX_QUEUE_SIZE) @@ -437,4 +384,101 @@ class InstrumentationManager( null } } + + companion object { + private const val METRICS_PATH = "/v1/metrics" + private const val LOGS_PATH = "/v1/logs" + private const val TRACES_PATH = "/v1/traces" + private const val INSTRUMENTATION_SCOPE_NAME = "com.launchdarkly.observability" + const val ERROR_SPAN_NAME = "highlight.error" + private const val BATCH_MAX_QUEUE_SIZE = 100 + private const val BATCH_SCHEDULE_DELAY_MS = 1000L + private const val BATCH_EXPORTER_TIMEOUT_MS = 5000L + private const val BATCH_MAX_EXPORT_SIZE = 10 + private const val METRICS_EXPORT_INTERVAL_MS = 10_000L + private const val FLUSH_TIMEOUT_SECONDS = 5L + + internal fun createLoggerProcessor( + sdkLoggerProviderBuilder: SdkLoggerProviderBuilder, + exportSampler: ExportSampler, + sdkKey: String, + resource: Resource, + logger: LDLogger, + telemetryInspector: TelemetryInspector?, + options: Options, + ): LogRecordProcessor { + val primaryLogExporter = createOtlpLogExporter(options) + sdkLoggerProviderBuilder.setResource(resource) + + val finalExporter = createLogExporter( + primaryLogExporter, + exportSampler, + logger, + telemetryInspector, + options + ) + val baseProcessor = createBatchLogRecordProcessor(finalExporter) + + // Here we set up a routing log processor that will route logs with a matching scope name to the + // respective instrumentation's log record processor. If the log's scope name does not match + // an instrumentation's scope name, it will fall through to the base processor. This was + // originally added to route replay instrumentation logs through a separate log processing + // pipeline to provide instrumentation specific caching and export. 
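+            // For example, a record emitted through the replay scope
+            // ("com.launchdarkly.observability.replay") is routed to the replay instrumentation's
+            // processor, while a record from the core scope ("com.launchdarkly.observability")
+            // falls through to the batch processor created above. (Illustrative note; the scope
+            // names are the ones defined elsewhere in this change.)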
+ val routingLogRecordProcessor = + RoutingLogRecordProcessor(fallthroughProcessor = baseProcessor) + options.instrumentations.forEach { instrumentation -> + instrumentation.getLogRecordProcessor(credential = sdkKey)?.let { processor -> + instrumentation.getLoggerScopeName().let { scopeName -> + routingLogRecordProcessor.addProcessor(scopeName, processor) + } + } + } + + return routingLogRecordProcessor + } + + private fun createOtlpLogExporter(options: Options): LogRecordExporter { + return OtlpHttpLogRecordExporter.builder() + .setEndpoint(options.otlpEndpoint + LOGS_PATH) + .setHeaders { options.customHeaders } + .build() + } + + private fun createLogExporter( + primaryExporter: LogRecordExporter, + exportSampler: ExportSampler, + logger: LDLogger, + telemetryInspector: TelemetryInspector?, + options: Options + ): LogRecordExporter { + val baseExporter = if (options.debug) { + LogRecordExporter.composite( + buildList { + add(primaryExporter) + add(DebugLogExporter(logger)) + telemetryInspector?.let { add(it.logExporter) } + } + ) + } else { + primaryExporter + } + + val conditionalExporter = ConditionalLogRecordExporter( + delegate = baseExporter, + allowNormalLogs = !options.disableLogs, + allowCrashes = !options.disableErrorTracking + ) + + return SamplingLogExporter(conditionalExporter, exportSampler) + } + + fun createBatchLogRecordProcessor(logRecordExporter: LogRecordExporter): BatchLogRecordProcessor { + return BatchLogRecordProcessor.builder(logRecordExporter) + .setMaxQueueSize(BATCH_MAX_QUEUE_SIZE) + .setScheduleDelay(BATCH_SCHEDULE_DELAY_MS, TimeUnit.MILLISECONDS) + .setExporterTimeout(BATCH_EXPORTER_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .setMaxExportBatchSize(BATCH_MAX_EXPORT_SIZE) + .build() + } + } } diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/NoopLogRecordProcessor.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/NoopLogRecordProcessor.kt new file mode 100644 index 000000000..1ea03ebbf --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/NoopLogRecordProcessor.kt @@ -0,0 +1,12 @@ +package com.launchdarkly.observability.client + +import io.opentelemetry.context.Context +import io.opentelemetry.sdk.logs.LogRecordProcessor +import io.opentelemetry.sdk.logs.ReadWriteLogRecord + +/** + * A [LogRecordProcessor] that, surprise, does nothing. + */ +internal object NoopLogRecordProcessor : LogRecordProcessor { + override fun onEmit(context: Context, logRecord: ReadWriteLogRecord) {} +} \ No newline at end of file diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/RoutingLogRecordProcessor.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/RoutingLogRecordProcessor.kt new file mode 100644 index 000000000..8b1a0225c --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/RoutingLogRecordProcessor.kt @@ -0,0 +1,27 @@ +package com.launchdarkly.observability.client + +import io.opentelemetry.context.Context +import io.opentelemetry.sdk.logs.LogRecordProcessor +import io.opentelemetry.sdk.logs.ReadWriteLogRecord +import java.util.concurrent.ConcurrentHashMap + +/** + * A [LogRecordProcessor] that implements a routing pattern to other registered [LogRecordProcessor]s + * using scope name as routing criteria. 
If no [LogRecordProcessor] is registered for the given
+ * scope name, the [fallthroughProcessor] is called to handle the log.
+ */
+class RoutingLogRecordProcessor(
+    private val fallthroughProcessor: LogRecordProcessor = NoopLogRecordProcessor
+) : LogRecordProcessor {
+    private val processors = ConcurrentHashMap<String, LogRecordProcessor>()
+
+    fun addProcessor(scopeName: String, processor: LogRecordProcessor) {
+        processors[scopeName] = processor
+    }
+
+    override fun onEmit(context: Context, logRecord: ReadWriteLogRecord) {
+        val scopeName = logRecord.instrumentationScopeInfo.name
+        val processor = processors[scopeName] ?: fallthroughProcessor
+        processor.onEmit(context, logRecord)
+    }
+}
\ No newline at end of file
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/TelemetryInspector.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/TelemetryInspector.kt
index 6d857a5b9..bd53635d2 100644
--- a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/TelemetryInspector.kt
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/TelemetryInspector.kt
@@ -12,7 +12,8 @@ import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
  * @param metricExporter The in-memory metric exporter to read from
  */
 class TelemetryInspector(
-    val spanExporter: InMemorySpanExporter?,
-    val logExporter: InMemoryLogRecordExporter?,
-    val metricExporter: InMemoryMetricExporter?
-)
+) {
+    val spanExporter: InMemorySpanExporter by lazy { InMemorySpanExporter.create() }
+    val logExporter: InMemoryLogRecordExporter by lazy { InMemoryLogRecordExporter.create() }
+    val metricExporter: InMemoryMetricExporter by lazy { InMemoryMetricExporter.create() }
+}
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/interfaces/LDExtendedInstrumentation.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/interfaces/LDExtendedInstrumentation.kt
new file mode 100644
index 000000000..ec4bcdc3d
--- /dev/null
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/interfaces/LDExtendedInstrumentation.kt
@@ -0,0 +1,22 @@
+package com.launchdarkly.observability.interfaces
+
+import io.opentelemetry.android.instrumentation.AndroidInstrumentation
+import io.opentelemetry.sdk.logs.LogRecordProcessor
+
+// This interface is for internal LaunchDarkly use only.
+interface LDExtendedInstrumentation : AndroidInstrumentation {
+
+    /**
+     * @return the scope name that this instrumentation will use for its logs
+     */
+    fun getLoggerScopeName(): String
+
+    /**
+     * @param credential the credential that will be used by exporters for authenticating with
+     * services
+     *
+     * @return the instrumentation-specific [LogRecordProcessor] for handling this instrumentation's
+     * logs, or null if this instrumentation does not need to provide any specific handling.
+     */
+    fun getLogRecordProcessor(credential: String): LogRecordProcessor? = null
+}
\ No newline at end of file
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/GraphQLClient.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/GraphQLClient.kt
index cb4efeb75..b7e5c2b08 100644
--- a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/GraphQLClient.kt
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/GraphQLClient.kt
@@ -5,6 +5,7 @@ import kotlinx.coroutines.withContext
 import kotlinx.serialization.KSerializer
 import kotlinx.serialization.json.Json
 import kotlinx.serialization.Serializable
+import kotlinx.serialization.json.JsonElement
 import java.io.IOException
 import java.net.HttpURLConnection
 import java.net.URL
@@ -12,7 +13,7 @@ import java.net.URL
 @Serializable
 data class GraphQLRequest(
     val query: String,
-    val variables: Map<String, String> = emptyMap()
+    val variables: Map<String, JsonElement> = emptyMap()
 )
 
 @Serializable
@@ -69,7 +70,7 @@ class GraphQLClient(
      */
    suspend fun <T> execute(
        queryFileName: String,
-        variables: Map<String, String> = emptyMap(),
+        variables: Map<String, JsonElement> = emptyMap(),
        dataSerializer: KSerializer<T>
    ): GraphQLResponse<T> = withContext(Dispatchers.IO) {
        try {
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/SamplingApiService.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/SamplingApiService.kt
index 8bf2fb050..e5d718535 100644
--- a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/SamplingApiService.kt
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/network/SamplingApiService.kt
@@ -1,6 +1,7 @@
 package com.launchdarkly.observability.network
 
 import com.launchdarkly.observability.sampling.SamplingConfig
+import kotlinx.serialization.json.JsonPrimitive
 
 /**
  * Service for fetching sampling configuration
@@ -20,7 +21,7 @@
      */
     suspend fun getSamplingConfig(organizationVerboseId: String): SamplingConfig? {
         try {
-            val variables = mapOf("organization_verbose_id" to organizationVerboseId)
+            val variables = mapOf("organization_verbose_id" to JsonPrimitive(organizationVerboseId))
             val response = graphqlClient.execute(
                 queryFileName = GET_SAMPLING_CONFIG_QUERY_FILE_PATH,
                 variables = variables,
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/Capture.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/Capture.kt
new file mode 100644
index 000000000..768910527
--- /dev/null
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/Capture.kt
@@ -0,0 +1,19 @@
+package com.launchdarkly.observability.replay
+
+/**
+ * Represents a capture for the replay instrumentation.
+ *
+ * @property imageBase64 The capture encoded as a Base64 string.
+ * @property origHeight The original height of the capture in pixels.
+ * @property origWidth The original width of the capture in pixels.
+ * @property timestamp The timestamp when the capture was taken, in milliseconds since epoch.
+ * @property session The unique session identifier that this capture belongs to. This links
+ * the capture to a specific user session.
+ */ +data class Capture( + val imageBase64: String, + val origHeight: Int, + val origWidth: Int, + val timestamp: Long, + val session: String +) diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/CaptureSource.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/CaptureSource.kt new file mode 100644 index 000000000..167c69da6 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/CaptureSource.kt @@ -0,0 +1,362 @@ +package com.launchdarkly.observability.replay + +import android.app.Activity +import android.app.Application +import android.graphics.Bitmap +import android.graphics.Canvas +import android.graphics.Color +import android.graphics.Paint +import android.graphics.Rect +import android.os.Build +import android.os.Bundle +import android.os.Handler +import android.os.Looper +import android.util.Base64 +import android.view.PixelCopy +import android.view.View +import android.view.ViewGroup +import androidx.compose.ui.platform.ComposeView +import androidx.compose.ui.semantics.SemanticsNode +import androidx.compose.ui.semantics.SemanticsOwner +import androidx.compose.ui.semantics.SemanticsProperties +import androidx.compose.ui.semantics.getOrNull +import io.opentelemetry.android.session.SessionManager +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext +import java.io.ByteArrayOutputStream +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import androidx.compose.ui.geometry.Rect as ComposeRect + +/** + * A source of [Capture]s taken from the most recently resumed [Activity]s window. Captures + * are emitted on the [captureFlow] property of this class. + * + * @param sessionManager Used to get current session for tagging [Capture] with session id + */ +class CaptureSource( + private val sessionManager: SessionManager, + private val privacyProfile: PrivacyProfile, + // TODO: O11Y-628 - add captureQuality options +) : Application.ActivityLifecycleCallbacks { + + private var _activity: Activity? = null + + private val _captureFlow = MutableSharedFlow() + val captureFlow: SharedFlow = _captureFlow.asSharedFlow() + + /** + * Attaches the [CaptureSource] to the [Application] whose [Activity]s will be captured. + */ + fun attachToApplication(application: Application) { + application.registerActivityLifecycleCallbacks(this) + } + + /** + * Detaches the [CaptureSource] from the [Application]. + */ + fun detachFromApplication(application: Application) { + application.unregisterActivityLifecycleCallbacks(this) + } + + /** + * Requests a [Capture] be taken now. + */ + suspend fun captureNow() { + val capture = doCapture() + if (capture != null) { + _captureFlow.emit(capture) + } + } + + override fun onActivityCreated(activity: Activity, savedInstanceState: Bundle?) 
{ + // Noop + } + + override fun onActivityStarted(activity: Activity) { + // Noop + } + + override fun onActivityResumed(activity: Activity) { + _activity = activity + } + + override fun onActivityPaused(activity: Activity) { + _activity = null + } + + override fun onActivityStopped(activity: Activity) { + // Noop + } + + override fun onActivitySaveInstanceState(activity: Activity, outState: Bundle) { + // Noop + } + + override fun onActivityDestroyed(activity: Activity) { + // Noop + } + + /** + * Internal capture routine. + */ + private suspend fun doCapture(): Capture? = withContext(Dispatchers.Main) { + val activity = _activity ?: return@withContext null + + try { + val window = activity.window + val decorView = window.decorView + val decorViewWidth = decorView.width + val decorViewHeight = decorView.height + + val rect = Rect(0, 0, decorViewWidth, decorViewHeight) + + // protect against race condition where decor view has no size + if (decorViewWidth <= 0 || decorViewHeight <= 0) { + return@withContext null + } + + // TODO: O11Y-625 - optimize memory allocations + // TODO: O11Y-625 - see if holding bitmap is more efficient than base64 encoding immediately after compression + // TODO: O11Y-628 - use captureQuality option for scaling and adjust this bitmap accordingly, may need to investigate power of 2 rounding for performance + // Create a bitmap with the window dimensions + val bitmap = Bitmap.createBitmap(decorViewWidth, decorViewHeight, Bitmap.Config.ARGB_8888) + + // Use PixelCopy to capture the window content + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { + suspendCancellableCoroutine { continuation -> + // TODO: O11Y-624 - read PixelCopy exception recommendations and adjust logic to account for such cases + PixelCopy.request( + window, + rect, + bitmap, + { result -> + // record attributes immediately to provide accurate stamping + val timestamp = System.currentTimeMillis() + val session = sessionManager.getSessionId() + + if (result == PixelCopy.SUCCESS) { + // Offload heavy bitmap work to a background dispatcher + CoroutineScope(Dispatchers.Default).launch { + try { + val postMask = bitmap; + // TODO: O11Y-620 - masking +// val postMask: Bitmap = +// if (privacyProfile == PrivacyProfile.STRICT) { +// maskSensitiveAreas(bitmap, activity) +// } else { +// bitmap +// } + + // TODO: O11Y-625 - optimize memory allocations here, re-use byte arrays and such + val outputStream = ByteArrayOutputStream() + // TODO: O11Y-628 - calculate quality using captureQuality options + postMask.compress(Bitmap.CompressFormat.WEBP, 30, outputStream) + val byteArray = outputStream.toByteArray() + val compressedImage = + Base64.encodeToString(byteArray, Base64.NO_WRAP) + + val capture = Capture( + imageBase64 = compressedImage, + origWidth = decorViewWidth, + origHeight = decorViewHeight, + timestamp = timestamp, + session = session, + ) + continuation.resume(capture) + } catch (e: Exception) { + continuation.resumeWithException(e) + } + } + } else { + // TODO: O11Y-624 - implement handling/shutdown for errors and unsupported API levels + continuation.resumeWithException(Exception("PixelCopy failed with result: $result")) + } + }, + Handler(Looper.getMainLooper()) // Handler for main thread + ) + } + } else { + // TODO: O11Y-624 - implement handling/shutdown for errors and unsupported API levels + throw NotImplementedError("CaptureSource does not work on unsupported Android SDK version") + } + } catch (e: Exception) { + // TODO: O11Y-624 - implement handling/shutdown for errors and 
unsupported API levels + throw RuntimeException(e) + } + } + + /** + * Applies masking rectangles to the provided [bitmap] by inspecting the provided [activity] for + * content that needs to be masked. + * + * @param bitmap The bitmap to mask + * @param activity The activity that the bitmap was captured from. + */ + private fun maskSensitiveAreas(bitmap: Bitmap, activity: Activity): Bitmap { + // TODO: O11Y-625 - remove this bitmap copy if possible for memory optimization purposes + val maskedBitmap = bitmap.copy(Bitmap.Config.ARGB_8888, true) + val canvas = Canvas(maskedBitmap) + val paint = Paint().apply { + color = Color.BLACK + style = Paint.Style.FILL + } + + // Find sensitive areas using Compose semantics + val sensitiveComposeRects = findSensitiveComposeAreasFromActivity(activity) + + // Mask sensitive Compose areas found via semantics + sensitiveComposeRects.forEach { composeRect -> + val rect = Rect( + composeRect.left.toInt(), + composeRect.top.toInt(), + composeRect.right.toInt(), + composeRect.bottom.toInt() + ) + canvas.drawRect(rect, paint) + } + + return maskedBitmap + } + + /** + * Find sensitive Compose areas from all ComposeViews in the activity. + * + * @return a list of rects that represent sensitive areas that need to be masked + */ + private fun findSensitiveComposeAreasFromActivity(activity: Activity): List { + val allSensitiveRects = mutableListOf() + + try { + // Find all ComposeViews in the activity + val composeViews = findComposeViews(activity.window.decorView) + + // Process each ComposeView to find sensitive areas + composeViews.forEach { composeView -> + val semanticsOwner = getSemanticsOwner(composeView) + val rootSemanticsNode = semanticsOwner?.rootSemanticsNode + if (rootSemanticsNode != null) { + val sensitiveRects = findSensitiveComposeAreas(rootSemanticsNode, composeView) + allSensitiveRects.addAll(sensitiveRects) + } + } + } catch (e: Exception) { + // Handle cases where ComposeView access fails + } + + return allSensitiveRects + } + + /** + * Recursively find all ComposeViews in the view hierarchy. + * + * @return list of compose views + */ + private fun findComposeViews(view: View): List { + val composeViews = mutableListOf() + + if (view is ComposeView) { + composeViews.add(view) + } + + if (view is ViewGroup) { + for (i in 0 until view.childCount) { + val child = view.getChildAt(i) + composeViews.addAll(findComposeViews(child)) + } + } + + return composeViews + } + + /** + * Gets the SemanticsOwner from a ComposeView using reflection. This is necessary because + * AndroidComposeView and semanticsOwner are not publicly exposed. + */ + private fun getSemanticsOwner(composeView: ComposeView): SemanticsOwner? { + return try { + // ComposeView contains an AndroidComposeView which has the semanticsOwner + if (composeView.childCount > 0) { + val androidComposeView = composeView.getChildAt(0) + + // TODO: O11Y-620 - determine if there is a more robust long term way to achieve this, this reflection is fragile. + // Use reflection to check if this is an AndroidComposeView + val androidComposeViewClass = + Class.forName("androidx.compose.ui.platform.AndroidComposeView") + if (androidComposeViewClass.isInstance(androidComposeView)) { + // Use reflection to access the semanticsOwner field + val field = androidComposeViewClass.getDeclaredField("semanticsOwner") + field.isAccessible = true + field.get(androidComposeView) as? 
SemanticsOwner + } else null + } else null + } catch (e: Exception) { + null + } + } + + /** + * Find sensitive Compose areas by traversing the semantic node tree. + */ + private fun findSensitiveComposeAreas( + rootSemanticsNode: SemanticsNode, + composeView: ComposeView + ): List { + val sensitiveRects = mutableListOf() + + try { + // Recursively traverse the semantic node tree to find sensitive areas + traverseSemanticNode(rootSemanticsNode, sensitiveRects, composeView) + + } catch (e: Exception) { + // Handle cases where semantic node traversal fails + // This could happen if the semantic tree is not available or corrupted + } + + return sensitiveRects + } + + /** + * Recursively traverse a semantic node and its children to find sensitive areas. + */ + private fun traverseSemanticNode( + node: SemanticsNode, + sensitiveRects: MutableList, + composeView: ComposeView + ) { + // Check if this node is marked as sensitive + if (isSensitiveNode(node)) { + // Convert bounds to absolute coordinates + val boundsInWindow = node.boundsInWindow + val absoluteRect = ComposeRect( + left = boundsInWindow.left, + top = boundsInWindow.top, + right = boundsInWindow.right, + bottom = boundsInWindow.bottom + ) + sensitiveRects.add(absoluteRect) + } + + // Recursively traverse all children + node.children.forEach { child -> + traverseSemanticNode(child, sensitiveRects, composeView) + } + } + + /** + * Check if a semantic node contains sensitive content based on test tags or content descriptions. + */ + private fun isSensitiveNode(node: SemanticsNode): Boolean { + // TODO: O11Y-620 - refactor to utilize generic MaskMatchers + + // Check for content description containing "sensitive" + val contentDescriptions = node.config.getOrNull(SemanticsProperties.ContentDescription) + return contentDescriptions?.any { it.contains("sensitive", ignoreCase = true) } == true + } +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/PrivacyProfile.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/PrivacyProfile.kt new file mode 100644 index 000000000..7f3391098 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/PrivacyProfile.kt @@ -0,0 +1,6 @@ +package com.launchdarkly.observability.replay + +// TODO: O11Y-620 - implement full PrivacyProfiles and MaskingMatchers +enum class PrivacyProfile { + NO_MASK, STRICT +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/RRwebGraphQLReplayLogExporter.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/RRwebGraphQLReplayLogExporter.kt new file mode 100644 index 000000000..2e59645c7 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/RRwebGraphQLReplayLogExporter.kt @@ -0,0 +1,296 @@ +package com.launchdarkly.observability.replay + +import com.launchdarkly.observability.network.GraphQLClient +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.logs.data.LogRecordData +import io.opentelemetry.sdk.logs.export.LogRecordExporter +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import 
kotlinx.serialization.json.Json + +private const val REPLAY_EXPORTER_NAME = "RRwebGraphQLReplayLogExporter" + +/** + * An [LogRecordExporter] that can send session replay capture logs to the backend using RRWeb syntax + * and GraphQL pushes for transport. + * + * @param organizationVerboseId the organization verbose id for the LaunchDarkly customer + * @param backendUrl The backend URL the GraphQL operations + * @param serviceName The service name + * @param serviceVersion The service version + * @param injectedReplayApiService Optional SessionReplayApiService for testing. If null, a default service will be created. + */ +class RRwebGraphQLReplayLogExporter( + val organizationVerboseId: String, + val backendUrl: String, + val serviceName: String, + val serviceVersion: String, + private val injectedReplayApiService: SessionReplayApiService? = null +) : LogRecordExporter { + private val coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + + private var graphqlClient: GraphQLClient = GraphQLClient(backendUrl) + private val replayApiService: SessionReplayApiService = injectedReplayApiService ?: SessionReplayApiService( + graphqlClient = graphqlClient, + serviceName = serviceName, + serviceVersion = serviceVersion, + ) + + // TODO: O11Y-624 - need to implement sid, payloadId reset when multiple sessions occur in one application process lifecycle. + private var sidCounter = 0 + private var payloadIdCounter = 0 + + private data class LastSentState( + val sessionId: String?, + val height: Int, + val width: Int, + ) + + private var lastSentState = LastSentState(sessionId = null, height = 0, width = 0) + + override fun export(logs: MutableCollection): CompletableResultCode { + val resultCode = CompletableResultCode() + + coroutineScope.launch { + try { + for (log in logs) { + val capture = extractCaptureFromLog(log) + if (capture != null) { + // TODO: O11Y-624 - investigate if there is a size limit on the push that is imposed server side. + val success = + if (capture.session != lastSentState.sessionId || capture.origHeight != lastSentState.height || capture.origWidth != lastSentState.width) { + // we need to send a full capture if the session id changes or there is a resize/orientation change + sendCaptureFull(capture) + } else { + sendCaptureIncremental(capture) + } + if (!success) { + // Stop processing immediately on first failure + resultCode.fail() + return@launch + } + } + } + + // All captures processed successfully + resultCode.succeed() + } catch (e: Exception) { + // TODO: O11Y-627 - pass in logger to implementation and use here + // Log.e("RRwebGraphQLReplayLogExporter", "Error during export: ${e.message}", e) + resultCode.fail() + } + } + + return resultCode + } + + override fun flush(): CompletableResultCode { + // TODO: O11Y-621 - Handle flush + return CompletableResultCode.ofSuccess() + } + + override fun shutdown(): CompletableResultCode { + // TODO: O11Y-621 - Handle shutdown + return CompletableResultCode.ofSuccess() + } + + fun nextSid(): Int { + sidCounter++ + return sidCounter + } + + fun nextPayloadId(): Int { + payloadIdCounter++ + return payloadIdCounter + } + + // Returns null if unable to extract a valid capture from the log record + private fun extractCaptureFromLog(log: LogRecordData): Capture? 
{ + val attributes = log.attributes + val eventDomain = attributes.get(AttributeKey.stringKey("event.domain")) + val imageWidth = attributes.get(AttributeKey.longKey("image.width")) + val imageHeight = attributes.get(AttributeKey.longKey("image.height")) + val imageData = attributes.get(AttributeKey.stringKey("image.data")) + val sessionId = attributes.get(AttributeKey.stringKey("session.id")) + + // Return null if any required attribute is missing + if (eventDomain != "media" || imageWidth == null || imageHeight == null || imageData == null || sessionId == null) { + return null + } + + return Capture( + imageBase64 = imageData, + origHeight = imageHeight.toInt(), + origWidth = imageWidth.toInt(), + timestamp = log.observedTimestampEpochNanos / 1_000_000, // Convert nanoseconds to milliseconds + session = sessionId + ) + } + + /** + * Sends an incremental capture. Used after [sendCaptureFull] has already been called for a previous capture in the same session. + * + * @param capture the capture to be sent + */ + suspend fun sendCaptureIncremental(capture: Capture): Boolean = withContext(Dispatchers.IO) { + try { + val eventsBatch = mutableListOf() + val timestamp = System.currentTimeMillis() + + // TODO: O11Y-625 - optimize JSON usage for performance since this region of code is essentially static + val incrementalEvent = Event( + type = EventType.INCREMENTAL_SNAPSHOT, + timestamp = timestamp, + sid = nextSid(), + data = EventDataUnion.CustomEventDataWrapper( + Json.parseToJsonElement("""{"source":9,"id":6,"type":0,"commands":[{"property":"clearRect","args":[0,0,${capture.origWidth},${capture.origHeight}]},{"property":"drawImage","args":[{"rr_type":"ImageBitmap","args":[{"rr_type":"Blob","data":[{"rr_type":"ArrayBuffer","base64":"${capture.imageBase64}"}],"type":"image/jpeg"}]},0,0,${capture.origWidth},${capture.origHeight}]}]}""") + ) + ) + eventsBatch.add(incrementalEvent) + + // TODO: O11Y-629 - remove this spoofed mouse interaction when proper user interaction is instrumented + // This spoofed mouse interaction is necessary to make the session look like it had activity + eventsBatch.add( + Event( + type = EventType.INCREMENTAL_SNAPSHOT, + timestamp = timestamp, + sid = nextSid(), + data = EventDataUnion.CustomEventDataWrapper( + Json.parseToJsonElement("""{"source":2,"type":2,"x":1, "y":1}""") + ) + ) + ) + + replayApiService.pushPayload(capture.session, "${nextPayloadId()}", eventsBatch) + + // record last sent state only after successful completion + lastSentState = LastSentState(sessionId = capture.session, height = capture.origHeight, width = capture.origWidth) + + true + } catch (e: Exception) { + // TODO: O11Y-627 - pass in logger to implementation and use here +// Log.e( +// REPLAY_EXPORTER_NAME, +// "Error sending incremental capture for session: ${e.message}", +// e +// ) + false + } + } + + /** + * Sends a full capture. May be invoked multiple times for a single session if a substantial + * change occurs requiring a full capture to be sent. 
+ * + * @param capture the capture to be sent + */ + suspend fun sendCaptureFull(capture: Capture): Boolean = withContext(Dispatchers.IO) { + try { + replayApiService.initializeReplaySession(organizationVerboseId, capture.session) + replayApiService.identifyReplaySession(capture.session) + + val eventBatch = mutableListOf() + + // TODO: O11Y-625 - optimize JSON usage for performance since this region of code is essentially static + + val timestamp = System.currentTimeMillis() + val metaEvent = Event( + type = EventType.META, + timestamp = timestamp, + sid = nextSid(), + data = EventDataUnion.StandardEventData( + EventData( + width = capture.origWidth, + height = capture.origHeight, + ) + ), + ) + eventBatch.add(metaEvent) + + val snapShotEvent = Event( + type = EventType.FULL_SNAPSHOT, + timestamp = timestamp, + sid = nextSid(), + data = EventDataUnion.StandardEventData( + EventData( + node = EventNode( + id = 1, + type = NodeType.DOCUMENT, + childNodes = listOf( + EventNode( + id = 2, + type = NodeType.DOCUMENT_TYPE, + name = "html", + ), + EventNode( + id = 3, + type = NodeType.ELEMENT, + tagName = "html", + attributes = mapOf("lang" to "en"), + childNodes = listOf( + EventNode( + id = 4, + type = NodeType.ELEMENT, + tagName = "head", + attributes = emptyMap(), + ), + EventNode( + id = 5, + type = NodeType.ELEMENT, + tagName = "body", + attributes = emptyMap(), + childNodes = listOf( + EventNode( + id = 6, + type = NodeType.ELEMENT, + tagName = "canvas", + attributes = mapOf( + "rr_dataURL" to "data:image/jpeg;base64,${capture.imageBase64}", + "width" to "${capture.origWidth}", + "height" to "${capture.origHeight}" + ), + childNodes = listOf(), + ) + ) + ) + ) + ) + ), + ), + ) + ), + ) + eventBatch.add(snapShotEvent) + + val viewportEvent = Event( + type = EventType.CUSTOM, + timestamp = timestamp, + sid = nextSid(), + data = EventDataUnion.CustomEventDataWrapper( + Json.parseToJsonElement("""{"tag":"Viewport","payload":{"width":${capture.origWidth},"height":${capture.origHeight},"availWidth":${capture.origWidth},"availHeight":${capture.origHeight},"colorDepth":30,"pixelDepth":30,"orientation":0}}""") + ) + ) + eventBatch.add(viewportEvent) + + // TODO: O11Y-624 - double check error case handling, may need to add retries per api service request, should subsequent requests wait for previous requests to succeed? 
+ replayApiService.pushPayload(capture.session, "${nextPayloadId()}", eventBatch) + + // record last sent state only after successful completion + lastSentState = LastSentState(sessionId = capture.session, height = capture.origHeight, width = capture.origWidth) + + true + } catch (e: Exception) { + // TODO: O11Y-627 - pass in logger to implementation and use here +// Log.e( +// REPLAY_EXPORTER_NAME, +// "Error sending initial capture for session: ${e.message}", +// e +// ) + false + } + } +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplayInstrumentation.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplayInstrumentation.kt new file mode 100644 index 000000000..99473c29d --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplayInstrumentation.kt @@ -0,0 +1,165 @@ +package com.launchdarkly.observability.replay + +import com.launchdarkly.observability.interfaces.LDExtendedInstrumentation +import io.opentelemetry.android.instrumentation.AndroidInstrumentation +import io.opentelemetry.android.instrumentation.InstallationContext +import io.opentelemetry.api.logs.Logger +import io.opentelemetry.sdk.logs.LogRecordProcessor +import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +private const val INSTRUMENTATION_SCOPE_NAME = "com.launchdarkly.observability.replay" + +// TODO: O11Y-625 - determine where these should be defined ultimately and tune accordingly. Perhaps +// we don't need a batching exporter in this layer. Perhaps this layer shouldn't be the one +// that decides the parameters of the batching exporter. Perhaps the batching should be +// controlled by the instrumentation manager. +private const val BATCH_MAX_QUEUE_SIZE = 100 +private const val BATCH_SCHEDULE_DELAY_MS = 1000L +private const val BATCH_EXPORTER_TIMEOUT_MS = 5000L +private const val BATCH_MAX_EXPORT_SIZE = 10 + +/** + * Provides session replay instrumentation. Session replays that are sampled will appear on the LaunchDarkly dashboard. 
+ * + * @param options Configuration options for replay behavior including privacy settings, capture interval, and backend URL + * + * @sample + * ```kotlin + * val ldConfig = LDConfig.Builder(LDConfig.Builder.AutoEnvAttributes.Enabled) + * .mobileKey("mobile-key-123abc") + * .plugins( + * Components.plugins().setPlugins( + * Collections.singletonList( + * Observability( + * this@BaseApplication, + * Options( + * resourceAttributes = Attributes.of( + * AttributeKey.stringKey("serviceName"), "example-service" + * ), + * instrumentations = listOf( + * ReplayInstrumentation( + * options = ReplayOptions( + * privacyProfile = PrivacyProfile.STRICT, + * ) + * ) + * ) + * ) + * ) + * ) + * ) + * ) + * .build(); + * ``` + * + * @see ReplayOptions for configuration options + * @see PrivacyProfile for privacy settings + */ +class ReplayInstrumentation( + private val options: ReplayOptions = ReplayOptions(), +) : LDExtendedInstrumentation { + + private lateinit var _otelLogger: Logger + private lateinit var _captureSource: CaptureSource + + private var _captureJob: Job? = null + private var _isPaused: Boolean = false + private val _captureMutex = Mutex() + + override val name: String = INSTRUMENTATION_SCOPE_NAME + + override fun install(ctx: InstallationContext) { + _otelLogger = ctx.openTelemetry.logsBridge.get(INSTRUMENTATION_SCOPE_NAME) + + _captureSource = CaptureSource(ctx.sessionManager, options.privacyProfile) + _captureSource.attachToApplication(ctx.application) + + // TODO: O11Y-621 - don't use global scope + // TODO: O11Y-621 - shutdown procedure and cleanup of dispatched jobs + GlobalScope.launch(Dispatchers.Default) { + _captureSource.captureFlow.collect { capture -> + _otelLogger.logRecordBuilder() + .setAttribute("event.domain", "media") + .setAttribute("image.width", capture.origWidth.toLong()) + .setAttribute("image.height", capture.origHeight.toLong()) + .setAttribute("image.data", capture.imageBase64) + .setAttribute("session.id", capture.session) + .setTimestamp(capture.timestamp, TimeUnit.MILLISECONDS) + .emit() + } + } + + // Start periodic capture automatically + internalStartCapture() + } + + // TODO: O11Y-622 - implement mechanism for customer code to invoke this method + suspend fun runCapture() { + _captureMutex.withLock { + // If already running (not paused), do nothing + if (!_isPaused) { + return + } + + // Clear paused flag and start/resume periodic capture + _isPaused = false + internalStartCapture() + } + } + + // TODO: O11Y-622 - implement mechanism for customer code to invoke this method + suspend fun pauseCapture() { + _captureMutex.withLock { + // if already paused, do nothing + if (_isPaused) { + return + } + + // pause the periodic capture by terminating the job + _isPaused = true + _captureJob?.cancel() + _captureJob = null + } + } + + private fun internalStartCapture() { + // TODO: O11Y-621 - don't use global scope + _captureJob = GlobalScope.launch(Dispatchers.Default) { + try { + while (true) { + // Perform capture + _captureSource.captureNow() + delay(options.capturePeriodMillis) + } + } finally { + // Job completed or was cancelled + } + } + } + + override fun getLoggerScopeName(): String = INSTRUMENTATION_SCOPE_NAME + + override fun getLogRecordProcessor(credential: String): LogRecordProcessor { + val exporter = RRwebGraphQLReplayLogExporter( + organizationVerboseId = credential, // the SDK credential is used as the organization ID intentionally + backendUrl = options.backendUrl, + serviceName = options.serviceName, + serviceVersion = 
options.serviceVersion,
+        )
+
+        return BatchLogRecordProcessor.builder(exporter)
+            .setMaxQueueSize(BATCH_MAX_QUEUE_SIZE)
+            .setScheduleDelay(BATCH_SCHEDULE_DELAY_MS, TimeUnit.MILLISECONDS)
+            .setExporterTimeout(BATCH_EXPORTER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+            .setMaxExportBatchSize(BATCH_MAX_EXPORT_SIZE)
+            .build()
+    }
+}
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplayOptions.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplayOptions.kt
new file mode 100644
index 000000000..e1a552c10
--- /dev/null
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplayOptions.kt
@@ -0,0 +1,22 @@
+package com.launchdarkly.observability.replay
+
+import com.launchdarkly.observability.BuildConfig
+import com.launchdarkly.observability.api.DEFAULT_BACKEND_URL
+
+/**
+ * Options for the [ReplayInstrumentation].
+ *
+ * @property backendUrl The backend URL for sending replay data. Defaults to the LaunchDarkly URL.
+ * @property debug Enables verbose logging and other debug functionality if true. Defaults to false.
+ * @property privacyProfile The privacy profile that controls masking behavior.
+ * @property capturePeriodMillis The period between captures, in milliseconds.
+ */
+data class ReplayOptions(
+    val serviceName: String = "observability-android",
+    val serviceVersion: String = BuildConfig.OBSERVABILITY_SDK_VERSION,
+    val backendUrl: String = DEFAULT_BACKEND_URL,
+    val debug: Boolean = false,
+    val privacyProfile: PrivacyProfile = PrivacyProfile.STRICT,
+    val capturePeriodMillis: Long = 1000, // defaults to every 1 second
+    // TODO O11Y-623 - Add storage options
+)
diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplaySessionProtocol.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplaySessionProtocol.kt
new file mode 100644
index 000000000..5aa23048f
--- /dev/null
+++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/ReplaySessionProtocol.kt
@@ -0,0 +1,251 @@
+package com.launchdarkly.observability.replay
+
+import com.launchdarkly.observability.network.SamplingConfigResponse
+import com.launchdarkly.observability.sampling.SamplingConfig
+import kotlinx.serialization.EncodeDefault
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.SerialName
+import kotlinx.serialization.Serializable
+import kotlinx.serialization.descriptors.PrimitiveKind
+import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.buildClassSerialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonElement
+import kotlin.reflect.KClass
+
+@Serializable
+data class InitializeReplaySessionResponse(
+    val initializeSession: InitializeSessionResponse?
+)
+
+@Serializable
+data class IdentifySessionResponse(
+    val identifySession: String? = null
+)
+
+data class SessionInitializationEntity(
+    val secureId: String?,
+    val projectId: String?,
+    val sampling: SamplingConfig?
+)
+
+@Serializable
+data class InitializeSessionResponse(
+    @SerialName("secure_id")
+    val secureId: String? = null,
+    @SerialName("project_id")
+    val projectId: String?
= null, + val sampling: SamplingConfigResponse? = null +) { + fun mapToEntity(): SessionInitializationEntity? { + return SessionInitializationEntity( + secureId = secureId, + projectId = projectId, + sampling = sampling?.mapToEntity() + ) + } +} + +@Serializable(with = EventTypeSerializer::class) +enum class EventType(val value: Int) { + DOM_CONTENT_LOADED(0), + LOAD(1), + FULL_SNAPSHOT(2), + INCREMENTAL_SNAPSHOT(3), + META(4), + CUSTOM(5), + PLUGIN(6) +} + +object EventTypeSerializer : IntEnumSerializer(EventType::class, "EventType", EventType::value) + +@Serializable(with = NodeTypeSerializer::class) +enum class NodeType(val value: Int) { + DOCUMENT(0), + DOCUMENT_TYPE(1), + ELEMENT(2), + TEXT(3), + CDATA(4), + COMMENT(5) +} + +object NodeTypeSerializer : IntEnumSerializer(NodeType::class, "NodeType", NodeType::value) + +@Serializable(with = IncrementalSourceSerializer::class) +enum class IncrementalSource(val value: Int) { + MUTATION(0), + MOUSE_MOVE(1), + MOUSE_INTERACTION(2), + SCROLL(3), + VIEWPORT_RESIZE(4), + INPUT(5), + TOUCH_MOVE(6), + MEDIA_INTERACTION(7), + STYLE_SHEET_RULE(8), + CANVAS_MUTATION(9), + FONT(10), + LOG(11), + DRAG(12), + STYLE_DECLARATION(13), + SELECTION(14), + ADOPTED_STYLE_SHEET(15), + CUSTOM_ELEMENT(16) +} + +object IncrementalSourceSerializer : IntEnumSerializer(IncrementalSource::class, "IncrementalSource", IncrementalSource::value) + +@Serializable(with = MouseInteractionsSerializer::class) +enum class MouseInteractions(val value: Int) { + MOUSE_UP(0), + MOUSE_DOWN(1), + CLICK(2), + CONTEXT_MENU(3), + DBL_CLICK(4), + FOCUS(5), + BLUR(6), + TOUCH_START(7), + TOUCH_MOVE_DEPARTED(8), + TOUCH_END(9), + TOUCH_CANCEL(10) +} + +object MouseInteractionsSerializer : IntEnumSerializer(MouseInteractions::class, "MouseInteractions", MouseInteractions::value) + +open class IntEnumSerializer>( + enumClass: KClass, + private val serialName: String, + private val valueSelector: (T) -> Int +) : KSerializer { + private val entries: List = enumClass.java.enumConstants?.toList() ?: emptyList() + private val lookup: Map = entries.associateBy(valueSelector) + + override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor(serialName, PrimitiveKind.INT) + + override fun serialize(encoder: Encoder, value: T) { + encoder.encodeInt(valueSelector(value)) + } + + override fun deserialize(decoder: Decoder): T { + val intValue = decoder.decodeInt() + // TODO: O11Y-624 - determine better error handling + return lookup[intValue] + ?: throw IllegalArgumentException("Unknown $serialName value: $intValue") + } +} + +@Serializable +data class EventNode( + val type: NodeType, + val name: String? = null, + val tagName: String? = null, + val attributes: Map? = null, + // This EncodeDefault is needed as a workaround, rrweb replay is expecting childNodes to be present even when empty list + @EncodeDefault val childNodes: List = emptyList(), + val rootId: Int? = null, + val id: Int? = null +) + +@Serializable +data class Attributes( + val id: Int? = null, + val attributes: Map? = null +) + +@Serializable +data class Removal( + val parentId: Int, + val id: Int +) + +@Serializable +data class Addition( + val parentId: Int, + val nextId: Int? = null, + val node: EventNode +) + +@Serializable +data class EventData( + val source: IncrementalSource? = null, + val type: MouseInteractions? = null, + val texts: List? = null, + val attributes: List? = null, + val href: String? = null, + val width: Int? = null, + val height: Int? = null, + val node: EventNode? = null, + val removes: List? 
= null, + val adds: List? = null, + val id: Int? = null, + val x: Double? = null, + val y: Double? = null, +) + +object EventDataUnionSerializer : KSerializer { + override val descriptor: SerialDescriptor = buildClassSerialDescriptor("EventDataUnion") + + override fun serialize(encoder: Encoder, value: EventDataUnion) { + when (value) { + is EventDataUnion.StandardEventData -> { + encoder.encodeSerializableValue(EventData.serializer(), value.data) + } + is EventDataUnion.CustomEventDataWrapper -> { + encoder.encodeSerializableValue(JsonElement.serializer(), value.data) + } + } + } + + override fun deserialize(decoder: Decoder): EventDataUnion { + // For deserialization, we need to determine the type based on the content + // This is a simplified implementation - in practice, you might need more sophisticated logic + // to determine whether the data should be StandardEventData or CustomEventDataWrapper + val jsonElement = decoder.decodeSerializableValue(JsonElement.serializer()) + + // Try to deserialize as StandardEventData first + return try { + val eventData = Json.decodeFromJsonElement(EventData.serializer(), jsonElement) + EventDataUnion.StandardEventData(eventData) + } catch (e: Exception) { + // If that fails, treat as CustomEventDataWrapper with JsonElement + EventDataUnion.CustomEventDataWrapper(jsonElement) + } + } +} + +@Serializable(with = EventDataUnionSerializer::class) +sealed class EventDataUnion { + @Serializable + data class StandardEventData(val data: EventData) : EventDataUnion() + + @Serializable + data class CustomEventDataWrapper(val data: JsonElement) : EventDataUnion() +} + +@Serializable +data class Event( + val type: EventType, + val data: EventDataUnion, + val timestamp: Long? = null, + @SerialName("_sid") + val sid: Int +) + +@Serializable +data class ReplayEventsInput( + val events: List +) + +@Serializable +data class ErrorObjectInput( + val message: String? = null, + val stack: String? = null, + val timestamp: Long? = null +) + +@Serializable +data class PushPayloadResponse( + val pushPayload: Int? 
= null +) diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/SessionReplayApiService.kt b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/SessionReplayApiService.kt new file mode 100644 index 000000000..1b8c39426 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/replay/SessionReplayApiService.kt @@ -0,0 +1,141 @@ +package com.launchdarkly.observability.replay + +import android.util.Log +import com.launchdarkly.observability.BuildConfig +import com.launchdarkly.observability.network.GraphQLClient +import com.launchdarkly.observability.network.GraphQLResponse +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonNull +import kotlinx.serialization.json.JsonPrimitive + +// TODO: O11Y-627 - Refactor logging handling in this class +class SessionReplayApiService( + private val graphqlClient: GraphQLClient, + val serviceName: String, + val serviceVersion: String, +) { + private val json: Json = Json { + isLenient = true + ignoreUnknownKeys = true + } + + companion object { + private const val INITIALIZE_REPLAY_SESSION_QUERY_FILE_PATH = "graphql/InitializeReplaySession.graphql" + private const val IDENTIFY_REPLAY_SESSION_QUERY_FILE_PATH = "graphql/IdentifyReplaySession.graphql" + private const val PUSH_PAYLOAD_QUERY_FILE_PATH = "graphql/PushPayload.graphql" + } + + /** + * Initializes a replay session + * @param organizationVerboseId The organization verbose ID + */ + suspend fun initializeReplaySession(organizationVerboseId: String, sessionSecureId: String) { + try { + val variables = mapOf( + "organization_verbose_id" to JsonPrimitive(organizationVerboseId), + "session_secure_id" to JsonPrimitive(sessionSecureId), + "enable_strict_privacy" to JsonPrimitive(false), + "enable_recording_network_contents" to JsonPrimitive(false), + "clientVersion" to JsonPrimitive(BuildConfig.OBSERVABILITY_SDK_VERSION), + "firstloadVersion" to JsonPrimitive(BuildConfig.OBSERVABILITY_SDK_VERSION), + "clientConfig" to JsonPrimitive("{}"), // TODO: O11Y-631 - remove hardcoded params + "environment" to JsonPrimitive(""), // TODO: O11Y-631 - remove hardcoded params + "appVersion" to JsonPrimitive(serviceVersion), + "serviceName" to JsonPrimitive(serviceName), + "fingerprint" to JsonPrimitive(""), // TODO: O11Y-631 - remove hardcoded params + "client_id" to JsonPrimitive(""), // TODO: O11Y-631 - remove hardcoded params + "network_recording_domains" to JsonArray(emptyList()), + "privacy_setting" to JsonPrimitive("none"), // TODO: O11Y-631 - remove hardcoded params + "id" to JsonPrimitive("") // TODO: O11Y-631 - remove hardcoded params + ) + val response = graphqlClient.execute( + queryFileName = INITIALIZE_REPLAY_SESSION_QUERY_FILE_PATH, + variables = variables, + dataSerializer = InitializeReplaySessionResponse.serializer() + ) + + // TODO: O11Y-624 - check graphql requests can generate errors when necessary and add error handling + if (response.errors?.isNotEmpty() == true) { + printErrors(response) + } + } catch (e: Exception) { + Log.e("SessionReplayApiService", "Error initializing replay session: ${e.message}") + } + } + + /** + * Identifies a replay session with user information + * @param sessionSecureId The session secure ID + * @param userIdentifier The user identifier (defaults to "unknown") + * @param userObject Optional user 
object with key-value pairs + */ + suspend fun identifyReplaySession( + sessionSecureId: String, + userIdentifier: String = "", // TODO: O11Y-631 - remove hardcoded params + userObject: JsonElement = JsonNull + ) { + try { + val variables = mapOf( + "session_secure_id" to JsonPrimitive(sessionSecureId), + "user_identifier" to JsonPrimitive(userIdentifier), + "user_object" to userObject + ) + + val response = graphqlClient.execute( + queryFileName = IDENTIFY_REPLAY_SESSION_QUERY_FILE_PATH, + variables = variables, + dataSerializer = IdentifySessionResponse.serializer() + ) + + if (response.errors?.isNotEmpty() == true) { + printErrors(response) + } + } catch (e: Exception) { + Log.e("SessionReplayApiService", "Error identifying replay session: ${e.message}") + + } + } + + /** + * Pushes session replay events + * @param sessionSecureId The session secure ID + * @param payloadId The payload ID + * @param events The list of events to push + */ + suspend fun pushPayload(sessionSecureId: String, payloadId: String, events: List) { + try { + val variables = mapOf( + "session_secure_id" to JsonPrimitive(sessionSecureId), + "payload_id" to JsonPrimitive(payloadId), + "events" to json.encodeToJsonElement(ReplayEventsInput.serializer(), ReplayEventsInput(events)), + "messages" to JsonPrimitive("{\"messages\":[]}"), + "resources" to JsonPrimitive("{\"resources\":[]}"), + "web_socket_events" to JsonPrimitive("{\"webSocketEvents\":[]}"), + "errors" to JsonArray(emptyList()), + ) + + val response = graphqlClient.execute( + queryFileName = PUSH_PAYLOAD_QUERY_FILE_PATH, + variables = variables, + dataSerializer = PushPayloadResponse.serializer() + ) + + if (response.errors?.isNotEmpty() == true) { + printErrors(response) + } + } catch (e: Exception) { + Log.e("SessionReplayApiService", "Error pushing payload: ${e.message}") + } + } + + private fun printErrors(response: GraphQLResponse) { + response.errors?.forEach { error -> + Log.e("SessionReplayApiService", "GraphQL Error: ${error.message}") + error.locations?.forEach { location -> + Log.e("SessionReplayApiService", " at line ${location.line}, column ${location.column}") + } + } + } +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/IdentifyReplaySession.graphql b/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/IdentifyReplaySession.graphql new file mode 100644 index 000000000..a3ac9c261 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/IdentifyReplaySession.graphql @@ -0,0 +1,11 @@ +mutation identifySession( + $session_secure_id: String! + $user_identifier: String! + $user_object: Any +) { + identifySession( + session_secure_id: $session_secure_id + user_identifier: $user_identifier + user_object: $user_object + ) +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/InitializeReplaySession.graphql b/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/InitializeReplaySession.graphql new file mode 100644 index 000000000..b0274b197 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/InitializeReplaySession.graphql @@ -0,0 +1,87 @@ +fragment MatchParts on MatchConfig { + regexValue + matchValue +} + +mutation initializeSession( + $session_secure_id: String! + $organization_verbose_id: String! + $enable_strict_privacy: Boolean! + $privacy_setting: String! + $enable_recording_network_contents: Boolean! + $clientVersion: String! + $firstloadVersion: String! 
+ $clientConfig: String! + $environment: String! + $id: String! + $appVersion: String + $serviceName: String! + $client_id: String! + $network_recording_domains: [String!] +) { + initializeSession( + session_secure_id: $session_secure_id + organization_verbose_id: $organization_verbose_id + enable_strict_privacy: $enable_strict_privacy + enable_recording_network_contents: $enable_recording_network_contents + clientVersion: $clientVersion + firstloadVersion: $firstloadVersion + clientConfig: $clientConfig + environment: $environment + appVersion: $appVersion + serviceName: $serviceName + fingerprint: $id + client_id: $client_id + network_recording_domains: $network_recording_domains + privacy_setting: $privacy_setting + ) { + secure_id + project_id + sampling { + spans { + name { + ...MatchParts + } + attributes { + key { + ...MatchParts + } + attribute { + ...MatchParts + } + } + events { + name { + ...MatchParts + } + attributes { + key { + ...MatchParts + } + attribute { + ...MatchParts + } + } + } + samplingRatio + } + logs { + message { + ...MatchParts + } + severityText { + ...MatchParts + } + attributes { + key { + ...MatchParts + } + attribute { + ...MatchParts + } + } + samplingRatio + } + } + } +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/PushPayload.graphql b/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/PushPayload.graphql new file mode 100644 index 000000000..08a0039f1 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/main/resources/graphql/PushPayload.graphql @@ -0,0 +1,19 @@ +mutation PushPayload( + $session_secure_id: String! + $payload_id: ID! + $events: ReplayEventsInput! + $messages: String! + $resources: String! + $web_socket_events: String! + $errors: [ErrorObjectInput]! +) { + pushPayload( + session_secure_id: $session_secure_id + payload_id: $payload_id + events: $events + messages: $messages + resources: $resources + web_socket_events: $web_socket_events + errors: $errors + ) +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/client/InstrumentationManagerTest.kt b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/client/InstrumentationManagerTest.kt new file mode 100644 index 000000000..091175d18 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/client/InstrumentationManagerTest.kt @@ -0,0 +1,139 @@ +package com.launchdarkly.observability.client + +import com.launchdarkly.logging.LDLogger +import com.launchdarkly.observability.api.Options +import com.launchdarkly.observability.interfaces.LDExtendedInstrumentation +import com.launchdarkly.observability.sampling.ExportSampler +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder +import io.opentelemetry.sdk.resources.Resource +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions.* + +/** + * Test class focused on testing the createLoggerProcessor method logic. + * This test verifies that the RoutingLogRecordProcessor is properly configured + * with instrumentation-specific log record processors. 
+ */ +class InstrumentationManagerTest { + + private lateinit var mockSdkLoggerProviderBuilder: SdkLoggerProviderBuilder + private lateinit var mockExportSampler: ExportSampler + private lateinit var mockLogger: LDLogger + private lateinit var testResource: Resource + private lateinit var testSdkKey: String + private lateinit var testOptions: Options + + @BeforeEach + fun setup() { + mockSdkLoggerProviderBuilder = mockk(relaxed = true) + mockExportSampler = mockk(relaxed = true) + mockLogger = mockk(relaxed = true) + testResource = Resource.create(Attributes.empty()) + testSdkKey = "test-sdk-key" + testOptions = Options() + } + + @Test + fun `createLoggerProcessor should register instrumentation log record processors with correct scope names`() { + // Arrange + val mockInstrumentation1 = mockk(relaxed = true) + val mockInstrumentation2 = mockk(relaxed = true) + val mockLogRecordProcessor1 = mockk(relaxed = true) + val mockLogRecordProcessor2 = mockk(relaxed = true) + + val scopeName1 = "com.test.instrumentation1" + val scopeName2 = "com.test.instrumentation2" + + every { mockInstrumentation1.getLoggerScopeName() } returns scopeName1 + every { mockInstrumentation1.getLogRecordProcessor(testSdkKey) } returns mockLogRecordProcessor1 + every { mockInstrumentation2.getLoggerScopeName() } returns scopeName2 + every { mockInstrumentation2.getLogRecordProcessor(testSdkKey) } returns mockLogRecordProcessor2 + + testOptions = Options(instrumentations = listOf(mockInstrumentation1, mockInstrumentation2)) + + // Act + val logProcessor = InstrumentationManager.createLoggerProcessor( + mockSdkLoggerProviderBuilder, + mockExportSampler, + testSdkKey, + testResource, + mockLogger, + null, + testOptions + ) + + // Assert + assertNotNull(logProcessor) + + // Verify that the logger provider builder was configured with resource + verify { mockSdkLoggerProviderBuilder.setResource(testResource) } + + // Verify that instrumentation methods were called + verify { mockInstrumentation1.getLoggerScopeName() } + verify { mockInstrumentation1.getLogRecordProcessor(testSdkKey) } + verify { mockInstrumentation2.getLoggerScopeName() } + verify { mockInstrumentation2.getLogRecordProcessor(testSdkKey) } + } + + @Test + fun `createLoggerProcessor should handle instrumentations with null log record processors`() { + // Arrange + val mockInstrumentation = mockk(relaxed = true) + val scopeName = "com.test.instrumentation" + + every { mockInstrumentation.getLoggerScopeName() } returns scopeName + every { mockInstrumentation.getLogRecordProcessor(testSdkKey) } returns null + + testOptions = Options(instrumentations = listOf(mockInstrumentation)) + + // Act + val logProcessor = InstrumentationManager.createLoggerProcessor( + mockSdkLoggerProviderBuilder, + mockExportSampler, + testSdkKey, + testResource, + mockLogger, + null, + testOptions + ) + + // Assert + assertNotNull(logProcessor) + + // Verify that the logger provider builder was configured + verify { mockSdkLoggerProviderBuilder.setResource(testResource) } + + // Verify that instrumentation methods were called + verify { mockInstrumentation.getLogRecordProcessor(testSdkKey) } + // Verify that getLoggerScopeName() is NOT called when getLogRecordProcessor returns null + verify(exactly = 0) { mockInstrumentation.getLoggerScopeName() } + } + + @Test + fun `createLoggerProcessor should handle empty instrumentations list`() { + // Arrange + testOptions = Options(instrumentations = emptyList()) + + // Act + val logProcessor = InstrumentationManager.createLoggerProcessor( + 
mockSdkLoggerProviderBuilder, + mockExportSampler, + testSdkKey, + testResource, + mockLogger, + null, + testOptions + ) + + // Assert + assertNotNull(logProcessor) + + // Verify that the logger provider builder was configured + verify { mockSdkLoggerProviderBuilder.setResource(testResource) } + } +} \ No newline at end of file diff --git a/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/client/RoutingLogRecordProcessorTest.kt b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/client/RoutingLogRecordProcessorTest.kt new file mode 100644 index 000000000..3b168daf8 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/client/RoutingLogRecordProcessorTest.kt @@ -0,0 +1,156 @@ +package com.launchdarkly.observability.client + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import io.opentelemetry.context.Context +import io.opentelemetry.sdk.common.InstrumentationScopeInfo +import io.opentelemetry.sdk.logs.LogRecordProcessor +import io.opentelemetry.sdk.logs.ReadWriteLogRecord +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class RoutingLogRecordProcessorTest { + + private lateinit var mockLogRecordProcessor: LogRecordProcessor + private lateinit var mockFallthroughProcessor: LogRecordProcessor + private lateinit var routingProcessor: RoutingLogRecordProcessor + + @BeforeEach + fun setup() { + mockLogRecordProcessor = mockk(relaxed = true) + mockFallthroughProcessor = mockk(relaxed = true) + routingProcessor = RoutingLogRecordProcessor(fallthroughProcessor = mockFallthroughProcessor) + } + + @Test + fun `should route logs to registered processor when scope name matches`() { + // Arrange + val testScopeName = "com.test.instrumentation" + val mockContext = Context.current() + + // Create mock log record with matching scope + val mockLogRecord = mockk(relaxed = true) + val mockInstrumentationScopeInfo = mockk(relaxed = true) + + every { mockLogRecord.instrumentationScopeInfo } returns mockInstrumentationScopeInfo + every { mockInstrumentationScopeInfo.name } returns testScopeName + + // Register the processor for the test scope + routingProcessor.addProcessor(testScopeName, mockLogRecordProcessor) + + // Act + routingProcessor.onEmit(mockContext, mockLogRecord) + + // Assert - Should route to the registered processor + verify { mockLogRecordProcessor.onEmit(mockContext, mockLogRecord) } + verify(exactly = 0) { mockFallthroughProcessor.onEmit(any(), any()) } + } + + @Test + fun `should route logs to fallthrough processor when scope name does not match`() { + // Arrange + val testScopeName = "com.test.instrumentation" + val nonMatchingScopeName = "com.random.scope" + val mockContext = Context.current() + + // Create mock log record with non-matching scope + val mockLogRecord = mockk(relaxed = true) + val mockInstrumentationScopeInfo = mockk(relaxed = true) + + every { mockLogRecord.instrumentationScopeInfo } returns mockInstrumentationScopeInfo + every { mockInstrumentationScopeInfo.name } returns nonMatchingScopeName + + // Register the processor for a different scope + routingProcessor.addProcessor(testScopeName, mockLogRecordProcessor) + + // Act + routingProcessor.onEmit(mockContext, mockLogRecord) + + // Assert - Should route to fallthrough processor since scope doesn't match + verify { mockFallthroughProcessor.onEmit(mockContext, mockLogRecord) } + verify(exactly = 0) { 
mockLogRecordProcessor.onEmit(any(), any()) } + } + + @Test + fun `should handle multiple registered processors correctly`() { + // Arrange + val scopeName1 = "com.test.instrumentation1" + val scopeName2 = "com.test.instrumentation2" + val mockContext = Context.current() + + val mockProcessor1 = mockk(relaxed = true) + val mockProcessor2 = mockk(relaxed = true) + + // Create mock log records for each scope + val mockLogRecord1 = mockk(relaxed = true) + val mockLogRecord2 = mockk(relaxed = true) + + val mockScopeInfo1 = mockk(relaxed = true) + val mockScopeInfo2 = mockk(relaxed = true) + + every { mockLogRecord1.instrumentationScopeInfo } returns mockScopeInfo1 + every { mockLogRecord2.instrumentationScopeInfo } returns mockScopeInfo2 + every { mockScopeInfo1.name } returns scopeName1 + every { mockScopeInfo2.name } returns scopeName2 + + // Register both processors + routingProcessor.addProcessor(scopeName1, mockProcessor1) + routingProcessor.addProcessor(scopeName2, mockProcessor2) + + // Act & Assert - Test first processor + routingProcessor.onEmit(mockContext, mockLogRecord1) + verify { mockProcessor1.onEmit(mockContext, mockLogRecord1) } + + // Act & Assert - Test second processor + routingProcessor.onEmit(mockContext, mockLogRecord2) + verify { mockProcessor2.onEmit(mockContext, mockLogRecord2) } + } + + @Test + fun `should use fallthrough processor when no processors are registered`() { + // Arrange + val testScopeName = "com.test.instrumentation" + val mockContext = Context.current() + + // Create mock log record + val mockLogRecord = mockk(relaxed = true) + val mockInstrumentationScopeInfo = mockk(relaxed = true) + + every { mockLogRecord.instrumentationScopeInfo } returns mockInstrumentationScopeInfo + every { mockInstrumentationScopeInfo.name } returns testScopeName + + // Don't register any processors + + // Act + routingProcessor.onEmit(mockContext, mockLogRecord) + + // Assert - Should use fallthrough processor since no processors are registered + verify { mockFallthroughProcessor.onEmit(mockContext, mockLogRecord) } + } + + @Test + fun `should handle empty scope name correctly`() { + // Arrange + val testScopeName = "com.test.instrumentation" + val emptyScopeName = "" + val mockContext = Context.current() + + // Create mock log record with empty scope name + val mockLogRecord = mockk(relaxed = true) + val mockInstrumentationScopeInfo = mockk(relaxed = true) + + every { mockLogRecord.instrumentationScopeInfo } returns mockInstrumentationScopeInfo + every { mockInstrumentationScopeInfo.name } returns emptyScopeName + + // Register processor for a different scope + routingProcessor.addProcessor(testScopeName, mockLogRecordProcessor) + + // Act + routingProcessor.onEmit(mockContext, mockLogRecord) + + // Assert - Should use fallthrough processor since scope names don't match + verify { mockFallthroughProcessor.onEmit(mockContext, mockLogRecord) } + verify(exactly = 0) { mockLogRecordProcessor.onEmit(any(), any()) } + } +} diff --git a/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/GraphQLClientTest.kt b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/GraphQLClientTest.kt index a0c147f3a..d4399c797 100644 --- a/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/GraphQLClientTest.kt +++ b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/GraphQLClientTest.kt @@ -6,6 +6,7 @@ import 
io.mockk.unmockkAll import io.mockk.verify import kotlinx.coroutines.test.runTest import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonPrimitive import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue @@ -70,7 +71,7 @@ class GraphQLClientTest { val result = graphQLClient.execute( queryFileName = "test-query.graphql", - variables = mapOf("test_variable" to "567"), + variables = mapOf("test_variable" to JsonPrimitive("567")), dataSerializer = TestData.serializer() ) diff --git a/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/SamplingApiServiceTest.kt b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/SamplingApiServiceTest.kt index 85a82afc2..4e3f24ff0 100644 --- a/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/SamplingApiServiceTest.kt +++ b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/network/SamplingApiServiceTest.kt @@ -4,6 +4,7 @@ import io.mockk.coEvery import io.mockk.coVerify import io.mockk.mockk import kotlinx.coroutines.test.runTest +import kotlinx.serialization.json.JsonPrimitive import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Test @@ -54,7 +55,7 @@ class SamplingApiServiceTest { coEvery { mockGraphqlClient.execute( "graphql/GetSamplingConfigQuery.graphql", - mapOf("organization_verbose_id" to organizationId), + mapOf("organization_verbose_id" to JsonPrimitive(organizationId)), SamplingResponse.serializer() ) } returns graphqlResponse @@ -67,7 +68,7 @@ class SamplingApiServiceTest { coVerify(exactly = 1) { mockGraphqlClient.execute( "graphql/GetSamplingConfigQuery.graphql", - mapOf("organization_verbose_id" to organizationId), + mapOf("organization_verbose_id" to JsonPrimitive(organizationId)), SamplingResponse.serializer() ) } diff --git a/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/replay/RRwebGraphQLReplayLogExporterTest.kt b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/replay/RRwebGraphQLReplayLogExporterTest.kt new file mode 100644 index 000000000..94db10030 --- /dev/null +++ b/sdk/@launchdarkly/observability-android/lib/src/test/kotlin/com/launchdarkly/observability/replay/RRwebGraphQLReplayLogExporterTest.kt @@ -0,0 +1,450 @@ +package com.launchdarkly.observability.replay + +import io.mockk.* +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.sdk.logs.data.LogRecordData +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.BeforeEach +import java.util.concurrent.TimeUnit + +class RRwebGraphQLReplayLogExporterTest { + + private lateinit var mockService: SessionReplayApiService + private lateinit var exporter: RRwebGraphQLReplayLogExporter + + @BeforeEach + fun setUp() { + mockService = mockk(relaxed = true) + exporter = RRwebGraphQLReplayLogExporter( + organizationVerboseId = "test-org", + backendUrl = "http://test.com", + serviceName = "test-service", + serviceVersion = "1.0.0", + injectedReplayApiService = mockService + ) + } + + @Test + fun `constructor with mock service should use injected service`() = runTest { + // Create a 
mock service + val mockService = mockk(relaxed = true) + + // Create the exporter with the mock service + val exporter = RRwebGraphQLReplayLogExporter( + organizationVerboseId = "test-org", + backendUrl = "http://test.com", + serviceName = "test-service", + serviceVersion = "1.0.0", + injectedReplayApiService = mockService + ) + + // Verify the exporter was created successfully + assertNotNull(exporter) + } + + @Test + fun `constructor without mock service should create default service`() = runTest { + // Create the exporter without injecting a service (should create default) + val exporter = RRwebGraphQLReplayLogExporter( + organizationVerboseId = "test-org", + backendUrl = "http://test.com", + serviceName = "test-service", + serviceVersion = "1.0.0" + ) + + // Verify the exporter was created successfully + assertNotNull(exporter) + } + + @Test + fun `export should send full capture for first session and incremental for subsequent captures in same session`() = runTest { + // Arrange: Create captures for two different sessions + val sessionACaptures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a"), + Capture("base64data2", 800, 600, 2000L, "session-a"), + Capture("base64data3", 800, 600, 3000L, "session-a") + ) + + val sessionBCaptures = listOf( + Capture("base64data4", 1024, 768, 4000L, "session-b"), + Capture("base64data5", 1024, 768, 5000L, "session-b") + ) + + val allCaptures = sessionACaptures + sessionBCaptures + val logRecords = createLogRecordsFromCaptures(allCaptures) + + // Capture the events sent to pushPayload + val capturedEvents = mutableListOf>() + + // Mock the API service methods + coEvery { mockService.initializeReplaySession(any(), any()) } just Runs + coEvery { mockService.identifyReplaySession(any()) } just Runs + coEvery { mockService.pushPayload(any(), any(), capture(capturedEvents)) } just Runs + + // Act: Export all log records + val result = exporter.export(logRecords.toMutableList()) + + // Assert: Verify the result completes successfully + assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify full capture calls for session A (first capture only) + coVerify(exactly = 1) { + mockService.initializeReplaySession("test-org", "session-a") + } + coVerify(exactly = 1) { + mockService.identifyReplaySession("session-a") + } + + // Verify full capture calls for session B (first capture only) + coVerify(exactly = 1) { + mockService.initializeReplaySession("test-org", "session-b") + } + coVerify(exactly = 1) { + mockService.identifyReplaySession("session-b") + } + + // Verify pushPayload is called for all captures (3 for session A + 2 for session B = 5 total) + coVerify(exactly = 5) { + mockService.pushPayload(any(), any(), any()) + } + + // Verify event types: First capture should be full (3 events), subsequent should be incremental (2 events each) + assertEquals(5, capturedEvents.size) + + // Session A: First capture (full) + 2 incremental captures + verifyFullCaptureEvents(capturedEvents[0]) // First capture should be full + verifyIncrementalCaptureEvents(capturedEvents[1]) // Second capture should be incremental + verifyIncrementalCaptureEvents(capturedEvents[2]) // Third capture should be incremental + + // Session B: First capture (full) + 1 incremental capture + verifyFullCaptureEvents(capturedEvents[3]) // First capture should be full + verifyIncrementalCaptureEvents(capturedEvents[4]) // Second capture should be incremental + } + + @Test + fun `export should send full capture when dimensions change within same session`() = runTest { + 
// Arrange: Create captures for same session but with dimension changes + val captures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a"), // First capture - full + Capture("base64data2", 800, 600, 2000L, "session-a"), // Same dimensions - incremental + Capture("base64data3", 1024, 768, 3000L, "session-a"), // Dimension change - full + Capture("base64data4", 1024, 768, 4000L, "session-a") // Same dimensions - incremental + ) + + val logRecords = createLogRecordsFromCaptures(captures) + + // Capture the events sent to pushPayload + val capturedEvents = mutableListOf>() + + // Mock the API service methods + coEvery { mockService.initializeReplaySession(any(), any()) } just Runs + coEvery { mockService.identifyReplaySession(any()) } just Runs + coEvery { mockService.pushPayload(any(), any(), capture(capturedEvents)) } just Runs + + // Act: Export all log records + val result = exporter.export(logRecords.toMutableList()) + + // Assert: Verify the result completes successfully + assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify initializeReplaySession is called twice (first capture + dimension change) + coVerify(exactly = 2) { + mockService.initializeReplaySession("test-org", "session-a") + } + + // Verify identifyReplaySession is called twice (first capture + dimension change) + coVerify(exactly = 2) { + mockService.identifyReplaySession("session-a") + } + + // Verify pushPayload is called for all captures + coVerify(exactly = 4) { + mockService.pushPayload("session-a", any(), any()) + } + + // Verify event types: First and third captures should be full, second and fourth should be incremental + assertEquals(4, capturedEvents.size) + verifyFullCaptureEvents(capturedEvents[0]) // First capture - full + verifyIncrementalCaptureEvents(capturedEvents[1]) // Second capture - incremental + verifyFullCaptureEvents(capturedEvents[2]) // Third capture - full (dimension change) + verifyIncrementalCaptureEvents(capturedEvents[3]) // Fourth capture - incremental + } + + @Test + fun `export should handle mixed valid and invalid log records`() = runTest { + // Arrange: Create mix of valid and invalid log records + val validCaptures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a"), + Capture("base64data2", 800, 600, 2000L, "session-a") + ) + + val validLogRecords = createLogRecordsFromCaptures(validCaptures) + val invalidLogRecords = listOf( + createLogRecordWithAttributes( + eventDomain = "invalid-domain", // Wrong domain + imageWidth = 800L, + imageHeight = 600L, + imageData = "base64data", + sessionId = "session-a" + ), + createLogRecordWithAttributes( + eventDomain = "media", + imageWidth = null, // Missing width + imageHeight = 600L, + imageData = "base64data", + sessionId = "session-a" + ) + ) + + val allLogRecords = validLogRecords + invalidLogRecords + + // Mock the API service methods + coEvery { mockService.initializeReplaySession(any(), any()) } just Runs + coEvery { mockService.identifyReplaySession(any()) } just Runs + coEvery { mockService.pushPayload(any(), any(), any()) } just Runs + + // Act: Export all log records + val result = exporter.export(allLogRecords.toMutableList()) + + // Assert: Verify the result completes successfully + assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify only valid captures are processed + coVerify(exactly = 1) { + mockService.initializeReplaySession("test-org", "session-a") + } + coVerify(exactly = 1) { + mockService.identifyReplaySession("session-a") + } + coVerify(exactly = 2) { + 
mockService.pushPayload("session-a", any(), any()) + } + } + + @Test + fun `export should handle empty log collection`() = runTest { + // Act: Export empty collection + val result = exporter.export(mutableListOf()) + + // Assert: Verify the result completes successfully + assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify no API calls are made + coVerify(exactly = 0) { mockService.initializeReplaySession(any(), any()) } + coVerify(exactly = 0) { mockService.identifyReplaySession(any()) } + coVerify(exactly = 0) { mockService.pushPayload(any(), any(), any()) } + } + + @Test + fun `export should handle API service failures gracefully`() = runTest { + // Arrange: Create a single capture to test basic failure handling + val captures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a") + ) + val logRecords = createLogRecordsFromCaptures(captures) + + // Mock API service to throw exceptions + coEvery { mockService.initializeReplaySession(any(), any()) } throws RuntimeException("Network error") + coEvery { mockService.identifyReplaySession(any()) } throws RuntimeException("Authentication failed") + coEvery { mockService.pushPayload(any(), any(), any()) } throws RuntimeException("Server error") + + // Act: Export log records + val result = exporter.export(logRecords.toMutableList()) + + // Assert: Verify the result fails due to API errors + assertFalse(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify API methods were called despite failures + coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-a") } + coVerify(exactly = 0) { mockService.identifyReplaySession("session-a") } + coVerify(exactly = 0) { mockService.pushPayload("session-a", any(), any()) } + } + + @Test + fun `export should handle multiple captures in same session with proper state tracking`() = runTest { + // Arrange: Create two captures with same session and dimensions + val captures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a"), + Capture("base64data2", 800, 600, 2000L, "session-a") + ) + val logRecords = createLogRecordsFromCaptures(captures) + + // Mock API service methods + coEvery { mockService.initializeReplaySession(any(), any()) } just Runs + coEvery { mockService.identifyReplaySession(any()) } just Runs + coEvery { mockService.pushPayload(any(), any(), any()) } just Runs + + // Act: Export log records + val result = exporter.export(logRecords.toMutableList()) + + // Assert: Verify the result completes successfully + assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify API calls: First capture should be full, second should be incremental + coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-a") } + coVerify(exactly = 1) { mockService.identifyReplaySession("session-a") } + coVerify(exactly = 2) { mockService.pushPayload("session-a", any(), any()) } + } + + @Test + fun `export should stop processing on first failure and not process remaining captures`() = runTest { + // Arrange: Create captures for two different sessions + val captures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a"), + Capture("base64data2", 1024, 768, 2000L, "session-b") + ) + val logRecords = createLogRecordsFromCaptures(captures) + + // Mock API service: first session succeeds, second session fails + coEvery { mockService.initializeReplaySession("test-org", "session-a") } just Runs + coEvery { mockService.identifyReplaySession("session-a") } just Runs + coEvery { mockService.pushPayload("session-a", any(), any()) 
} just Runs + + coEvery { mockService.initializeReplaySession("test-org", "session-b") } throws RuntimeException("Network timeout") + coEvery { mockService.identifyReplaySession("session-b") } throws RuntimeException("Network timeout") + coEvery { mockService.pushPayload("session-b", any(), any()) } throws RuntimeException("Network timeout") + + // Act: Export log records + val result = exporter.export(logRecords.toMutableList()) + + // Assert: Verify the result fails due to first failure + assertFalse(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify only first session was processed (second session should not be processed due to early termination) + coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-a") } + coVerify(exactly = 1) { mockService.identifyReplaySession("session-a") } + coVerify(exactly = 1) { mockService.pushPayload("session-a", any(), any()) } + + // Verify second session was never processed + coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-b") } + coVerify(exactly = 0) { mockService.identifyReplaySession("session-b") } + coVerify(exactly = 0) { mockService.pushPayload("session-b", any(), any()) } + } + + @Test + fun `export should handle pushPayload failure after successful initialization`() = runTest { + // Arrange: Create a single capture + val captures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a") + ) + val logRecords = createLogRecordsFromCaptures(captures) + + // Mock API service: initialization succeeds but pushPayload fails + coEvery { mockService.initializeReplaySession(any(), any()) } just Runs + coEvery { mockService.identifyReplaySession(any()) } just Runs + coEvery { mockService.pushPayload(any(), any(), any()) } throws RuntimeException("Payload too large") + + // Act: Export log records + val result = exporter.export(logRecords.toMutableList()) + + // Assert: Verify the result fails due to pushPayload failure + assertFalse(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify all API methods were called + coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-a") } + coVerify(exactly = 1) { mockService.identifyReplaySession("session-a") } + coVerify(exactly = 1) { mockService.pushPayload("session-a", any(), any()) } + } + + @Test + fun `export should stop processing when first capture fails in same session`() = runTest { + // Arrange: Create two captures with same session and dimensions + val captures = listOf( + Capture("base64data1", 800, 600, 1000L, "session-a"), + Capture("base64data2", 800, 600, 2000L, "session-a") + ) + val logRecords = createLogRecordsFromCaptures(captures) + + // Mock API service: first capture fails, second should not be processed + coEvery { mockService.initializeReplaySession(any(), any()) } throws RuntimeException("Network error") + coEvery { mockService.identifyReplaySession(any()) } throws RuntimeException("Authentication failed") + coEvery { mockService.pushPayload(any(), any(), any()) } throws RuntimeException("Server error") + + // Act: Export log records + val result = exporter.export(logRecords.toMutableList()) + + // Assert: Verify the result fails due to first capture failure + assertFalse(result.join(5, TimeUnit.SECONDS).isSuccess) + + // Verify only first capture was attempted (second should not be processed due to early termination) + coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-a") } + coVerify(exactly = 0) { mockService.identifyReplaySession("session-a") } // Should 
not be called due to initializeReplaySession failure
+        coVerify(exactly = 0) { mockService.pushPayload("session-a", any(), any()) } // Should not be called due to initializeReplaySession failure
+    }
+
+    // Helper functions
+
+    /**
+     * Creates a list of LogRecordData from a list of Capture objects
+     */
+    private fun createLogRecordsFromCaptures(captures: List<Capture>): List<LogRecordData> {
+        return captures.map { capture ->
+            createLogRecordWithAttributes(
+                eventDomain = "media",
+                imageWidth = capture.origWidth.toLong(),
+                imageHeight = capture.origHeight.toLong(),
+                imageData = capture.imageBase64,
+                sessionId = capture.session,
+                timestamp = capture.timestamp * 1_000_000 // Convert to nanoseconds
+            )
+        }
+    }
+
+    /**
+     * Creates a LogRecordData with the specified attributes for testing
+     */
+    private fun createLogRecordWithAttributes(
+        eventDomain: String?,
+        imageWidth: Long?,
+        imageHeight: Long?,
+        imageData: String?,
+        sessionId: String?,
+        timestamp: Long = System.currentTimeMillis() * 1_000_000
+    ): LogRecordData {
+        val attributesBuilder = Attributes.builder()
+
+        eventDomain?.let { attributesBuilder.put(AttributeKey.stringKey("event.domain"), it) }
+        imageWidth?.let { attributesBuilder.put(AttributeKey.longKey("image.width"), it) }
+        imageHeight?.let { attributesBuilder.put(AttributeKey.longKey("image.height"), it) }
+        imageData?.let { attributesBuilder.put(AttributeKey.stringKey("image.data"), it) }
+        sessionId?.let { attributesBuilder.put(AttributeKey.stringKey("session.id"), it) }
+
+        return mockk<LogRecordData>().apply {
+            every { getAttributes() } returns attributesBuilder.build()
+            every { observedTimestampEpochNanos } returns timestamp
+        }
+    }
+
+    /**
+     * Verifies that the events represent a full capture (META, FULL_SNAPSHOT, CUSTOM)
+     */
+    private fun verifyFullCaptureEvents(events: List<Event>) {
+        assertEquals(3, events.size, "Full capture should have exactly 3 events")
+
+        // Verify META event
+        val metaEvent = events.find { it.type == EventType.META }
+        assertNotNull(metaEvent, "Full capture should contain a META event")
+
+        // Verify FULL_SNAPSHOT event
+        val fullSnapshotEvent = events.find { it.type == EventType.FULL_SNAPSHOT }
+        assertNotNull(fullSnapshotEvent, "Full capture should contain a FULL_SNAPSHOT event")
+
+        // Verify CUSTOM event (viewport)
+        val customEvent = events.find { it.type == EventType.CUSTOM }
+        assertNotNull(customEvent, "Full capture should contain a CUSTOM event")
+    }
+
+    /**
+     * Verifies that the events represent an incremental capture (2 INCREMENTAL_SNAPSHOT events)
+     */
+    private fun verifyIncrementalCaptureEvents(events: List<Event>) {
+        assertEquals(2, events.size, "Incremental capture should have exactly 2 events")
+
+        // Verify both events are INCREMENTAL_SNAPSHOT
+        val incrementalEvents = events.filter { it.type == EventType.INCREMENTAL_SNAPSHOT }
+        assertEquals(2, incrementalEvents.size, "Incremental capture should contain 2 INCREMENTAL_SNAPSHOT events")
+    }
+}
\ No newline at end of file
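
The exporter tests above pin down a simple rule for when a full rrweb snapshot is sent: the first capture of a session, or any capture whose viewport dimensions differ from the previous one, is exported as a full snapshot, while every other capture is incremental. The sketch below distills that decision into a standalone helper; `SnapshotDecider` is a hypothetical name, not part of this diff, and the real exporter tracks equivalent state internally.

```kotlin
package com.launchdarkly.observability.replay

// Hypothetical helper illustrating the full-vs-incremental decision asserted by
// RRwebGraphQLReplayLogExporterTest; not the production implementation.
class SnapshotDecider {
    private var lastSession: String? = null
    private var lastWidth: Int? = null
    private var lastHeight: Int? = null

    /** Returns true when a full snapshot should be sent: new session or changed viewport dimensions. */
    fun requiresFullSnapshot(session: String, width: Int, height: Int): Boolean {
        val full = session != lastSession || width != lastWidth || height != lastHeight
        lastSession = session
        lastWidth = width
        lastHeight = height
        return full
    }
}

fun main() {
    val decider = SnapshotDecider()
    println(decider.requiresFullSnapshot("session-a", 800, 600))   // true  - first capture of the session
    println(decider.requiresFullSnapshot("session-a", 800, 600))   // false - incremental
    println(decider.requiresFullSnapshot("session-a", 1024, 768))  // true  - dimension change
    println(decider.requiresFullSnapshot("session-b", 1024, 768))  // true  - new session
}
```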
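
RoutingLogRecordProcessorTest exercises a routing contract whose production class is not included in this diff: log records whose instrumentation scope name has a registered processor go to that processor, and everything else falls through to the default pipeline. The sketch below mirrors only the members the tests use (a `fallthroughProcessor` constructor parameter, `addProcessor`, and scope-name lookup in `onEmit`); it assumes the OpenTelemetry `LogRecordProcessor` default methods cover `shutdown`/`forceFlush`, and it is an illustration rather than the production `RoutingLogRecordProcessor`.

```kotlin
import io.opentelemetry.context.Context
import io.opentelemetry.sdk.logs.LogRecordProcessor
import io.opentelemetry.sdk.logs.ReadWriteLogRecord

// Sketch of the scope-based routing contract exercised by RoutingLogRecordProcessorTest;
// not the production class.
class ScopeRoutingSketch(
    private val fallthroughProcessor: LogRecordProcessor,
) : LogRecordProcessor {
    private val processorsByScope = mutableMapOf<String, LogRecordProcessor>()

    /** Registers a processor for records emitted under the given instrumentation scope name. */
    fun addProcessor(scopeName: String, processor: LogRecordProcessor) {
        processorsByScope[scopeName] = processor
    }

    override fun onEmit(context: Context, logRecord: ReadWriteLogRecord) {
        // Route by instrumentation scope name; unregistered scopes fall through to the default pipeline.
        val target = processorsByScope[logRecord.instrumentationScopeInfo.name] ?: fallthroughProcessor
        target.onEmit(context, logRecord)
    }
}
```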
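
The enums declared in ReplaySessionProtocol.kt are serialized through the `IntEnumSerializer` family, so they appear on the wire as rrweb's integer codes rather than Kotlin enum names. A small sketch of the resulting wire format, assuming the protocol types above are on the classpath:

```kotlin
package com.launchdarkly.observability.replay

import kotlinx.serialization.json.Json

fun main() {
    // The custom serializers emit rrweb's integer codes, not the enum names.
    println(Json.encodeToString(EventTypeSerializer, EventType.FULL_SNAPSHOT))          // 2
    println(Json.encodeToString(IncrementalSourceSerializer, IncrementalSource.INPUT))  // 5
    // Decoding maps the integer back to the enum constant (unknown values throw).
    println(Json.decodeFromString(EventTypeSerializer, "4"))                            // META
}
```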