11package com.segment.analytics.kotlin.destinations.amplitude
22
3+ import android.content.Context
4+ import android.content.SharedPreferences
35import com.segment.analytics.kotlin.core.*
46import com.segment.analytics.kotlin.core.platform.EventPlugin
57import com.segment.analytics.kotlin.core.platform.Plugin
68import com.segment.analytics.kotlin.core.platform.VersionedPlugin
79import com.segment.analytics.kotlin.core.platform.plugins.logger.*
810import com.segment.analytics.kotlin.core.utilities.putIntegrations
9- import java.util.concurrent.locks.ReentrantLock
10- import kotlin.concurrent.withLock
11+ import com.segment.analytics.kotlin.core.utilities.updateJsonObject
12+ import com.segment.analytics.kotlin.core.utilities.*
13+ import java.util.concurrent.atomic.AtomicBoolean
14+ import java.util.concurrent.atomic.AtomicLong
15+ import androidx.core.content.edit
1116
1217// A Destination plugin that adds session tracking to Amplitude cloud mode.
1318class AmplitudeSession (private val sessionTimeoutMs : Long = 300000 ) : EventPlugin, VersionedPlugin {
1419
20+ companion object {
21+ const val PREVIOUS_SESSION_ID = " previous_session_id"
22+ const val LAST_EVENT_TIME = " last_event_time"
23+
24+ const val AMP_PREFIX = " [Amplitude] "
25+ const val AMP_SESSION_END_EVENT = " session_end"
26+ const val AMP_SESSION_START_EVENT = " session_start"
27+ }
28+
1529 override val type: Plugin .Type = Plugin .Type .Enrichment
1630 override lateinit var analytics: Analytics
17- private var eventSessionId = - 1L
18- private var sessionId = eventSessionId
1931 private val key = " Actions Amplitude"
20- private val ampSessionEndEvent = " session_end"
21- private val ampSessionStartEvent = " session_start"
22- private var active = false
23- private var lastEventFiredTime = java.lang.System .currentTimeMillis()
24- private var sessionUpdateLock = ReentrantLock ()
32+
33+
34+ private val active = AtomicBoolean (false )
35+ private var prefs: SharedPreferences ? = null
36+
37+ private val _sessionId = AtomicLong (- 1L )
38+ private var sessionId
39+ get() = _sessionId .get()
40+ set(value) {
41+ _sessionId .set(value)
42+ prefs?.edit(commit = true ) { putLong(PREVIOUS_SESSION_ID , value) }
43+ }
44+ private val _lastEventTime = AtomicLong (- 1L )
45+ private var lastEventTime
46+ get() = _lastEventTime .get()
47+ set(value) {
48+ _lastEventTime .set(value)
49+ prefs?.edit(commit = true ) { putLong(LAST_EVENT_TIME , value) }
50+ }
2551
2652 override fun setup (analytics : Analytics ) {
2753 super .setup(analytics)
28- startNewSessionIfNecessary()
54+
55+ var context: Context ? = null
56+ if (analytics.configuration.application is Context ){
57+ context = analytics.configuration.application as Context
58+ }
59+ context?.let {
60+ prefs = context.getSharedPreferences(" analytics-android-${analytics.configuration.writeKey} " , Context .MODE_PRIVATE )
61+ prefs?.let {
62+ _sessionId .set(it.getLong(PREVIOUS_SESSION_ID , - 1 ))
63+ _lastEventTime .set(it.getLong(LAST_EVENT_TIME , - 1 ))
64+ }
65+ }
66+
67+ if (sessionId == - 1L ) {
68+ startNewSession()
69+ }
70+ else {
71+ startNewSessionIfNecessary()
72+ }
2973 }
74+
3075 override fun update (settings : Settings , type : Plugin .UpdateType ) {
31- active = settings.hasIntegrationSettings(key)
76+ if (type != Plugin .UpdateType .Initial ) return
77+
78+ active.set(settings.hasIntegrationSettings(key))
3279 }
3380
3481 override fun execute (event : BaseEvent ): BaseEvent ? {
35- if (! active) { // If amplitude destination is disabled, no need to do this enrichment
82+ if (! active.get() ) { // If amplitude destination is disabled, no need to do this enrichment
3683 return event
3784 }
3885
3986 startNewSessionIfNecessary()
4087
41- val modified = super .execute(event)
4288 analytics.log(
4389 message = " Running ${event.type} payload through AmplitudeSession"
4490 )
4591
46- if (event is TrackEvent ) {
47- if (event.event == ampSessionStartEvent) {
48- // Update session ID for all events after this in the queue
49- eventSessionId = sessionId
50- analytics.log(message = " NewSession = $eventSessionId " )
92+ var modified = super .execute(event)
93+ modified = insertSession(modified)
94+
95+ if (modified is ScreenEvent ) {
96+ val screenName = modified.name
97+ modified.properties = updateJsonObject(modified.properties) {
98+ // amp needs screen name in the properties for screen event
99+ it[" name" ] = screenName
100+ }
101+ }
102+ else if (modified is TrackEvent ) {
103+ if (modified.event == AMP_SESSION_START_EVENT ) {
104+ modified = modified.disableCloudIntegrations(exceptKeys = listOf (key))
105+ analytics.log(message = " NewSession = $sessionId " )
106+ }
107+ else if (modified.event == AMP_SESSION_END_EVENT ) {
108+ modified = modified.disableCloudIntegrations(exceptKeys = listOf (key))
109+ analytics.log(message = " EndSession = $sessionId " )
51110 }
52- else if (event .event == ampSessionEndEvent ) {
53- analytics.log(message = " EndSession = $eventSessionId " )
111+ else if (modified .event.contains( AMP_PREFIX ) ) {
112+ modified = modified.disableCloudIntegrations(exceptKeys = listOf (key) )
54113 }
55114 }
56115
57- return modified?.putIntegrations(key, mapOf (" session_id" to eventSessionId))
116+ // renew the session if there are activities
117+ lastEventTime = java.lang.System .currentTimeMillis()
118+ return modified
58119 }
59120
60121 override fun track (payload : TrackEvent ): BaseEvent {
@@ -67,39 +128,52 @@ class AmplitudeSession (private val sessionTimeoutMs : Long = 300000) : EventPlu
67128 return payload
68129 }
69130
131+ private fun insertSession (event : BaseEvent ? ): BaseEvent ? {
132+ // for session start or session end, session id is already attached, so skip it
133+ if (event?.integrations?.get(key)?.safeJsonObject?.containsKey(" session_id" ) == true ) {
134+ return event
135+ }
136+
137+ return event?.putIntegrations(key, mapOf (" session_id" to sessionId))
138+ }
139+
70140 private fun onBackground () {
141+ lastEventTime = java.lang.System .currentTimeMillis()
71142 }
72143
73144 private fun onForeground () {
74145 startNewSessionIfNecessary()
75146 }
76147
77148 private fun startNewSession () {
78- analytics.track(ampSessionStartEvent)
149+ sessionId = java.lang.System .currentTimeMillis()
150+
151+ // need to snapshot the current sessionId
152+ // because when the enrichment closure is applied the sessionId might have changed
153+ val copy = sessionId
154+ analytics.track(AMP_SESSION_START_EVENT ) { event ->
155+ event?.putIntegrations(key, mapOf (" session_id" to copy))
156+ }
79157 }
80158
81159 private fun endSession () {
82- analytics.track(ampSessionEndEvent)
160+ // need to snapshot the current sessionId
161+ // because when the enrichment closure is applied the sessionId might have changed
162+ val copy = sessionId
163+ analytics.track(AMP_SESSION_END_EVENT ) { event ->
164+ event?.putIntegrations(key, mapOf (" session_id" to copy))
165+ }
83166 }
84167
85168 private fun startNewSessionIfNecessary () {
86- sessionUpdateLock.withLock {
87- val current = java.lang.System .currentTimeMillis()
88- // Make sure the first event has a valid ID and we send a session start.
89- // Subsequent events should have session IDs updated after the session track event is sent
90- if (eventSessionId == - 1L || sessionId == - 1L ) {
91- sessionId = current
92- eventSessionId = current
93- lastEventFiredTime = current
94- startNewSession()
95- } else if (current - lastEventFiredTime >= sessionTimeoutMs) {
96- sessionId = current
97- lastEventFiredTime = current
98-
99- endSession()
100- startNewSession()
101- }
102- }
169+ val current = java.lang.System .currentTimeMillis()
170+ val withinSessionLimit = (current - lastEventTime < sessionTimeoutMs)
171+ if (sessionId >= 0 && withinSessionLimit) return
172+
173+ // we'll consider this our new lastEventTime
174+ lastEventTime = current
175+ endSession()
176+ startNewSession()
103177 }
104178
105179 override fun version (): String {
0 commit comments