Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,8 @@ internal class ExporterImpl(
"Exporter: exporting batch ${batch.batchId} with ${events.size} events and ${spans.size} spans",
)

// Remove events that have an invalid file path
val validEvents = events.filter {
if (it.serializedDataFilePath != null) {
val isValid = fileStorage.validateFile(it.serializedDataFilePath)
if (!isValid) {
logger.log(
LogLevel.Error,
"Exporter: failed to read event data, discarding event ${it.eventId}",
)
}
isValid
} else {
true
}
}

val response = networkClient.execute(batch.batchId, validEvents, spans)
handleEventsExportResponse(response, batch.batchId, validEvents, spans)
val response = networkClient.execute(batch.batchId, events, spans)
handleEventsExportResponse(response, batch.batchId, events, spans)

return response is HttpResponse.Success
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,19 @@ internal class NetworkClientImpl(
) {
sink.writeUtf8("{\"events\":[")

eventPackets.forEachIndexed { index, event ->
if (index > 0) sink.writeUtf8(",")
var first = true
eventPackets.forEach { event ->
if (event.serializedDataFilePath != null &&
fileStorage.getFile(event.serializedDataFilePath) == null
) {
logger.log(
LogLevel.Error,
"Exporter: event data file missing, skipping event ${event.eventId}",
)
return@forEach
}
if (!first) sink.writeUtf8(",")
first = false
writeEventPacket(sink, event)
}

Expand Down Expand Up @@ -194,10 +205,8 @@ internal class NetworkClientImpl(
}

val source = file.inputStream().source()
try {
source.use { source ->
sink.writeAll(source)
} finally {
source.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,11 @@ internal class DataCleanupServiceImpl(
}
database.getOldestSession()?.let {
if (it != currentSessionId) {
val eventIds = database.getEventsForSession(it)
fileStorage.deleteEventsIfExist(eventIds)
val attachmentIds = database.getAttachmentsForEvents(eventIds)
// Only delete files for events not in a batch. Batched events
// are being exported and their files will be cleaned up after export.
val unbatchedEventIds = database.getUnbatchedEventsForSession(it)
fileStorage.deleteEventsIfExist(unbatchedEventIds)
val attachmentIds = database.getAttachmentsForEvents(unbatchedEventIds)
fileStorage.deleteAttachmentsIfExist(attachmentIds)

// deleting sessions from db will also delete events, spans and attachments for the session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ internal interface Database : Closeable {
*/
fun getEventsForSession(session: String): List<String>

/**
* Returns event IDs belonging to a session that are not part of any batch.
*
* @param session The session ID.
*/
fun getUnbatchedEventsForSession(session: String): List<String>

/**
* Returns the total count of events across all sessions.
*/
Expand Down Expand Up @@ -568,6 +575,18 @@ internal class DatabaseImpl(
return eventIds
}

override fun getUnbatchedEventsForSession(session: String): List<String> {
val eventIds = mutableListOf<String>()
readableDatabase.rawQuery(Sql.getUnbatchedEventsForSession(session), null).use {
while (it.moveToNext()) {
val eventIdIndex = it.getColumnIndex(EventTable.COL_ID)
val eventId = it.getString(eventIdIndex)
eventIds.add(eventId)
}
}
return eventIds
}

override fun getEventsCount(): Int {
val count: Int
readableDatabase.rawQuery(Sql.getEventsCount(), null).use {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,15 @@ internal object Sql {
WHERE ${EventTable.COL_SESSION_ID} = '$session'
""".trimIndent()

fun getUnbatchedEventsForSession(session: String): String = """
SELECT e.${EventTable.COL_ID}
FROM ${EventTable.TABLE_NAME} e
LEFT JOIN ${EventsBatchTable.TABLE_NAME} eb
ON e.${EventTable.COL_ID} = eb.${EventsBatchTable.COL_EVENT_ID}
WHERE e.${EventTable.COL_SESSION_ID} = '$session'
AND eb.${EventsBatchTable.COL_EVENT_ID} IS NULL
""".trimIndent()

fun getAttachmentsForEvents(events: List<String>): String = """
SELECT ${AttachmentV1Table.COL_ID}
FROM ${AttachmentV1Table.TABLE_NAME}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ internal interface FileStorage {
* Returns the path to the file where the [sh.measure.android.config.DynamicConfig] is stored.
*/
fun getConfigPath(): String

/**
* Validates that the file exists.
*/
fun validateFile(path: String): Boolean
}

private const val MEASURE_DIR = "measure"
Expand Down Expand Up @@ -199,11 +194,6 @@ internal class FileStorageImpl(

override fun getConfigPath(): String = "$rootDir/$MEASURE_DIR/$CONFIG_FILE_NAME"

override fun validateFile(path: String): Boolean {
val file = getFile(path)
return file != null && file.exists() && file.length() > 0
}

override fun getFile(path: String): File? {
val file = File(path)
return when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,29 +402,6 @@ internal class ExporterTest {
)
}

@Test
fun `filters out events with invalid serialized data file path`() {
whenever(networkClient.execute(any(), any(), any())).thenReturn(HttpResponse.Success())

insertSessionInDb("session1")
insertEventInDb("session1", "event1", sampled = true)
insertEventInDb(
sessionId = "session1",
eventId = "event2",
sampled = true,
serializedDataFilePath = "/nonexistent/path/data.json",
)
insertBatchInDb("batch1", eventIds = setOf("event1", "event2"))

exporter.export()

verify(networkClient).execute(
eq("batch1"),
argThat { size == 1 && first().eventId == "event1" },
any(),
)
}

@Test
fun `uploads attachments after events export`() {
val responseJson = attachmentResponseJson("attachment-1")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package sh.measure.android.exporter

import okio.Buffer
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito.mock
import org.mockito.Mockito.`when`
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.eq
import org.mockito.kotlin.isNull
import org.mockito.kotlin.verify
import sh.measure.android.events.EventType
import sh.measure.android.fakes.FakeConfigProvider
import sh.measure.android.fakes.NoopLogger
import sh.measure.android.logger.LogLevel
Expand Down Expand Up @@ -165,4 +169,101 @@ class NetworkClientTest {
any(),
)
}

@Test
fun `execute skips events with missing data file and logs error`() {
val mockLogger = mock<Logger>()
val client = NetworkClientImpl(
logger = mockLogger,
fileStorage = fileStorage,
httpClient = httpClient,
configProvider = configProvider,
).apply {
init(apiKey = "secret", baseUrl = "http://localhost:8080")
}

val event = EventPacket(
eventId = "event-1",
sessionId = "session-1",
timestamp = "2024-01-01T00:00:00.000Z",
type = EventType.EXCEPTION,
userTriggered = false,
serializedData = null,
serializedDataFilePath = "missing-file.json",
serializedAttachments = null,
serializedAttributes = "{}",
serializedUserDefinedAttributes = null,
)

`when`(fileStorage.getFile("missing-file.json")).thenReturn(null)

val jsonWriterCaptor = argumentCaptor<(okio.BufferedSink) -> Unit>()
`when`(httpClient.sendJsonRequest(anyString(), anyString(), any(), any())).thenReturn(
HttpResponse.Success(),
)

client.execute("batch123", listOf(event), emptyList())

verify(httpClient).sendJsonRequest(anyString(), anyString(), any(), jsonWriterCaptor.capture())
val buffer = Buffer()
jsonWriterCaptor.firstValue.invoke(buffer)
val json = buffer.readUtf8()

assertEquals("{\"events\":[],\"spans\":[]}", json)
verify(mockLogger).log(
eq(LogLevel.Error),
eq("Exporter: event data file missing, skipping event event-1"),
isNull(),
)
}

@Test
fun `execute includes events with valid inline data alongside skipped events`() {
val event1 = EventPacket(
eventId = "event-1",
sessionId = "session-1",
timestamp = "2024-01-01T00:00:00.000Z",
type = EventType.STRING,
userTriggered = false,
serializedData = "{\"key\":\"value\"}",
serializedDataFilePath = null,
serializedAttachments = null,
serializedAttributes = "{}",
serializedUserDefinedAttributes = null,
)

val event2 = EventPacket(
eventId = "event-2",
sessionId = "session-1",
timestamp = "2024-01-01T00:00:01.000Z",
type = EventType.EXCEPTION,
userTriggered = false,
serializedData = null,
serializedDataFilePath = "missing-file.json",
serializedAttachments = null,
serializedAttributes = "{}",
serializedUserDefinedAttributes = null,
)

`when`(fileStorage.getFile("missing-file.json")).thenReturn(null)

val jsonWriterCaptor = argumentCaptor<(okio.BufferedSink) -> Unit>()
`when`(httpClient.sendJsonRequest(anyString(), anyString(), any(), any())).thenReturn(
HttpResponse.Success(),
)

networkClient.execute("batch123", listOf(event1, event2), emptyList())

verify(httpClient).sendJsonRequest(anyString(), anyString(), any(), jsonWriterCaptor.capture())
val buffer = Buffer()
jsonWriterCaptor.firstValue.invoke(buffer)
val json = buffer.readUtf8()

// event-1 should be present, event-2 should be skipped
assertTrue(json.contains("\"id\":\"event-1\""))
assertTrue(!json.contains("\"id\":\"event-2\""))
// Verify valid JSON structure - no trailing/leading commas
assertTrue(json.startsWith("{\"events\":[{"))
assertTrue(json.contains("}],\"spans\":[]}"))
}
}
Loading
Loading