Skip to content

Commit fa0a135

Browse files
authored
Merge pull request #2631 from DataDog/nogorodnikov/rum-9854/introduce-event-processing-thread
RUM-9854: Introduce event processing thread
2 parents ac90182 + 7956da7 commit fa0a135

File tree

70 files changed

+3181
-1311
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+3181
-1311
lines changed

dd-sdk-android-core/api/apiSurface

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,11 @@ interface com.datadog.android.api.feature.FeatureEventReceiver
108108
fun onReceive(Any)
109109
interface com.datadog.android.api.feature.FeatureScope
110110
val dataStore: com.datadog.android.api.storage.datastore.DataStoreHandler
111-
fun withWriteContext((com.datadog.android.api.context.DatadogContext, com.datadog.android.api.storage.EventBatchWriter) -> Unit)
111+
fun withWriteContext((com.datadog.android.api.context.DatadogContext, EventWriteScope) -> Unit)
112+
fun getWriteContextSync(): Pair<com.datadog.android.api.context.DatadogContext, EventWriteScope>?
112113
fun sendEvent(Any)
113114
fun <T: Feature> unwrap(): T
115+
typealias EventWriteScope = ((com.datadog.android.api.storage.EventBatchWriter) -> Unit) -> Unit
114116
fun <R: Any?> com.datadog.android.api.InternalLogger.measureMethodCallPerf(Class<*>, String, Float = 100f, () -> R): R
115117
interface com.datadog.android.api.feature.FeatureSdkCore : com.datadog.android.api.SdkCore
116118
val internalLogger: com.datadog.android.api.InternalLogger
@@ -308,6 +310,7 @@ fun Collection<ByteArray>.join(ByteArray, ByteArray = ByteArray(0), ByteArray =
308310
fun java.util.concurrent.Executor.executeSafe(String, com.datadog.android.api.InternalLogger, Runnable)
309311
fun java.util.concurrent.ScheduledExecutorService.scheduleSafe(String, Long, java.util.concurrent.TimeUnit, com.datadog.android.api.InternalLogger, Runnable): java.util.concurrent.ScheduledFuture<*>?
310312
fun java.util.concurrent.ExecutorService.submitSafe(String, com.datadog.android.api.InternalLogger, Runnable): java.util.concurrent.Future<*>?
313+
fun <T> java.util.concurrent.ExecutorService.submitSafe(String, com.datadog.android.api.InternalLogger, java.util.concurrent.Callable<T>): java.util.concurrent.Future<T>?
311314
val NULL_MAP_VALUE: Object
312315
object com.datadog.android.core.internal.utils.JsonSerializer
313316
fun toJsonElement(Any?): com.google.gson.JsonElement

dd-sdk-android-core/api/dd-sdk-android-core.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ public abstract interface class com/datadog/android/api/feature/FeatureEventRece
342342

343343
public abstract interface class com/datadog/android/api/feature/FeatureScope {
344344
public abstract fun getDataStore ()Lcom/datadog/android/api/storage/datastore/DataStoreHandler;
345+
public abstract fun getWriteContextSync ()Lkotlin/Pair;
345346
public abstract fun sendEvent (Ljava/lang/Object;)V
346347
public abstract fun unwrap ()Lcom/datadog/android/api/feature/Feature;
347348
public abstract fun withWriteContext (Lkotlin/jvm/functions/Function2;)V
@@ -812,6 +813,7 @@ public final class com/datadog/android/core/internal/utils/ConcurrencyExtKt {
812813
public static final fun executeSafe (Ljava/util/concurrent/Executor;Ljava/lang/String;Lcom/datadog/android/api/InternalLogger;Ljava/lang/Runnable;)V
813814
public static final fun scheduleSafe (Ljava/util/concurrent/ScheduledExecutorService;Ljava/lang/String;JLjava/util/concurrent/TimeUnit;Lcom/datadog/android/api/InternalLogger;Ljava/lang/Runnable;)Ljava/util/concurrent/ScheduledFuture;
814815
public static final fun submitSafe (Ljava/util/concurrent/ExecutorService;Ljava/lang/String;Lcom/datadog/android/api/InternalLogger;Ljava/lang/Runnable;)Ljava/util/concurrent/Future;
816+
public static final fun submitSafe (Ljava/util/concurrent/ExecutorService;Ljava/lang/String;Lcom/datadog/android/api/InternalLogger;Ljava/util/concurrent/Callable;)Ljava/util/concurrent/Future;
815817
}
816818

817819
public final class com/datadog/android/core/internal/utils/JsonSerializer {

dd-sdk-android-core/src/main/kotlin/com/datadog/android/api/feature/FeatureScope.kt

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import androidx.annotation.AnyThread
1010
import com.datadog.android.api.context.DatadogContext
1111
import com.datadog.android.api.storage.EventBatchWriter
1212
import com.datadog.android.api.storage.datastore.DataStoreHandler
13+
import com.datadog.android.lint.InternalApi
1314

1415
/**
1516
* Represents a Datadog feature.
@@ -24,15 +25,24 @@ interface FeatureScope {
2425
/**
2526
* Utility to write an event, asynchronously.
2627
* @param callback an operation called with an up-to-date [DatadogContext]
27-
* and an [EventBatchWriter]. Callback will be executed on a worker thread from I/O pool.
28-
* [DatadogContext] will have a state created at the moment this method is called, before the
29-
* thread switch for the callback invocation.
28+
* and an [EventWriteScope]. Callback will be executed on a single context processing worker thread.
29+
* [DatadogContext] will have a state created at the moment this method is called.
3030
*/
3131
@AnyThread
3232
fun withWriteContext(
33-
callback: (DatadogContext, EventBatchWriter) -> Unit
33+
callback: (datadogContext: DatadogContext, write: EventWriteScope) -> Unit
3434
)
3535

36+
// TODO RUM-9852 Implement better passthrough mechanism for the JVM crash scenario
37+
/**
38+
* Same as [withWriteContext], but executed in a blocking manner.
39+
*
40+
* **NOTE**: This API is for internal use only and is not guaranteed to be stable.
41+
*/
42+
@AnyThread
43+
@InternalApi
44+
fun getWriteContextSync(): Pair<DatadogContext, EventWriteScope>?
45+
3646
/**
3747
* Send event to a given feature. It will be sent in a synchronous way.
3848
*
@@ -45,3 +55,9 @@ interface FeatureScope {
4555
*/
4656
fun <T : Feature> unwrap(): T
4757
}
58+
59+
/**
60+
* Scope for the event write operation. The operation is invoked on a worker thread from the I/O pool, which is
61+
* different from the context processing worker thread used for [FeatureScope.withWriteContext] callback invocation.
62+
*/
63+
typealias EventWriteScope = ((EventBatchWriter) -> Unit) -> Unit

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/CoreFeature.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import com.datadog.android.core.internal.system.NoOpAppVersionProvider
5959
import com.datadog.android.core.internal.system.NoOpSystemInfoProvider
6060
import com.datadog.android.core.internal.system.SystemInfoProvider
6161
import com.datadog.android.core.internal.thread.BackPressureExecutorService
62+
import com.datadog.android.core.internal.thread.DatadogThreadFactory
6263
import com.datadog.android.core.internal.thread.LoggingScheduledThreadPoolExecutor
6364
import com.datadog.android.core.internal.thread.ScheduledExecutorServiceFactory
6465
import com.datadog.android.core.internal.time.AppStartTimeProvider
@@ -97,8 +98,10 @@ import java.lang.ref.WeakReference
9798
import java.util.Locale
9899
import java.util.concurrent.ConcurrentHashMap
99100
import java.util.concurrent.ExecutorService
101+
import java.util.concurrent.LinkedBlockingQueue
100102
import java.util.concurrent.ScheduledExecutorService
101103
import java.util.concurrent.ScheduledThreadPoolExecutor
104+
import java.util.concurrent.ThreadPoolExecutor
102105
import java.util.concurrent.TimeUnit
103106
import java.util.concurrent.atomic.AtomicBoolean
104107

@@ -144,6 +147,7 @@ internal class CoreFeature(
144147

145148
internal lateinit var uploadExecutorService: ScheduledThreadPoolExecutor
146149
internal lateinit var persistenceExecutorService: FlushableExecutorService
150+
internal lateinit var contextExecutorService: ThreadPoolExecutor
147151
internal lateinit var backpressureStrategy: BackPressureStrategy
148152

149153
internal var localDataEncryption: Encryption? = null
@@ -286,6 +290,7 @@ internal class CoreFeature(
286290
fun drainAndShutdownExecutors() {
287291
val tasks = arrayListOf<Runnable>()
288292

293+
contextExecutorService.queue.drainTo(tasks)
289294
persistenceExecutorService.drainTo(tasks)
290295

291296
uploadExecutorService
@@ -295,9 +300,11 @@ internal class CoreFeature(
295300
// we need to make sure we drain the runnable list in both executors first
296301
// then we shut them down by using the await termination method to make sure we block
297302
// the thread until the active task is finished.
303+
contextExecutorService.shutdown()
298304
persistenceExecutorService.shutdown()
299305
uploadExecutorService.shutdown()
300306

307+
contextExecutorService.awaitTermination(DRAIN_WAIT_SECONDS, TimeUnit.SECONDS)
301308
persistenceExecutorService.awaitTermination(DRAIN_WAIT_SECONDS, TimeUnit.SECONDS)
302309
uploadExecutorService.awaitTermination(DRAIN_WAIT_SECONDS, TimeUnit.SECONDS)
303310

@@ -613,6 +620,19 @@ internal class CoreFeature(
613620
executorContext = "storage",
614621
backPressureStrategy = backpressureStrategy
615622
)
623+
// TODO RUM-9851 Switch to the executor which is aware of backpressure, but only logs it
624+
@Suppress("UnsafeThirdPartyFunctionCall") // all parameters are safe
625+
contextExecutorService = ThreadPoolExecutor(
626+
// core pool size
627+
1,
628+
// max pool size,
629+
1,
630+
// keep-alive time
631+
0L,
632+
TimeUnit.MILLISECONDS,
633+
LinkedBlockingQueue(),
634+
DatadogThreadFactory("context")
635+
)
616636
}
617637

618638
private fun resolveProcessInfo(appContext: Context) {
@@ -637,10 +657,12 @@ internal class CoreFeature(
637657

638658
private fun shutDownExecutors() {
639659
uploadExecutorService.shutdownNow()
660+
contextExecutorService.shutdownNow()
640661
persistenceExecutorService.shutdownNow()
641662

642663
try {
643664
uploadExecutorService.awaitTermination(1, TimeUnit.SECONDS)
665+
contextExecutorService.awaitTermination(1, TimeUnit.SECONDS)
644666
persistenceExecutorService.awaitTermination(1, TimeUnit.SECONDS)
645667
} catch (e: InterruptedException) {
646668
try {

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/DatadogCore.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,14 @@ internal class DatadogCore(
179179
/** @inheritDoc */
180180
@AnyThread
181181
override fun clearAllData() {
182-
features.values.forEach {
183-
it.clearAllData()
184-
}
185-
getPersistenceExecutorService().executeSafe("Clear all data", internalLogger) {
186-
coreFeature.deleteLastViewEvent()
187-
coreFeature.deleteLastFatalAnrSent()
182+
coreFeature.contextExecutorService.executeSafe("DatadogCore.clearAllData", internalLogger) {
183+
features.values.forEach {
184+
it.clearAllData()
185+
}
186+
getPersistenceExecutorService().executeSafe("Clear all data", internalLogger) {
187+
coreFeature.deleteLastViewEvent()
188+
coreFeature.deleteLastFatalAnrSent()
189+
}
188190
}
189191
}
190192

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/SdkFeature.kt

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ import androidx.annotation.AnyThread
1212
import androidx.annotation.WorkerThread
1313
import com.datadog.android.api.InternalLogger
1414
import com.datadog.android.api.context.DatadogContext
15+
import com.datadog.android.api.feature.EventWriteScope
1516
import com.datadog.android.api.feature.Feature
1617
import com.datadog.android.api.feature.FeatureContextUpdateReceiver
1718
import com.datadog.android.api.feature.FeatureEventReceiver
1819
import com.datadog.android.api.feature.FeatureScope
1920
import com.datadog.android.api.feature.StorageBackedFeature
2021
import com.datadog.android.api.net.RequestFactory
21-
import com.datadog.android.api.storage.EventBatchWriter
2222
import com.datadog.android.api.storage.FeatureStorageConfiguration
2323
import com.datadog.android.api.storage.datastore.DataStoreHandler
2424
import com.datadog.android.core.configuration.UploadSchedulerStrategy
@@ -52,14 +52,19 @@ import com.datadog.android.core.internal.persistence.file.NoOpFileOrchestrator
5252
import com.datadog.android.core.internal.persistence.file.advanced.FeatureFileOrchestrator
5353
import com.datadog.android.core.internal.persistence.file.batch.BatchFileReaderWriter
5454
import com.datadog.android.core.internal.persistence.tlvformat.TLVBlockFileReader
55+
import com.datadog.android.core.internal.utils.executeSafe
56+
import com.datadog.android.core.internal.utils.submitSafe
5557
import com.datadog.android.core.persistence.PersistenceStrategy
5658
import com.datadog.android.internal.profiler.BenchmarkSdkUploads
5759
import com.datadog.android.internal.profiler.GlobalBenchmark
5860
import com.datadog.android.privacy.TrackingConsentProviderCallback
5961
import com.datadog.android.security.Encryption
6062
import java.util.Collections
6163
import java.util.Locale
64+
import java.util.concurrent.Callable
65+
import java.util.concurrent.CancellationException
6266
import java.util.concurrent.ConcurrentHashMap
67+
import java.util.concurrent.ExecutionException
6368
import java.util.concurrent.atomic.AtomicBoolean
6469
import java.util.concurrent.atomic.AtomicReference
6570

@@ -169,16 +174,43 @@ internal class SdkFeature(
169174
// region FeatureScope
170175

171176
override fun withWriteContext(
172-
callback: (DatadogContext, EventBatchWriter) -> Unit
177+
callback: (DatadogContext, EventWriteScope) -> Unit
173178
) {
174-
// TODO RUM-1462 thread safety. Thread switch happens in Storage right now. Open questions:
175-
// * what if caller wants to have a sync operation, without thread switch
176-
// * should context read and write be on the dedicated thread? risk - time gap between
177-
// caller and context
178-
val contextProvider = coreFeature.contextProvider
179-
if (contextProvider is NoOpContextProvider) return
180-
val context = contextProvider.context
181-
storage.writeCurrentBatch(context) { callback(context, it) }
179+
coreFeature.contextExecutorService
180+
.executeSafe("withWriteContext-${wrappedFeature.name}", internalLogger) {
181+
val contextProvider = coreFeature.contextProvider
182+
if (contextProvider is NoOpContextProvider) return@executeSafe
183+
val context = contextProvider.context
184+
val eventBatchWriteScope = storage.getEventWriteScope(context)
185+
callback(context, eventBatchWriteScope)
186+
}
187+
}
188+
189+
/**
 * Resolves the current [DatadogContext] and the matching [EventWriteScope] on the context
 * executor and blocks the calling thread until the result is available.
 *
 * NOTE(review): the context executor is created with a single thread, so calling this method
 * from a task that is already running on that executor would presumably wait forever for a
 * task queued behind itself - confirm that no caller does so.
 *
 * @return the context and the write scope, or null when the task could not be submitted, when
 * the context provider is a [NoOpContextProvider], or when waiting for the result failed
 * (cancellation, execution error or interruption).
 */
override fun getWriteContextSync(): Pair<DatadogContext, EventWriteScope>? {
    val future = coreFeature.contextExecutorService
        .submitSafe(
            "getWriteContextSync-${wrappedFeature.name}",
            internalLogger,
            Callable {
                val contextProvider = coreFeature.contextProvider
                if (contextProvider is NoOpContextProvider) return@Callable null
                val context = contextProvider.context
                val eventBatchWriteScope = storage.getEventWriteScope(context)
                context to eventBatchWriteScope
            }
        )
    return try {
        // submitSafe returns a nullable Future, a null here is propagated as a null result
        future?.get()
    } catch (e: CancellationException) {
        logGetWriteContextSyncError(e)
        null
    } catch (e: ExecutionException) {
        logGetWriteContextSyncError(e)
        null
    } catch (e: InterruptedException) {
        // Future.get() clears the interrupted status when it throws; restore it so that
        // the caller (or any later blocking call on this thread) can still observe the
        // interruption instead of having it silently swallowed here.
        Thread.currentThread().interrupt()
        logGetWriteContextSyncError(e)
        null
    }
}
183215

184216
override fun sendEvent(event: Any) {
@@ -429,6 +461,15 @@ internal class SdkFeature(
429461
)
430462
}
431463

464+
/**
 * Reports a failure to obtain the write context synchronously.
 *
 * @param e the exception raised while waiting for the write context.
 */
private fun logGetWriteContextSyncError(e: Exception) {
    val messageBuilder = { FAILED_TO_GET_WRITE_CONTEXT_SYNC }
    internalLogger.log(
        InternalLogger.Level.ERROR,
        InternalLogger.Target.USER,
        messageBuilder,
        e
    )
}
472+
432473
// endregion
433474

434475
// Used for nightly tests only
@@ -452,6 +493,7 @@ internal class SdkFeature(
452493
"Feature \"%s\" already has this listener registered."
453494
const val NO_EVENT_RECEIVER =
454495
"Feature \"%s\" has no event receiver registered, ignoring event."
496+
internal const val FAILED_TO_GET_WRITE_CONTEXT_SYNC = "Failed to get write context in a sync manner."
455497
internal const val TRACK_NAME = "track"
456498
internal const val METER_NAME = "dd-sdk-android"
457499
internal const val BATCH_COUNT_METRIC_NAME = "android.benchmark.batch_count"

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/persistence/AbstractStorage.kt

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import androidx.annotation.AnyThread
1010
import androidx.annotation.WorkerThread
1111
import com.datadog.android.api.InternalLogger
1212
import com.datadog.android.api.context.DatadogContext
13+
import com.datadog.android.api.feature.EventWriteScope
1314
import com.datadog.android.api.storage.EventBatchWriter
1415
import com.datadog.android.api.storage.EventType
1516
import com.datadog.android.api.storage.FeatureStorageConfiguration
@@ -49,6 +50,8 @@ internal class AbstractStorage(
4950
)
5051
}
5152

53+
private val writeLock = Any()
54+
5255
private val notGrantedPersistenceStrategy: PersistenceStrategy = NoOpPersistenceStrategy()
5356

5457
init {
@@ -59,28 +62,26 @@ internal class AbstractStorage(
5962
// region Storage
6063

6164
@AnyThread
62-
override fun writeCurrentBatch(
63-
datadogContext: DatadogContext,
64-
callback: (EventBatchWriter) -> Unit
65-
) {
66-
executorService.executeSafe("Data write", internalLogger) {
67-
val strategy = resolvePersistenceStrategy(datadogContext)
68-
val writer = object : EventBatchWriter {
69-
@WorkerThread
70-
override fun currentMetadata(): ByteArray? {
71-
return strategy.currentMetadata()
72-
}
65+
override fun getEventWriteScope(
66+
datadogContext: DatadogContext
67+
): EventWriteScope {
68+
val strategy = resolvePersistenceStrategy(datadogContext)
69+
val writer = object : EventBatchWriter {
70+
@WorkerThread
71+
override fun currentMetadata(): ByteArray? {
72+
return strategy.currentMetadata()
73+
}
7374

74-
@WorkerThread
75-
override fun write(event: RawBatchEvent, batchMetadata: ByteArray?, eventType: EventType): Boolean {
76-
return strategy.write(event, batchMetadata, eventType)
77-
}
75+
@WorkerThread
76+
override fun write(event: RawBatchEvent, batchMetadata: ByteArray?, eventType: EventType): Boolean {
77+
return strategy.write(event, batchMetadata, eventType)
7878
}
79-
callback.invoke(writer)
8079
}
80+
// we don't know which storage backs the persistence strategy: concurrent writes may be safe there and the lock
81+
// may be unnecessary, but we take precautions anyway
82+
return AsyncEventWriteScope(executorService, writer, writeLock, featureName, internalLogger)
8183
}
8284

83-
@WorkerThread
8485
private fun resolvePersistenceStrategy(datadogContext: DatadogContext) =
8586
when (datadogContext.trackingConsent) {
8687
TrackingConsent.GRANTED -> grantedPersistenceStrategy
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
3+
* This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
* Copyright 2016-Present Datadog, Inc.
5+
*/
6+
7+
package com.datadog.android.core.internal.persistence
8+
9+
import com.datadog.android.api.InternalLogger
10+
import com.datadog.android.api.feature.EventWriteScope
11+
import com.datadog.android.api.storage.EventBatchWriter
12+
import com.datadog.android.core.internal.utils.executeSafe
13+
import java.util.concurrent.Executor
14+
15+
/**
 * [EventWriteScope] implementation which hands every write operation over to the given [executor].
 *
 * @param executor the executor the write operations are submitted to.
 * @param writer the [EventBatchWriter] handed to every write operation.
 * @param featureWriteLock the monitor held while a write operation is running.
 * @param featureName the name of the feature, used to name the submitted operation.
 * @param internalLogger the logger passed along when submitting the operation.
 */
internal class AsyncEventWriteScope(
    private val executor: Executor,
    private val writer: EventBatchWriter,
    private val featureWriteLock: Any,
    private val featureName: String,
    private val internalLogger: InternalLogger
) : EventWriteScope {

    /**
     * Submits [block] to the executor; once run, it receives the [EventBatchWriter] of this scope.
     */
    override fun invoke(block: (EventBatchWriter) -> Unit) {
        val operationName = "eventWriteScopeInvoke-$featureName"
        executor.executeSafe(operationName, internalLogger) { runUnderFeatureLock(block) }
    }

    /**
     * Runs [block] while holding the feature-wide write lock.
     *
     * A write may not be atomic (batch data and batch metadata can both be written, and there is
     * a gap between getting the file for writing and the write itself), so the file operation is
     * guarded by the lock.
     */
    private fun runUnderFeatureLock(block: (EventBatchWriter) -> Unit) {
        synchronized(featureWriteLock) { block(writer) }
    }
}

0 commit comments

Comments
 (0)