Skip to content

Commit 1379c79

Browse files
authored
Merge pull request #2745 from DataDog/nogorodnikov/rum-9851/monitor-backpressure-for-context-executor
RUM-9851: Monitor backpressure of context executor
2 parents 6375758 + bc4e1fa commit 1379c79

File tree

7 files changed

+546
-34
lines changed

7 files changed

+546
-34
lines changed

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/CoreFeature.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import com.datadog.android.core.internal.system.NoOpAppVersionProvider
6161
import com.datadog.android.core.internal.system.NoOpSystemInfoProvider
6262
import com.datadog.android.core.internal.system.SystemInfoProvider
6363
import com.datadog.android.core.internal.thread.BackPressureExecutorService
64+
import com.datadog.android.core.internal.thread.BackPressuredBlockingQueue
6465
import com.datadog.android.core.internal.thread.DatadogThreadFactory
6566
import com.datadog.android.core.internal.thread.LoggingScheduledThreadPoolExecutor
6667
import com.datadog.android.core.internal.thread.ScheduledExecutorServiceFactory
@@ -100,7 +101,6 @@ import java.io.FileNotFoundException
100101
import java.lang.ref.WeakReference
101102
import java.util.Locale
102103
import java.util.concurrent.ExecutorService
103-
import java.util.concurrent.LinkedBlockingQueue
104104
import java.util.concurrent.ScheduledExecutorService
105105
import java.util.concurrent.ScheduledThreadPoolExecutor
106106
import java.util.concurrent.ThreadPoolExecutor
@@ -636,7 +636,16 @@ internal class CoreFeature(
636636
executorContext = "storage",
637637
backPressureStrategy = backpressureStrategy
638638
)
639-
// TODO RUM-9851 Switch to the executor which is aware of backpressure, but only logs it
639+
val contextQueue = BackPressuredBlockingQueue<Runnable>(
640+
internalLogger,
641+
executorContext = "context",
642+
capacity = Int.MAX_VALUE,
643+
notifyThreshold = 1024,
644+
// just notify when reached
645+
onItemDropped = {},
646+
onThresholdReached = {},
647+
backpressureMitigation = null
648+
)
640649
@Suppress("UnsafeThirdPartyFunctionCall") // all parameters are safe
641650
contextExecutorService = ThreadPoolExecutor(
642651
// core pool size
@@ -646,7 +655,7 @@ internal class CoreFeature(
646655
// keep-alive time
647656
0L,
648657
TimeUnit.MILLISECONDS,
649-
LinkedBlockingQueue(),
658+
contextQueue,
650659
DatadogThreadFactory("context")
651660
)
652661
}

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressuredBlockingQueue.kt

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,55 @@ import com.datadog.android.core.configuration.BackPressureStrategy
1212
import com.datadog.android.internal.thread.NamedExecutionUnit
1313
import java.util.concurrent.TimeUnit
1414

