Skip to content

Commit e246a67

Browse files
authored
Add WideEventClient implementation (#6838)
Task/Issue URL: https://app.asana.com/1/137249556945/project/488551667048375/task/1211444639001946 ### Description Adds initial version of the new library to instrument user journeys. ### Steps to test this PR API is not used yet, so this is not testable. ### No UI changes
1 parent 6eb06df commit e246a67

File tree

18 files changed

+2705
-0
lines changed

18 files changed

+2705
-0
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright (c) 2025 DuckDuckGo
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.duckduckgo.app.statistics.wideevents
18+
19+
import com.duckduckgo.app.statistics.wideevents.CleanupPolicy.OnTimeout
20+
import java.time.Duration
21+
22+
interface WideEventClient {
23+
/**
24+
* Begin a new flow.
25+
*
26+
* @param name Stable flow name (e.g., "subscription_purchase").
27+
* @param flowEntryPoint Optional identifier of the flow entry point (e.g., "app_settings")
28+
* @param metadata Optional metadata (e.g., "free_trial_eligible=true").
29+
* @param cleanupPolicy Strategy for dealing with abandoned events, see [CleanupPolicy] for details.
30+
* @return Wide event ID used for subsequent calls.
31+
*/
32+
suspend fun flowStart(
33+
name: String,
34+
flowEntryPoint: String? = null,
35+
metadata: Map<String, String> = emptyMap(),
36+
cleanupPolicy: CleanupPolicy = OnTimeout(duration = Duration.ofDays(7)),
37+
): Result<Long>
38+
39+
/**
40+
* Record a step in an active flow.
41+
*
42+
* @param wideEventId ID of the wide event.
43+
* @param stepName Step label (e.g., "account_created").
44+
* @param success Whether this step succeeded.
45+
* @param metadata Optional metadata.
46+
*/
47+
suspend fun flowStep(
48+
wideEventId: Long,
49+
stepName: String,
50+
success: Boolean = true,
51+
metadata: Map<String, String> = emptyMap(),
52+
): Result<Unit>
53+
54+
/**
55+
* Finish an active flow and send it with a terminal status.
56+
*
57+
* @param wideEventId ID of the wide event.
58+
* @param status Status of the entire flow.
59+
* @param metadata Optional metadata.
60+
*/
61+
suspend fun flowFinish(
62+
wideEventId: Long,
63+
status: FlowStatus,
64+
metadata: Map<String, String> = emptyMap(),
65+
): Result<Unit>
66+
67+
/**
68+
* Abort an active flow and discard all data (nothing is sent).
69+
*
70+
* @param wideEventId ID of the wide event.
71+
*/
72+
suspend fun flowAbort(wideEventId: Long): Result<Unit>
73+
74+
/**
75+
* Get ids of active flows by their name.
76+
* Ids are ordered chronologically based on wide event creation time.
77+
*
78+
* Use to pick up wideEventId after process restart or if the flow started
79+
* in a different module. Or to clean up abandoned events.
80+
*
81+
* @param name Stable flow name that was previously used with [flowStart].
82+
*/
83+
suspend fun getFlowIds(name: String): Result<List<Long>>
84+
85+
/**
86+
* Start a named interval inside the flow.
87+
*
88+
* If [timeout] elapses before [intervalEnd], [flowFinish] or [flowAbort], the flow auto-finishes with
89+
* [FlowStatus.Unknown] and is sent. Explicit finish/abort cancels any pending timeouts.
90+
*
91+
* @param wideEventId ID of the wide event.
92+
* @param key Interval key (e.g., "token_refresh_duration").
93+
* @param timeout Optional duration for auto-finish.
94+
*/
95+
suspend fun intervalStart(
96+
wideEventId: Long,
97+
key: String,
98+
timeout: Duration? = null,
99+
): Result<Unit>
100+
101+
/**
102+
* End a previously started interval.
103+
*
104+
* Cancels any timeout set by the corresponding [intervalStart].
105+
*
106+
* @param wideEventId ID of the wide event.
107+
* @param key Interval key passed to [intervalStart].
108+
* @return Duration of the interval.
109+
*/
110+
suspend fun intervalEnd(
111+
wideEventId: Long,
112+
key: String,
113+
): Result<Duration>
114+
}
115+
116+
/** Represents the final outcome status of a wide event. */
117+
sealed class FlowStatus {
118+
/** The operation completed successfully */
119+
data object Success : FlowStatus()
120+
121+
/** The operation failed */
122+
data class Failure(
123+
val reason: String,
124+
) : FlowStatus()
125+
126+
/** The operation was cancelled by the user */
127+
data object Cancelled : FlowStatus()
128+
129+
/** The final status could not be determined */
130+
data object Unknown : FlowStatus()
131+
}
132+
133+
/**
134+
* Per-flow cleanup behavior applied when a flow is left open.
135+
*
136+
* If the flow is explicitly finished or aborted before conditions defined by the policy are met,
137+
* the policy has no effect.
138+
*
139+
* Each policy carries a target [flowStatus] - the flow is auto-finished with that status and sent.
140+
*/
141+
sealed class CleanupPolicy {
142+
/**
143+
* The status to use when this policy triggers.
144+
*/
145+
abstract val flowStatus: FlowStatus
146+
147+
/**
148+
* Apply cleanup on the next (main) process start for any still-open flow.
149+
*
150+
* If [ignoreIfIntervalTimeoutPresent] is true and the flow has a pending interval timeout,
151+
* this policy is skipped so the interval timeout can handle cleanup.
152+
*
153+
* @param ignoreIfIntervalTimeoutPresent When true, do not apply this policy if an interval timeout exists.
154+
*/
155+
data class OnProcessStart(
156+
val ignoreIfIntervalTimeoutPresent: Boolean,
157+
override val flowStatus: FlowStatus = FlowStatus.Unknown,
158+
) : CleanupPolicy()
159+
160+
/**
161+
* Apply cleanup if a flow stays open beyond a specified time.
162+
*
163+
* - This API does not guarantee immediate cleanup as soon as the timeout expires — the event becomes eligible
164+
* for cleanup after the duration has passed, and the actual cleanup may occur later.
165+
*
166+
* @param duration Duration after flow start for it to become eligible for cleanup.
167+
*/
168+
data class OnTimeout(
169+
val duration: Duration,
170+
override val flowStatus: FlowStatus = FlowStatus.Unknown,
171+
) : CleanupPolicy()
172+
}

statistics/statistics-impl/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,14 @@ dependencies {
6464

6565
implementation AndroidX.core.ktx
6666

67+
implementation "com.squareup.moshi:moshi-adapters:_"
68+
6769
testImplementation Testing.junit4
6870
testImplementation "org.mockito.kotlin:mockito-kotlin:_"
6971
testImplementation "androidx.lifecycle:lifecycle-runtime-testing:_"
7072
testImplementation project(path: ':common-test')
7173
testImplementation project(':data-store-test')
74+
testImplementation project(':feature-toggles-test')
7275
testImplementation CashApp.turbine
7376
testImplementation Testing.robolectric
7477
testImplementation(KotlinX.coroutines.test) {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2025 DuckDuckGo
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.duckduckgo.app.statistics.wideevents
18+
19+
import androidx.lifecycle.LifecycleOwner
20+
import com.duckduckgo.app.di.AppCoroutineScope
21+
import com.duckduckgo.app.lifecycle.MainProcessLifecycleObserver
22+
import com.duckduckgo.app.statistics.wideevents.db.WideEventRepository
23+
import com.duckduckgo.common.utils.DispatcherProvider
24+
import com.duckduckgo.di.scopes.AppScope
25+
import com.squareup.anvil.annotations.ContributesMultibinding
26+
import kotlinx.coroutines.CoroutineScope
27+
import kotlinx.coroutines.ExperimentalCoroutinesApi
28+
import kotlinx.coroutines.flow.conflate
29+
import kotlinx.coroutines.launch
30+
import kotlinx.coroutines.withContext
31+
import javax.inject.Inject
32+
33+
@OptIn(ExperimentalCoroutinesApi::class)
34+
@ContributesMultibinding(AppScope::class)
35+
class CompletedWideEventsProcessor @Inject constructor(
36+
private val wideEventRepository: WideEventRepository,
37+
private val wideEventSender: WideEventSender,
38+
@AppCoroutineScope private val appCoroutineScope: CoroutineScope,
39+
private val wideEventFeature: WideEventFeature,
40+
private val dispatcherProvider: DispatcherProvider,
41+
) : MainProcessLifecycleObserver {
42+
override fun onCreate(owner: LifecycleOwner) {
43+
appCoroutineScope.launch {
44+
runCatching {
45+
if (!isFeatureEnabled()) return@runCatching
46+
47+
wideEventRepository
48+
.getCompletedWideEventIdsFlow()
49+
.conflate()
50+
.collect { ids ->
51+
// Process events in chunks to avoid querying too many events at once.
52+
ids.chunked(100).forEach { idsChunk ->
53+
processCompletedWideEvents(idsChunk.toSet())
54+
}
55+
}
56+
}
57+
}
58+
}
59+
60+
private suspend fun processCompletedWideEvents(wideEventIds: Set<Long>) {
61+
wideEventRepository.getWideEvents(wideEventIds).forEach { event ->
62+
wideEventSender.sendWideEvent(event)
63+
wideEventRepository.deleteWideEvent(event.id)
64+
}
65+
}
66+
67+
private suspend fun isFeatureEnabled(): Boolean =
68+
withContext(dispatcherProvider.io()) {
69+
wideEventFeature.self().isEnabled()
70+
}
71+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright (c) 2025 DuckDuckGo
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.duckduckgo.app.statistics.wideevents
18+
19+
import androidx.lifecycle.LifecycleOwner
20+
import com.duckduckgo.app.di.AppCoroutineScope
21+
import com.duckduckgo.app.lifecycle.MainProcessLifecycleObserver
22+
import com.duckduckgo.app.statistics.wideevents.db.WideEventRepository
23+
import com.duckduckgo.common.utils.CurrentTimeProvider
24+
import com.duckduckgo.common.utils.DispatcherProvider
25+
import com.duckduckgo.di.scopes.AppScope
26+
import com.squareup.anvil.annotations.ContributesMultibinding
27+
import kotlinx.coroutines.CoroutineScope
28+
import kotlinx.coroutines.launch
29+
import kotlinx.coroutines.withContext
30+
import java.time.Duration
31+
import java.time.Instant
32+
import javax.inject.Inject
33+
34+
@ContributesMultibinding(AppScope::class)
35+
class WideEventCleaner @Inject constructor(
36+
@AppCoroutineScope private val appCoroutineScope: CoroutineScope,
37+
private val wideEventRepository: WideEventRepository,
38+
private val currentTimeProvider: CurrentTimeProvider,
39+
private val wideEventFeature: WideEventFeature,
40+
private val dispatcherProvider: DispatcherProvider,
41+
) : MainProcessLifecycleObserver {
42+
override fun onCreate(owner: LifecycleOwner) {
43+
appCoroutineScope.launch {
44+
runCatching {
45+
if (isFeatureEnabled()) {
46+
performWideEventCleanup()
47+
}
48+
}
49+
}
50+
}
51+
52+
private suspend fun performWideEventCleanup() {
53+
wideEventRepository
54+
.getActiveWideEventIds()
55+
.chunked(100)
56+
.forEach { idsChunk ->
57+
wideEventRepository
58+
.getWideEvents(idsChunk.toSet())
59+
.forEach { event ->
60+
processWideEvent(event)
61+
}
62+
}
63+
}
64+
65+
private suspend fun processWideEvent(event: WideEventRepository.WideEvent) {
66+
when (val policy = event.cleanupPolicy) {
67+
is WideEventRepository.CleanupPolicy.OnProcessStart -> {
68+
val hasIntervalTimeouts = event.activeIntervals.any { it.timeout != null }
69+
70+
if (!hasIntervalTimeouts || !policy.ignoreIfIntervalTimeoutPresent) {
71+
wideEventRepository.setWideEventStatus(
72+
eventId = event.id,
73+
status = policy.status,
74+
metadata = policy.metadata,
75+
)
76+
return
77+
}
78+
}
79+
80+
is WideEventRepository.CleanupPolicy.OnTimeout -> {
81+
if (isTimeoutReached(startAt = event.createdAt, timeout = policy.duration)) {
82+
wideEventRepository.setWideEventStatus(
83+
eventId = event.id,
84+
status = policy.status,
85+
metadata = policy.metadata,
86+
)
87+
return
88+
}
89+
}
90+
}
91+
92+
event.activeIntervals
93+
.firstOrNull { interval ->
94+
interval.timeout != null && isTimeoutReached(event.createdAt, interval.timeout)
95+
}?.let { interval ->
96+
wideEventRepository.setWideEventStatus(
97+
eventId = event.id,
98+
status = WideEventRepository.WideEventStatus.UNKNOWN,
99+
metadata = emptyMap(),
100+
)
101+
}
102+
}
103+
104+
private fun isTimeoutReached(
105+
startAt: Instant,
106+
timeout: Duration,
107+
): Boolean {
108+
val currentTime = Instant.ofEpochMilli(currentTimeProvider.currentTimeMillis())
109+
return timeout <= Duration.between(startAt, currentTime)
110+
}
111+
112+
private suspend fun isFeatureEnabled(): Boolean =
113+
withContext(dispatcherProvider.io()) {
114+
wideEventFeature.self().isEnabled()
115+
}
116+
}

0 commit comments

Comments
 (0)