Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import kotlinx.serialization.json.jsonObject

private const val REPLAY_EXPORTER_NAME = "RRwebGraphQLReplayLogExporter"

// size limit of accumulated continues canvas operations on the RRWeb player
private const val RRWEB_CANVAS_BUFFER_LIMIT = 10_000_000 // ~10mb

/**
* An [LogRecordExporter] that can send session replay capture logs to the backend using RRWeb syntax
* and GraphQL pushes for transport.
Expand All @@ -33,7 +36,8 @@ class RRwebGraphQLReplayLogExporter(
val backendUrl: String,
val serviceName: String,
val serviceVersion: String,
private val injectedReplayApiService: SessionReplayApiService? = null
private val injectedReplayApiService: SessionReplayApiService? = null,
private val canvasBufferLimit: Int = RRWEB_CANVAS_BUFFER_LIMIT
) : LogRecordExporter {
private val coroutineScope = CoroutineScope(DispatcherProviderHolder.current.io + SupervisorJob())

Expand All @@ -56,12 +60,16 @@ class RRwebGraphQLReplayLogExporter(
)

private var lastSeenState = LastSeenState(sessionId = null, height = 0, width = 0)
private var generatingCanvasSize = 0
private var pushedCanvasSize = 0

override fun export(logs: MutableCollection<LogRecordData>): CompletableResultCode {
val resultCode = CompletableResultCode()

coroutineScope.launch {
try {
generatingCanvasSize = pushedCanvasSize

// Map to collect events by session ID
val eventsBySession = mutableMapOf<String, MutableList<Event>>()
// Set to track sessions that need initialization
Expand All @@ -80,7 +88,8 @@ class RRwebGraphQLReplayLogExporter(

val stateChanged = capture.session != lastSeenState.sessionId ||
capture.origHeight != lastSeenState.height ||
capture.origWidth != lastSeenState.width
capture.origWidth != lastSeenState.width ||
generatingCanvasSize >= canvasBufferLimit

if (stateChanged) {
lastSeenState = LastSeenState(
Expand Down Expand Up @@ -126,6 +135,9 @@ class RRwebGraphQLReplayLogExporter(
if (events.isNotEmpty()) {
try {
replayApiService.pushPayload(sessionId, "${nextPayloadId()}", events)

// flushes generating canvas size into pushedCanvasSize
pushedCanvasSize = generatingCanvasSize
} catch (e: Exception) {
// TODO: O11Y-627 - pass in logger to implementation and use here
// Log.e(REPLAY_EXPORTER_NAME, "Error pushing payload for session $sessionId: ${e.message}", e)
Expand Down Expand Up @@ -259,6 +271,7 @@ class RRwebGraphQLReplayLogExporter(
Json.parseToJsonElement("""{"source":9,"id":6,"type":0,"commands":[{"property":"clearRect","args":[0,0,${captureEvent.origWidth},${captureEvent.origHeight}]},{"property":"drawImage","args":[{"rr_type":"ImageBitmap","args":[{"rr_type":"Blob","data":[{"rr_type":"ArrayBuffer","base64":"${captureEvent.imageBase64}"}],"type":"image/jpeg"}]},0,0,${captureEvent.origWidth},${captureEvent.origHeight}]}]}""")
)
)
generatingCanvasSize += captureEvent.imageBase64.length
eventsBatch.add(incrementalEvent)

return eventsBatch
Expand Down Expand Up @@ -342,6 +355,9 @@ class RRwebGraphQLReplayLogExporter(
)
),
)

// starting again canvas size
generatingCanvasSize = captureEvent.imageBase64.length
eventBatch.add(snapShotEvent)

val viewportEvent = Event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.BeforeEach
import java.util.concurrent.TimeUnit
import org.junit.jupiter.api.Disabled

class RRwebGraphQLReplayLogExporterTest {

Expand All @@ -24,7 +25,8 @@ class RRwebGraphQLReplayLogExporterTest {
backendUrl = "http://test.com",
serviceName = "test-service",
serviceVersion = "1.0.0",
injectedReplayApiService = mockService
injectedReplayApiService = mockService,
canvasBufferLimit = 20
)
}

Expand Down Expand Up @@ -61,65 +63,66 @@ class RRwebGraphQLReplayLogExporterTest {
}

@Test
@Disabled // Feature if handling multiples session is not done
fun `export should send full capture for first session and incremental for subsequent captures in same session`() = runTest {
// Arrange: Create captures for two different sessions
val sessionACaptureEvents = listOf(
CaptureEvent("base64data1", 800, 600, 1000L, "session-a"),
CaptureEvent("base64data2", 800, 600, 2000L, "session-a"),
CaptureEvent("base64data3", 800, 600, 3000L, "session-a")
)

val sessionBCaptureEvents = listOf(
CaptureEvent("base64data4", 1024, 768, 4000L, "session-b"),
CaptureEvent("base64data5", 1024, 768, 5000L, "session-b")
)

val allCaptures = sessionACaptureEvents + sessionBCaptureEvents
val logRecords = createLogRecordsFromCaptures(allCaptures)

// Capture the events sent to pushPayload
val capturedEvents = mutableListOf<List<Event>>()

// Mock the API service methods
coEvery { mockService.initializeReplaySession(any(), any()) } just Runs
coEvery { mockService.identifyReplaySession(any()) } just Runs
coEvery { mockService.pushPayload(any(), any(), capture(capturedEvents)) } just Runs

// Act: Export all log records
val result = exporter.export(logRecords.toMutableList())

// Assert: Verify the result completes successfully
assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess)

// Verify full capture calls for session A (first capture only)
coVerify(exactly = 1) {
mockService.initializeReplaySession("test-org", "session-a")
coVerify(exactly = 1) {
mockService.initializeReplaySession("test-org", "session-a")
}
coVerify(exactly = 1) {
mockService.identifyReplaySession("session-a")
coVerify(exactly = 1) {
mockService.identifyReplaySession("session-a")
}

// Verify full capture calls for session B (first capture only)
coVerify(exactly = 1) {
mockService.initializeReplaySession("test-org", "session-b")
coVerify(exactly = 1) {
mockService.initializeReplaySession("test-org", "session-b")
}
coVerify(exactly = 1) {
mockService.identifyReplaySession("session-b")
coVerify(exactly = 1) {
mockService.identifyReplaySession("session-b")
}

// Verify pushPayload is called for all captures (3 for session A + 2 for session B = 5 total)
coVerify(exactly = 5) {
mockService.pushPayload(any(), any(), any())
coVerify(exactly = 5) {
mockService.pushPayload(any(), any(), any())
}

// Verify event types: First capture should be full (3 events), subsequent should be incremental (2 events each)
assertEquals(5, capturedEvents.size)

// Session A: First capture (full) + 2 incremental captures
verifyFullCaptureEvents(capturedEvents[0]) // First capture should be full
verifyIncrementalCaptureEvents(capturedEvents[1]) // Second capture should be incremental
verifyIncrementalCaptureEvents(capturedEvents[2]) // Third capture should be incremental

// Session B: First capture (full) + 1 incremental capture
verifyFullCaptureEvents(capturedEvents[3]) // First capture should be full
verifyIncrementalCaptureEvents(capturedEvents[4]) // Second capture should be incremental
Expand All @@ -144,12 +147,12 @@ class RRwebGraphQLReplayLogExporterTest {
val logRecords = createLogRecordsFromCaptures(captureEvents)

// Capture the events sent to pushPayload
val capturedEvents = mutableListOf<List<Event>>()
val capturedEventsLists = mutableListOf<List<Event>>()

// Mock the API service methods
coEvery { mockService.initializeReplaySession(any(), any()) } just Runs
coEvery { mockService.identifyReplaySession(any()) } just Runs
coEvery { mockService.pushPayload(any(), any(), capture(capturedEvents)) } just Runs
coEvery { mockService.pushPayload(any(), any(), capture(capturedEventsLists)) } just Runs

// Act: Export all log records
val result = exporter.export(logRecords.toMutableList())
Expand All @@ -158,26 +161,64 @@ class RRwebGraphQLReplayLogExporterTest {
assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess)

// Verify initializeReplaySession is called twice (first capture + dimension change)
coVerify(exactly = 2) {
coVerify(exactly = 1) {
mockService.initializeReplaySession("test-org", "session-a")
}

// Verify identifyReplaySession is called twice (first capture + dimension change)
coVerify(exactly = 2) {
coVerify(exactly = 1) {
mockService.identifyReplaySession("session-a")
}

// Verify pushPayload is called for all captures
coVerify(exactly = 4) {
coVerify(exactly = 1) {
mockService.pushPayload("session-a", any(), any())
}

// Verify event types: First and third captures should be full, second and fourth should be incremental
assertEquals(4, capturedEvents.size)
verifyFullCaptureEvents(capturedEvents[0]) // First capture - full
verifyIncrementalCaptureEvents(capturedEvents[1]) // Second capture - incremental
verifyFullCaptureEvents(capturedEvents[2]) // Third capture - full (dimension change)
verifyIncrementalCaptureEvents(capturedEvents[3]) // Fourth capture - incremental
val capturedEvents: List<Event> = capturedEventsLists[0]
verifyFullCaptureEvents(capturedEvents) // First capture - full
verifyIncrementalCaptureEvents(capturedEvents) // Second capture - incremental
}

@Test
fun `test canvas buffer limit`() = runTest {
// Arrange: Create captures for same session but with dimension changes
val captureEvents = listOf(
// small canvas
CaptureEvent("base64data1", 800, 600, 1000L, "session-a"), // First capture - full
// large canvases to cause overlimit
CaptureEvent("base64data2222222222222", 800, 600, 2000L, "session-a"), // Same dimensions - incremental
CaptureEvent("base64data3333333333333", 1024, 768, 3000L, "session-a"), // Dimension change - full
CaptureEvent(
"base64data444444444444",
1024,
768,
4000L,
"session-a"
) // Same dimensions - incremental
)

val logRecords = createLogRecordsFromCaptures(captureEvents)

// Capture the events sent to pushPayload
val capturedEventsLists = mutableListOf<List<Event>>()

// Mock the API service methods
coEvery { mockService.initializeReplaySession(any(), any()) } just Runs
coEvery { mockService.identifyReplaySession(any()) } just Runs
coEvery { mockService.pushPayload(any(), any(), capture(capturedEventsLists)) } just Runs

// Act: Export all log records
val result = exporter.export(logRecords.toMutableList())

// Assert: Verify the result completes successfully
assertTrue(result.join(5, TimeUnit.SECONDS).isSuccess)

// Verify event types: First and third captures should be full, second and fourth should be incremental
val capturedEvents: List<Event> = capturedEventsLists[0]
verifyFullCaptureEvents(capturedEvents, count = 3) // First capture - full
verifyIncrementalCaptureEvents(capturedEvents, 1) // Second capture - incremental
}

@Test
Expand Down Expand Up @@ -226,7 +267,7 @@ class RRwebGraphQLReplayLogExporterTest {
coVerify(exactly = 1) {
mockService.identifyReplaySession("session-a")
}
coVerify(exactly = 2) {
coVerify(exactly = 1) {
mockService.pushPayload("session-a", any(), any())
}
}
Expand Down Expand Up @@ -293,38 +334,39 @@ class RRwebGraphQLReplayLogExporterTest {
// Verify API calls: First capture should be full, second should be incremental
coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-a") }
coVerify(exactly = 1) { mockService.identifyReplaySession("session-a") }
coVerify(exactly = 2) { mockService.pushPayload("session-a", any(), any()) }
coVerify(exactly = 1) { mockService.pushPayload("session-a", any(), any()) }
}

@Test
@Disabled // Handling exceptions in initializeReplaySession and identifyReplaySession not done
fun `export should stop processing on first failure and not process remaining captures`() = runTest {
// Arrange: Create captures for two different sessions
val captureEvents = listOf(
CaptureEvent("base64data1", 800, 600, 1000L, "session-a"),
CaptureEvent("base64data2", 1024, 768, 2000L, "session-b")
)
val logRecords = createLogRecordsFromCaptures(captureEvents)

// Mock API service: first session succeeds, second session fails
coEvery { mockService.initializeReplaySession("test-org", "session-a") } just Runs
coEvery { mockService.identifyReplaySession("session-a") } just Runs
coEvery { mockService.pushPayload("session-a", any(), any()) } just Runs

coEvery { mockService.initializeReplaySession("test-org", "session-b") } throws RuntimeException("Network timeout")
coEvery { mockService.identifyReplaySession("session-b") } throws RuntimeException("Network timeout")
coEvery { mockService.pushPayload("session-b", any(), any()) } throws RuntimeException("Network timeout")

// Act: Export log records
val result = exporter.export(logRecords.toMutableList())

// Assert: Verify the result fails due to first failure
assertFalse(result.join(5, TimeUnit.SECONDS).isSuccess)

// Verify only first session was processed (second session should not be processed due to early termination)
coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-a") }
coVerify(exactly = 1) { mockService.identifyReplaySession("session-a") }
coVerify(exactly = 1) { mockService.pushPayload("session-a", any(), any()) }

// Verify second session was never processed
coVerify(exactly = 1) { mockService.initializeReplaySession("test-org", "session-b") }
coVerify(exactly = 0) { mockService.identifyReplaySession("session-b") }
Expand Down Expand Up @@ -428,17 +470,15 @@ class RRwebGraphQLReplayLogExporterTest {
/**
* Verifies that the events represent a full capture (META, FULL_SNAPSHOT, CUSTOM)
*/
private fun verifyFullCaptureEvents(events: List<Event>) {
assertEquals(3, events.size, "Full capture should have exactly 3 events")

private fun verifyFullCaptureEvents(events: List<Event>, count: Int = 2) {
// Verify META event
val metaEvent = events.find { it.type == EventType.META }
assertNotNull(metaEvent, "Full capture should contain a META event")

// Verify FULL_SNAPSHOT event
val fullSnapshotEvent = events.find { it.type == EventType.FULL_SNAPSHOT }
assertNotNull(fullSnapshotEvent, "Full capture should contain a FULL_SNAPSHOT event")
val fullSnapshotEvents = events.filter { it.type == EventType.FULL_SNAPSHOT }
assertEquals(count, fullSnapshotEvents.size, "Full capture should contain $(count) FULL_SNAPSHOT events")

// Verify CUSTOM event (viewport)
val customEvent = events.find { it.type == EventType.CUSTOM }
assertNotNull(customEvent, "Full capture should contain a CUSTOM event")
Expand All @@ -447,11 +487,9 @@ class RRwebGraphQLReplayLogExporterTest {
/**
* Verifies that the events represent an incremental capture (2 INCREMENTAL_SNAPSHOT events)
*/
private fun verifyIncrementalCaptureEvents(events: List<Event>) {
assertEquals(2, events.size, "Incremental capture should have exactly 2 events")

private fun verifyIncrementalCaptureEvents(events: List<Event>, count: Int = 2) {
// Verify both events are INCREMENTAL_SNAPSHOT
val incrementalEvents = events.filter { it.type == EventType.INCREMENTAL_SNAPSHOT }
assertEquals(2, incrementalEvents.size, "Incremental capture should contain 2 INCREMENTAL_SNAPSHOT events")
assertEquals(count, incrementalEvents.size, "Incremental capture should contain $(count) INCREMENTAL_SNAPSHOT events")
}
}
Loading