15-
internal class BackPressuredBlockingQueue<E : Any>(
16-
private val logger: InternalLogger,
17-
private val executorContext: String,
18-
private val backPressureStrategy: BackPressureStrategy
19-
) : ObservableLinkedBlockingQueue<E>(
20-
backPressureStrategy.capacity
21-
) {
15+
/**
16+
* [LinkedBlockingQueue] that supports backpressure handling via the chosen backpressure mitigation strategy.
17+
*
18+
* This queue may be either bounded or unbounded by specifying capacity. See docs of [LinkedBlockingQueue] for more
19+
* details.
20+
*
21+
* If queue is unbounded, there is still a possibility to be notified if certain size threshold is reached.
22+
*/
23+
internal class BackPressuredBlockingQueue<E : Any> : ObservableLinkedBlockingQueue<E> {
24+
25+
private val logger: InternalLogger
26+
private val executorContext: String
27+
internal val capacity: Int
28+
private val notifyThreshold: Int
29+
private val onThresholdReached: () -> Unit
30+
private val onItemDropped: (Any) -> Unit
31+
private val backpressureMitigation: BackPressureMitigation?
32+
33+
constructor(
34+
logger: InternalLogger,
35+
executorContext: String,
36+
backPressureStrategy: BackPressureStrategy
37+
) : this(
38+
logger,
39+
executorContext,
40+
backPressureStrategy.capacity,
41+
backPressureStrategy.capacity,
42+
backPressureStrategy.onThresholdReached,
43+
backPressureStrategy.onItemDropped,
44+
backPressureStrategy.backpressureMitigation
45+
)
46+
47+
constructor(
48+
logger: InternalLogger,
49+
executorContext: String,
50+
notifyThreshold: Int,
51+
capacity: Int,
52+
onThresholdReached: () -> Unit,
53+
onItemDropped: (Any) -> Unit,
54+
backpressureMitigation: BackPressureMitigation?
55+
) : super(capacity) {
56+
this.logger = logger
57+
this.executorContext = executorContext
58+
this.capacity = capacity
59+
this.notifyThreshold = notifyThreshold
60+
this.onThresholdReached = onThresholdReached
61+
this.onItemDropped = onItemDropped
62+
this.backpressureMitigation = backpressureMitigation
63+
}
2264

2365
override fun offer(e: E): Boolean {
2466
return addWithBackPressure(e) {
@@ -33,52 +75,59 @@ internal class BackPressuredBlockingQueue<E : Any>(
3375
if (!accepted) {
3476
return offer(e)
3577
} else {
36-
if (remainingCapacity() == 0) {
37-
onThresholdReached()
78+
if (size == notifyThreshold) {
79+
notifyThresholdReached()
3880
}
3981
return true
4082
}
4183
}
4284

85+
override fun put(e: E) {
86+
if (size + 1 == notifyThreshold) {
87+
notifyThresholdReached()
88+
}
89+
super.put(e)
90+
}
91+
4392
private fun addWithBackPressure(
4493
e: E,
4594
operation: (E) -> Boolean
4695
): Boolean {
4796
val remainingCapacity = remainingCapacity()
4897
return if (remainingCapacity == 0) {
49-
when (backPressureStrategy.backpressureMitigation) {
98+
when (backpressureMitigation) {
5099
BackPressureMitigation.DROP_OLDEST -> {
51100
val first = take()
52-
onItemDropped(first)
101+
notifyItemDropped(first)
53102
operation(e)
54103
}
55104

56-
BackPressureMitigation.IGNORE_NEWEST -> {
57-
onItemDropped(e)
105+
BackPressureMitigation.IGNORE_NEWEST, null -> {
106+
notifyItemDropped(e)
58107
true
59108
}
60109
}
61110
} else {
62-
if (remainingCapacity == 1) {
63-
onThresholdReached()
111+
if (size + 1 == notifyThreshold) {
112+
notifyThresholdReached()
64113
}
65114
operation(e)
66115
}
67116
}
68117

69-
private fun onThresholdReached() {
118+
private fun notifyThresholdReached() {
70119
val dump = dumpQueue()
71120
val backPressureMap = buildMap {
72-
put("capacity", backPressureStrategy.capacity)
121+
put("capacity", capacity)
73122
if (!dump.isNullOrEmpty()) {
74123
put("dump", dump)
75124
}
76125
}
77-
backPressureStrategy.onThresholdReached()
126+
onThresholdReached()
78127
logger.log(
79128
level = InternalLogger.Level.WARN,
80129
targets = listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
81-
messageBuilder = { "BackPressuredBlockingQueue reached capacity:${backPressureStrategy.capacity}" },
130+
messageBuilder = { "BackPressuredBlockingQueue reached capacity:$notifyThreshold" },
82131
throwable = null,
83132
onlyOnce = false,
84133
additionalProperties = mapOf(
@@ -88,8 +137,8 @@ internal class BackPressuredBlockingQueue<E : Any>(
88137
)
89138
}
90139

91-
private fun onItemDropped(item: E) {
92-
backPressureStrategy.onItemDropped(item)
140+
private fun notifyItemDropped(item: E) {
141+
onItemDropped(item)
93142
val name = (item as? NamedExecutionUnit)?.name ?: item.toString()
94143
// Note, do not send this to telemetry as it might cause a stack overflow
95144
logger.log(
@@ -99,7 +148,7 @@ internal class BackPressuredBlockingQueue<E : Any>(
99148
throwable = null,
100149
onlyOnce = false,
101150
additionalProperties = mapOf(
102-
"backpressure.capacity" to backPressureStrategy.capacity,
151+
"backpressure.capacity" to capacity,
103152
"executor.context" to executorContext
104153
)
105154
)

dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/ObservableLinkedBlockingQueue.kt

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,22 @@ import java.util.concurrent.LinkedBlockingQueue
1111
import java.util.concurrent.TimeUnit
1212
import java.util.concurrent.atomic.AtomicLong
1313

14-
internal open class ObservableLinkedBlockingQueue<E : Any>(
15-
capacity: Int,
16-
private val currentTimeProvider: () -> Long = { System.currentTimeMillis() }
17-
) : LinkedBlockingQueue<E>(capacity) {
14+
internal open class ObservableLinkedBlockingQueue<E : Any> : LinkedBlockingQueue<E> {
1815

19-
private var lastDumpTimestamp: AtomicLong = AtomicLong(0)
16+
private val currentTimeProvider: () -> Long
17+
18+
constructor(
19+
currentTimeProvider: () -> Long = { System.currentTimeMillis() }
20+
) : this(Int.MAX_VALUE, currentTimeProvider)
21+
22+
constructor(
23+
capacity: Int,
24+
currentTimeProvider: () -> Long = { System.currentTimeMillis() }
25+
) : super(capacity) {
26+
this.currentTimeProvider = currentTimeProvider
27+
}
28+
29+
private val lastDumpTimestamp: AtomicLong = AtomicLong(0)
2030

2131
fun dumpQueue(): Map<String, Int>? {
2232
val currentTime = currentTimeProvider.invoke()

dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/CoreFeatureTest.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import com.datadog.android.core.internal.privacy.NoOpConsentProvider
2929
import com.datadog.android.core.internal.privacy.TrackingConsentProvider
3030
import com.datadog.android.core.internal.system.BroadcastReceiverSystemInfoProvider
3131
import com.datadog.android.core.internal.system.NoOpSystemInfoProvider
32+
import com.datadog.android.core.internal.thread.BackPressuredBlockingQueue
3233
import com.datadog.android.core.internal.time.AppStartTimeProvider
3334
import com.datadog.android.core.internal.time.KronosTimeProvider
3435
import com.datadog.android.core.internal.time.NoOpTimeProvider
@@ -667,6 +668,24 @@ internal class CoreFeatureTest {
667668
assertThat(testedFeature.contextExecutorService).isNotNull()
668669
}
669670

671+
@Test
672+
fun `M initialize context executor with unbounded + observable queue W initialize()`() {
673+
// When
674+
testedFeature.initialize(
675+
appContext.mockInstance,
676+
fakeSdkInstanceId,
677+
fakeConfig,
678+
fakeConsent
679+
)
680+
681+
// Then
682+
assertThat(testedFeature.contextExecutorService).isNotNull()
683+
with(testedFeature.contextExecutorService.queue) {
684+
check(this is BackPressuredBlockingQueue)
685+
assertThat(capacity).isEqualTo(Int.MAX_VALUE)
686+
}
687+
}
688+
670689
@Test
671690
fun `M initialize only once W initialize() twice`(
672691
@Forgery otherConfig: Configuration

dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ class DropOldestBackPressuredBlockingQueueTest {
396396
// Then
397397
assertThat(testedQueue).hasSize(fakeBackPressureThreshold)
398398
assertThat(testedQueue).contains(fakeNewItem)
399-
verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger)
399+
verify(mockOnThresholdReached).invoke()
400+
verifyNoInteractions(mockOnItemsDropped)
400401
}
401402

402403
@Test
@@ -421,7 +422,8 @@ class DropOldestBackPressuredBlockingQueueTest {
421422
// Then
422423
assertThat(testedQueue).hasSize(fakeBackPressureThreshold)
423424
assertThat(testedQueue).contains(fakeNewItem)
424-
verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger)
425+
verify(mockOnThresholdReached).invoke()
426+
verifyNoInteractions(mockOnItemsDropped)
425427
}
426428

427429
// endregion

dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ class IgnoreNewestBackPressuredBlockingQueueTest {
396396
// Then
397397
assertThat(testedQueue).hasSize(fakeBackPressureThreshold)
398398
assertThat(testedQueue).contains(fakeNewItem)
399-
verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger)
399+
verify(mockOnThresholdReached).invoke()
400+
verifyNoInteractions(mockOnItemsDropped)
400401
}
401402

402403
@Test
@@ -421,7 +422,8 @@ class IgnoreNewestBackPressuredBlockingQueueTest {
421422
// Then
422423
assertThat(testedQueue).hasSize(fakeBackPressureThreshold)
423424
assertThat(testedQueue).contains(fakeNewItem)
424-
verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger)
425+
verify(mockOnThresholdReached).invoke()
426+
verifyNoInteractions(mockOnItemsDropped)
425427
}
426428

427429
// endregion

0 commit comments

Comments
 (0)