Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ import io.opentelemetry.sdk.logs.export.LogRecordExporter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.jsonArray
import kotlinx.serialization.json.jsonObject

private const val REPLAY_EXPORTER_NAME = "RRwebGraphQLReplayLogExporter"
// size limit of accumulated continues canvas operations on the RRWeb player
private const val RRWEB_CANVAS_BUFFER_LIMIT = 10_000_000 // ~10mb
private const val RRWEB_CANVAS_DRAW_ENTOURAGE = 300 // 300 bytes

/**
* An [LogRecordExporter] that can send session replay capture logs to the backend using RRWeb syntax
Expand All @@ -33,9 +37,12 @@ class RRwebGraphQLReplayLogExporter(
val backendUrl: String,
val serviceName: String,
val serviceVersion: String,
private val injectedReplayApiService: SessionReplayApiService? = null
private val injectedReplayApiService: SessionReplayApiService? = null,
private val canvasBufferLimit: Int = RRWEB_CANVAS_BUFFER_LIMIT,
private val canvasDrawEntourage: Int = RRWEB_CANVAS_DRAW_ENTOURAGE
) : LogRecordExporter {
private val coroutineScope = CoroutineScope(DispatcherProviderHolder.current.io + SupervisorJob())
private val exportMutex = Mutex()

private var graphqlClient: GraphQLClient = GraphQLClient(backendUrl)
private val replayApiService: SessionReplayApiService =
Expand All @@ -56,12 +63,18 @@ class RRwebGraphQLReplayLogExporter(
)

private var lastSeenState = LastSeenState(sessionId = null, height = 0, width = 0)
private var generatingCanvasSize = 0
private var pushedCanvasSize = 0

override fun export(logs: MutableCollection<LogRecordData>): CompletableResultCode {
val resultCode = CompletableResultCode()

coroutineScope.launch {
// payloadIdCounter and pushedCanvasSize require to have a single reentrancy
exportMutex.withLock {
try {
generatingCanvasSize = pushedCanvasSize

// Map to collect events by session ID
val eventsBySession = mutableMapOf<String, MutableList<Event>>()
// Set to track sessions that need initialization
Expand All @@ -80,7 +93,8 @@ class RRwebGraphQLReplayLogExporter(

val stateChanged = capture.session != lastSeenState.sessionId ||
capture.origHeight != lastSeenState.height ||
capture.origWidth != lastSeenState.width
capture.origWidth != lastSeenState.width ||
generatingCanvasSize >= canvasBufferLimit

if (stateChanged) {
lastSeenState = LastSeenState(
Expand Down Expand Up @@ -126,6 +140,9 @@ class RRwebGraphQLReplayLogExporter(
if (events.isNotEmpty()) {
try {
replayApiService.pushPayload(sessionId, "${nextPayloadId()}", events)

// flushes generating canvas size into pushedCanvasSize
pushedCanvasSize = generatingCanvasSize
} catch (e: Exception) {
// TODO: O11Y-627 - pass in logger to implementation and use here
// Log.e(REPLAY_EXPORTER_NAME, "Error pushing payload for session $sessionId: ${e.message}", e)
Expand All @@ -142,6 +159,7 @@ class RRwebGraphQLReplayLogExporter(
// Log.e("RRwebGraphQLReplayLogExporter", "Error during export: ${e.message}", e)
resultCode.fail()
}
}
}

return resultCode
Expand Down Expand Up @@ -259,6 +277,7 @@ class RRwebGraphQLReplayLogExporter(
Json.parseToJsonElement("""{"source":9,"id":6,"type":0,"commands":[{"property":"clearRect","args":[0,0,${captureEvent.origWidth},${captureEvent.origHeight}]},{"property":"drawImage","args":[{"rr_type":"ImageBitmap","args":[{"rr_type":"Blob","data":[{"rr_type":"ArrayBuffer","base64":"${captureEvent.imageBase64}"}],"type":"image/jpeg"}]},0,0,${captureEvent.origWidth},${captureEvent.origHeight}]}]}""")
)
)
generatingCanvasSize += captureEvent.imageBase64.length + canvasDrawEntourage
eventsBatch.add(incrementalEvent)

return eventsBatch
Expand Down Expand Up @@ -342,6 +361,9 @@ class RRwebGraphQLReplayLogExporter(
)
),
)

// starting again canvas size
generatingCanvasSize = captureEvent.imageBase64.length + canvasDrawEntourage
eventBatch.add(snapShotEvent)

val viewportEvent = Event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class RRwebGraphQLReplayLogExporterTest {
backendUrl = "http://test.com",
serviceName = "test-service",
serviceVersion = "1.0.0",
injectedReplayApiService = mockService
injectedReplayApiService = mockService,
canvasBufferLimit = 20,
canvasDrawEntourage = 1
)
}

Expand Down Expand Up @@ -62,7 +64,7 @@ class RRwebGraphQLReplayLogExporterTest {
}

@Test
@Disabled // Feature if handling multiples session is not done
@Disabled // Feature of handling multiples session is not done
fun `export should send full capture for first session and incremental for subsequent captures in same session`() = runTest {
// Arrange: Create captures for two different sessions
val sessionACaptureEvents = listOf(
Expand Down Expand Up @@ -176,8 +178,48 @@ class RRwebGraphQLReplayLogExporterTest {

// Verify event types: First and third captures should be full, second and fourth should be incremental
val capturedEvents: List<Event> = capturedEventsLists[0]
verifyFullCaptureEvents(capturedEvents)
verifyIncrementalCaptureEvents(capturedEvents)
verifyFullCaptureEvents(capturedEvents) // First capture - full
verifyIncrementalCaptureEvents(capturedEvents) // Second capture - incremental
}

@Test
fun `test canvas buffer limit`() = runTest {
// Arrange: Create captures for same session but with dimension changes
val captureEvents = listOf(
// small canvas
CaptureEvent("base64data1", 800, 600, 1000L, "session-a"), // First capture - full
// large canvases to cause overlimit
CaptureEvent("base64data2222222222222", 800, 600, 2000L, "session-a"), // Same dimensions - incremental
CaptureEvent("base64data3333333333333", 1024, 768, 3000L, "session-a"), // Dimension change - full
CaptureEvent(
"base64data444444444444",
1024,
768,
4000L,
"session-a"
) // Same dimensions - incremental
)

val logRecords = createLogRecordsFromCaptures(captureEvents)

// Capture the events sent to pushPayload
val capturedEventsLists = mutableListOf<List<Event>>()

// Mock the API service methods
coEvery { mockService.initializeReplaySession(any(), any()) } just Runs
coEvery { mockService.identifyReplaySession(any()) } just Runs
coEvery { mockService.pushPayload(any(), any(), capture(capturedEventsLists)) } just Runs

// Act: Export all log records
val result = exporter.export(logRecords.toMutableList())

// Assert: Verify the result completes successfully
assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess)

// Verify event types: First and third captures should be full, second and fourth should be incremental
val capturedEvents: List<Event> = capturedEventsLists[0]
verifyFullCaptureEvents(capturedEvents, count = 3) // First capture - full
verifyIncrementalCaptureEvents(capturedEvents, 1) // Second capture - incremental
}

@Test
Expand Down Expand Up @@ -429,15 +471,15 @@ class RRwebGraphQLReplayLogExporterTest {
/**
* Verifies that the events represent a full capture (META, FULL_SNAPSHOT, CUSTOM)
*/
private fun verifyFullCaptureEvents(events: List<Event>) {
private fun verifyFullCaptureEvents(events: List<Event>, count: Int = 2) {
// Verify META event
val metaEvent = events.find { it.type == EventType.META }
assertNotNull(metaEvent, "Full capture should contain a META event")

// Verify FULL_SNAPSHOT event
val fullSnapshotEvent = events.find { it.type == EventType.FULL_SNAPSHOT }
assertNotNull(fullSnapshotEvent, "Full capture should contain a FULL_SNAPSHOT event")
val fullSnapshotEvents = events.filter { it.type == EventType.FULL_SNAPSHOT }
assertEquals(count, fullSnapshotEvents.size, "Full capture should contain $count FULL_SNAPSHOT events")

// Verify CUSTOM event (viewport)
val customEvent = events.find { it.type == EventType.CUSTOM }
assertNotNull(customEvent, "Full capture should contain a CUSTOM event")
Expand All @@ -446,9 +488,9 @@ class RRwebGraphQLReplayLogExporterTest {
/**
* Verifies that the events represent an incremental capture (2 INCREMENTAL_SNAPSHOT events)
*/
private fun verifyIncrementalCaptureEvents(events: List<Event>) {
private fun verifyIncrementalCaptureEvents(events: List<Event>, count: Int = 2) {
// Verify both events are INCREMENTAL_SNAPSHOT
val incrementalEvents = events.filter { it.type == EventType.INCREMENTAL_SNAPSHOT }
assertEquals(2, incrementalEvents.size, "Incremental capture should contain 2 INCREMENTAL_SNAPSHOT events")
assertEquals(count, incrementalEvents.size, "Incremental capture should contain $count INCREMENTAL_SNAPSHOT events")
}
}
Loading