Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions analytics/metricsplatform/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/build
77 changes: 77 additions & 0 deletions analytics/metricsplatform/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
plugins {
id 'com.android.library'
id 'com.google.devtools.ksp'
id 'kotlin-android'
id 'kotlinx-serialization'
}

final JavaVersion JAVA_VERSION = JavaVersion.VERSION_17

android {
namespace 'org.wikimedia.metricsplatform'

compileOptions {
coreLibraryDesugaringEnabled true

sourceCompatibility = JAVA_VERSION
targetCompatibility = JAVA_VERSION
}

kotlinOptions {
jvmTarget = JAVA_VERSION
}
compileSdk 35

defaultConfig {
targetSdk 35

buildConfigField "String", "EVENTGATE_ANALYTICS_EXTERNAL_BASE_URI", '"https://intake-analytics.wikimedia.org"'
buildConfigField "String", "EVENTGATE_LOGGING_EXTERNAL_BASE_URI", '"https://intake-logging.wikimedia.org"'
}

buildFeatures {
buildConfig true
}
}

dependencies {
coreLibraryDesugaring libs.desugar.jdk.libs

implementation libs.kotlin.stdlib.jdk8
implementation libs.kotlinx.coroutines.core
implementation libs.kotlinx.coroutines.android
implementation libs.kotlinx.serialization.json

implementation libs.material
implementation libs.appcompat
implementation libs.core.ktx
implementation libs.browser
implementation libs.constraintlayout
implementation libs.fragment.ktx
implementation libs.paging.runtime.ktx
implementation libs.palette.ktx
implementation libs.preference.ktx
implementation libs.recyclerview
implementation libs.viewpager2
implementation libs.flexbox
implementation libs.drawerlayout
implementation libs.swiperefreshlayout
implementation libs.work.runtime.ktx

implementation libs.okhttp.tls
implementation libs.okhttp3.logging.interceptor
implementation libs.retrofit
implementation libs.commons.lang3
implementation libs.jsoup
implementation libs.photoview
implementation libs.balloon
implementation libs.retrofit2.kotlinx.serialization.converter

implementation libs.android.sdk
implementation libs.android.plugin.annotation.v9

implementation libs.androidx.room.runtime
annotationProcessor libs.androidx.room.compiler
ksp libs.androidx.room.compiler
implementation libs.androidx.room.ktx
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.wikimedia.metricsplatform

import org.wikimedia.metricsplatform.config.StreamConfig
import org.wikimedia.metricsplatform.context.AgentData
import org.wikimedia.metricsplatform.context.ClientData
import org.wikimedia.metricsplatform.context.ContextValue
import org.wikimedia.metricsplatform.context.MediawikiData
import org.wikimedia.metricsplatform.context.PageData
import org.wikimedia.metricsplatform.context.PerformerData
import org.wikimedia.metricsplatform.event.EventProcessed

class ContextController {
fun enrichEvent(event: EventProcessed, streamConfig: StreamConfig) {
if (!streamConfig.hasRequestedContextValuesConfig()) {
return
}
// Check stream config for which contextual values should be added to the event.
val requestedValuesFromConfig = streamConfig.producerConfig?.metricsPlatformClientConfig?.requestedValues.orEmpty()
// Add required properties.
val requestedValues= mutableSetOf<String>()
requestedValues.addAll(requestedValuesFromConfig)
requestedValues.addAll(REQUIRED_PROPERTIES)
val filteredData = filterClientData(event.clientData, requestedValues)
event.applyClientData(filteredData)
}

private fun filterClientData(clientData: ClientData, requestedValues: Collection<String>): ClientData {
val newAgentData = AgentData()
val newPageData = PageData()
val newMediawikiData = MediawikiData()
val newPerformerData = PerformerData()

for (requestedValue in requestedValues) {
when (requestedValue) {
ContextValue.AGENT_APP_INSTALL_ID -> newAgentData.appInstallId = clientData.agentData?.appInstallId
ContextValue.AGENT_CLIENT_PLATFORM -> newAgentData.clientPlatform = clientData.agentData?.clientPlatform
ContextValue.AGENT_CLIENT_PLATFORM_FAMILY -> newAgentData.clientPlatformFamily = clientData.agentData?.clientPlatformFamily
ContextValue.AGENT_APP_FLAVOR -> newAgentData.appFlavor = clientData.agentData?.appFlavor
ContextValue.AGENT_APP_THEME -> newAgentData.appTheme = clientData.agentData?.appTheme
ContextValue.AGENT_APP_VERSION -> newAgentData.appVersion = clientData.agentData?.appVersion
ContextValue.AGENT_APP_VERSION_NAME -> newAgentData.appVersionName = clientData.agentData?.appVersionName
ContextValue.AGENT_DEVICE_FAMILY -> newAgentData.deviceFamily = clientData.agentData?.deviceFamily
ContextValue.AGENT_DEVICE_LANGUAGE -> newAgentData.deviceLanguage = clientData.agentData?.deviceLanguage
ContextValue.AGENT_RELEASE_STATUS -> newAgentData.releaseStatus = clientData.agentData?.releaseStatus
ContextValue.PAGE_ID -> newPageData.id = clientData.pageData?.id
ContextValue.PAGE_TITLE -> newPageData.title = clientData.pageData?.title
ContextValue.PAGE_NAMESPACE_ID -> newPageData.namespaceId = clientData.pageData?.namespaceId
ContextValue.PAGE_NAMESPACE_NAME -> newPageData.namespaceName = clientData.pageData?.namespaceName
ContextValue.PAGE_REVISION_ID -> newPageData.revisionId = clientData.pageData?.revisionId
ContextValue.PAGE_WIKIDATA_QID -> newPageData.wikidataItemQid = clientData.pageData?.wikidataItemQid
ContextValue.PAGE_CONTENT_LANGUAGE -> newPageData.contentLanguage = clientData.pageData?.contentLanguage
ContextValue.MEDIAWIKI_DATABASE -> newMediawikiData.database = clientData.mediawikiData?.database
ContextValue.PERFORMER_ID -> newPerformerData.id = clientData.performerData?.id
ContextValue.PERFORMER_NAME -> newPerformerData.name = clientData.performerData?.name
ContextValue.PERFORMER_IS_LOGGED_IN -> newPerformerData.isLoggedIn = clientData.performerData?.isLoggedIn
ContextValue.PERFORMER_IS_TEMP -> newPerformerData.isTemp = clientData.performerData?.isTemp
ContextValue.PERFORMER_SESSION_ID -> newPerformerData.sessionId = clientData.performerData?.sessionId
ContextValue.PERFORMER_PAGEVIEW_ID -> newPerformerData.pageviewId = clientData.performerData?.pageviewId
ContextValue.PERFORMER_GROUPS -> newPerformerData.groups = clientData.performerData?.groups
ContextValue.PERFORMER_LANGUAGE_GROUPS -> {
var languageGroups = clientData.performerData?.languageGroups
if (languageGroups != null && languageGroups.length > 255) {
languageGroups = languageGroups.substring(0, 255)
}
newPerformerData.languageGroups = languageGroups
}

ContextValue.PERFORMER_LANGUAGE_PRIMARY -> newPerformerData.languagePrimary = clientData.performerData?.languagePrimary
ContextValue.PERFORMER_REGISTRATION_DT -> newPerformerData.registrationDt = clientData.performerData?.registrationDt
else -> throw IllegalArgumentException("Unknown property: $requestedValue")
}
}

return ClientData(newAgentData, newPageData, newMediawikiData, newPerformerData)
}

companion object {
/**
* @see [Metrics Platform/Contextual attributes](https://wikitech.wikimedia.org/wiki/Metrics_Platform/Contextual_attributes)
*/
private val REQUIRED_PROPERTIES = listOf(
"agent_app_flavor",
"agent_app_install_id",
"agent_app_theme",
"agent_app_version",
"agent_app_version_name",
"agent_client_platform",
"agent_client_platform_family",
"agent_device_family",
"agent_device_language",
"agent_release_status"
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.wikimedia.metricsplatform

import org.wikimedia.metricsplatform.config.StreamConfig
import org.wikimedia.metricsplatform.event.EventProcessed

class CurationController {
fun shouldProduceEvent(event: EventProcessed, streamConfig: StreamConfig): Boolean {
if (!streamConfig.hasCurationFilter()) return true
return streamConfig.curationFilter.test(event)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.wikimedia.metricsplatform

import org.wikimedia.metricsplatform.config.DestinationEventService
import org.wikimedia.metricsplatform.config.SourceConfig
import org.wikimedia.metricsplatform.config.StreamConfig
import org.wikimedia.metricsplatform.event.EventProcessed
import java.net.SocketTimeoutException
import java.net.UnknownHostException
import java.util.concurrent.BlockingQueue
import java.util.concurrent.atomic.AtomicReference

class EventProcessor(
private val contextController: ContextController,
private val curationController: CurationController,
private val sourceConfig: AtomicReference<SourceConfig>,
private val samplingController: SamplingController,
private val eventSender: EventSender,
private val eventQueue: BlockingQueue<EventProcessed>
) {

/**
* Send all events currently in the output buffer.
*
*
* A shallow clone of the output buffer is created and passed to the integration layer for
* submission by the client. If the event submission succeeds, the events are removed from the
* output buffer. (Note that the shallow copy created by clone() retains pointers to the original
* Event objects.) If the event submission fails, a client error is produced, and the events remain
* in buffer to be retried on the next submission attempt.
*/
fun sendEnqueuedEvents() {
val config: SourceConfig? = sourceConfig.get()
if (config == null) {
//log.log(Level.FINE, "Configuration is missing, enqueued events are not sent.")
return
}

val pending = mutableListOf<EventProcessed>()
synchronized(eventQueue) {
eventQueue.drainTo(pending)
}

val streamConfigsMap = config.streamConfigs

pending.filter { event -> streamConfigsMap.containsKey(event.stream) }
.filter { event ->
val cfg = streamConfigsMap[event.stream]
if (cfg == null) {
false
} else {
cfg.sampleConfig?.let { event.sample = it }
samplingController.isInSample(cfg)
}
}
.filter { event -> eventPassesCurationRules(event, streamConfigsMap) }
.groupBy { event -> destinationEventService(event, streamConfigsMap) }
.forEach { (destinationEventService, pendingValidEvents) ->
sendEventsToDestination(destinationEventService, pendingValidEvents)
}
}

fun eventPassesCurationRules(
event: EventProcessed,
streamConfigMap: Map<String, StreamConfig>
): Boolean {
val streamConfig = streamConfigMap[event.stream]
if (streamConfig == null) {
return false
}
contextController.enrichEvent(event, streamConfig)
return curationController.shouldProduceEvent(event, streamConfig)
}

private fun destinationEventService(
event: EventProcessed,
streamConfigMap: Map<String, StreamConfig>
): DestinationEventService {
val streamConfig = streamConfigMap[event.stream]
return streamConfig?.destinationEventService ?: DestinationEventService.ANALYTICS
}

private fun sendEventsToDestination(
destinationEventService: DestinationEventService,
pendingValidEvents: List<EventProcessed>
) {
try {
//TODO!
//eventSender.sendEvents(destinationEventService.getBaseUri(isDebug), pendingValidEvents)
} catch (e: UnknownHostException) {
//log.log(Level.WARNING, "Network error while sending " + pendingValidEvents.size + " events. Adding back to queue.", e)
eventQueue.addAll(pendingValidEvents)
} catch (e: SocketTimeoutException) {
//log.log(Level.WARNING, "Network error while sending " + pendingValidEvents.size + " events. Adding back to queue.", e)
eventQueue.addAll(pendingValidEvents)
} catch (e: Exception) {
//log.log(Level.WARNING, "Failed to send " + pendingValidEvents.size + " events.", e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.wikimedia.metricsplatform

import android.net.Uri
import org.wikimedia.metricsplatform.event.EventProcessed

fun interface EventSender {
/**
* Transmit an event to a destination intake service.
*
* @param baseUri base uri of destination intake service
* @param events events to be sent
*/
fun sendEvents(baseUri: Uri, events: List<EventProcessed>)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.wikimedia.metricsplatform

import android.net.Uri
import kotlinx.serialization.json.Json
import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody
import org.wikimedia.metricsplatform.event.EventProcessed
import java.io.IOException

class EventSenderDefault(
private val json: Json,
private val httpClient: OkHttpClient
) : EventSender {
override fun sendEvents(baseUri: Uri, events: List<EventProcessed>) {
val request = Request.Builder()
.url(baseUri.toString())
.header("Accept", "application/json")
.header(
"User-Agent",
"Metrics Platform Client/Java " + MetricsClient.METRICS_PLATFORM_LIBRARY_VERSION
)
.post(json.encodeToString(events).toRequestBody("application/json".toMediaTypeOrNull()))
.build()

httpClient.newCall(request).execute().use { response ->
val status = response.code
val body = response.body
if (!response.isSuccessful || status == 207) {
// In the case of a multi-status response (207), it likely means that one or more
// events were rejected. In such a case, the error is actually contained in
// the normal response body.
throw IOException(body?.string().orEmpty())
}
//log.log(java.util.logging.Level.INFO, "Sent " + events.size + " events successfully.")
}
}
}
Loading
Loading