@@ -2,6 +2,10 @@ package com.amplitude.android
22
33import com.amplitude.android.Amplitude.Companion.END_SESSION_EVENT
44import com.amplitude.android.Amplitude.Companion.START_SESSION_EVENT
5+ import com.amplitude.android.EventQueueMessage.EnterForeground
6+ import com.amplitude.android.EventQueueMessage.Event
7+ import com.amplitude.android.EventQueueMessage.ExitForeground
8+ import com.amplitude.android.EventQueueMessage.SetIdentity
59import com.amplitude.core.Storage
610import com.amplitude.core.Storage.Constants
711import com.amplitude.core.Storage.Constants.LAST_EVENT_ID
@@ -10,7 +14,6 @@ import com.amplitude.core.Storage.Constants.PREVIOUS_SESSION_ID
1014import com.amplitude.core.events.BaseEvent
1115import com.amplitude.core.platform.Timeline
1216import kotlinx.coroutines.channels.Channel
13- import kotlinx.coroutines.channels.ChannelResult
1417import kotlinx.coroutines.channels.onFailure
1518import kotlinx.coroutines.launch
1619import java.util.concurrent.atomic.AtomicBoolean
@@ -67,62 +70,53 @@ class Timeline(
6770 incomingEvent.timestamp = System .currentTimeMillis()
6871 }
6972
70- eventMessageChannel.trySend(EventQueueMessage .Event (incomingEvent))
71- .logOnFailure(" Failed to enqueue event: ${incomingEvent.eventType} . Channel is closed or full." )
73+ eventMessageChannel.trySendAndLog(Event (incomingEvent))
7274 }
7375
7476 internal fun onEnterForeground (timestamp : Long ) {
75- eventMessageChannel.trySend(EventQueueMessage .EnterForeground (timestamp))
76- .logOnFailure(" Failed to enqueue EnterForeground. Channel is closed or full." )
77+ eventMessageChannel.trySendAndLog(EnterForeground (timestamp))
7778 }
7879
7980 internal fun onExitForeground (timestamp : Long ) {
80- eventMessageChannel.trySend(EventQueueMessage .ExitForeground (timestamp))
81- .logOnFailure(" Failed to enqueue ExitForeground. Channel is closed or full." )
81+ eventMessageChannel.trySendAndLog(ExitForeground (timestamp))
8282 }
8383
8484 /* *
85- * Queue a user ID change to be processed in order with events.
85+ * Queue an identity field change to be processed in order with events.
8686 * This ensures identity changes happen in FIFO order with other events.
8787 */
88- internal fun queueSetUserId (userId : String? ) {
89- eventMessageChannel.trySend(EventQueueMessage .SetUserId (userId))
90- .logOnFailure(" Failed to enqueue SetUserId. Channel is closed or full." )
91- }
92-
93- /* *
94- * Queue a device ID change to be processed in order with events.
95- * This ensures identity changes happen in FIFO order with other events.
96- */
97- internal fun queueSetDeviceId (deviceId : String ) {
98- eventMessageChannel.trySend(EventQueueMessage .SetDeviceId (deviceId))
99- .logOnFailure(" Failed to enqueue SetDeviceId. Channel is closed or full." )
88+ internal fun queueSetIdentity (field : IdentityField ) {
89+ eventMessageChannel.trySendAndLog(SetIdentity (field))
10090 }
10191
10292 /* *
10393 * Process an event message from the event queue.
10494 */
10595 private suspend fun processEventMessage (message : EventQueueMessage ) {
10696 when (message) {
107- is EventQueueMessage . EnterForeground -> {
97+ is EnterForeground -> {
10898 val stopAndStartSessionEvents = startNewSessionIfNeeded(message.timestamp)
10999 foreground.set(true )
110100 processAndPersistEvents(stopAndStartSessionEvents)
111101 }
112- is EventQueueMessage . Event -> {
102+ is Event -> {
113103 processEvent(message.event)
114104 }
115- is EventQueueMessage . ExitForeground -> {
105+ is ExitForeground -> {
116106 foreground.set(false )
117107 refreshSessionTime(message.timestamp)
118108 }
119- is EventQueueMessage .SetUserId -> {
120- amplitude.idContainer.identityManager.editIdentity()
121- .setUserId(message.userId).commit()
122- }
123- is EventQueueMessage .SetDeviceId -> {
124- amplitude.idContainer.identityManager.editIdentity()
125- .setDeviceId(message.deviceId).commit()
109+ is SetIdentity -> {
110+ when (val field = message.field) {
111+ is IdentityField .UserId -> {
112+ amplitude.idContainer.identityManager.editIdentity()
113+ .setUserId(field.value).commit()
114+ }
115+ is IdentityField .DeviceId -> {
116+ amplitude.idContainer.identityManager.editIdentity()
117+ .setDeviceId(field.value).commit()
118+ }
119+ }
126120 }
127121 }
128122 }
@@ -230,10 +224,11 @@ class Timeline(
230224 return sessionId > DEFAULT_SESSION_ID
231225 }
232226
233- private fun <T > ChannelResult<T>.logOnFailure (message : String ) =
234- onFailure {
235- amplitude.logger.error(message)
236- }
227+ private fun Channel<EventQueueMessage>.trySendAndLog (event : EventQueueMessage ) =
228+ trySend(event)
229+ .onFailure {
230+ amplitude.logger.error(" Failed to enqueue event $event . Channel is closed or full." )
231+ }
237232
238233 private fun Storage.readLong (
239234 key : Constants ,
@@ -243,14 +238,18 @@ class Timeline(
243238 }
244239}
245240
241+ sealed class IdentityField {
242+ data class UserId (val value : String? ) : IdentityField()
243+
244+ data class DeviceId (val value : String ) : IdentityField()
245+ }
246+
246247sealed class EventQueueMessage {
247248 data class Event (val event : BaseEvent ) : EventQueueMessage()
248249
249250 data class EnterForeground (val timestamp : Long ) : EventQueueMessage()
250251
251252 data class ExitForeground (val timestamp : Long ) : EventQueueMessage()
252253
253- data class SetUserId (val userId : String? ) : EventQueueMessage()
254-
255- data class SetDeviceId (val deviceId : String ) : EventQueueMessage()
254+ data class SetIdentity (val field : IdentityField ) : EventQueueMessage()
256255}
0 commit comments