Skip to content

Commit e834998

Browse files
committed
feat(*): improve the handling of coroutine dispatcher by the encoding output
1 parent 5324ada commit e834998

File tree

26 files changed

+474
-209
lines changed

26 files changed

+474
-209
lines changed

core/src/androidTest/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import android.util.Log
2020
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
2121
import io.github.thibaultbee.streampack.core.elements.data.Frame
2222
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
23+
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutputDispatcherProvider
2324
import kotlinx.coroutines.flow.MutableStateFlow
2425
import kotlinx.coroutines.flow.asStateFlow
2526
import kotlinx.coroutines.runBlocking
@@ -102,7 +103,10 @@ class DummyEndpoint : IEndpointInternal {
102103
* The factory to create [DummyEndpoint].
103104
*/
104105
class DummyEndpointFactory : IEndpointInternal.Factory {
105-
override fun create(context: Context): IEndpointInternal {
106+
override fun create(
107+
context: Context,
108+
dispatcherProvider: EncodingPipelineOutputDispatcherProvider
109+
): IEndpointInternal {
106110
return DummyEndpoint()
107111
}
108112
}
@@ -111,7 +115,10 @@ class DummyEndpointFactory : IEndpointInternal.Factory {
111115
* The factory to create [DummyEndpoint].
112116
*/
113117
class DummyEndpointDummyFactory(val dummyEndpoint: DummyEndpoint) : IEndpointInternal.Factory {
114-
override fun create(context: Context): IEndpointInternal {
118+
override fun create(
119+
context: Context,
120+
dispatcherProvider: EncodingPipelineOutputDispatcherProvider
121+
): IEndpointInternal {
115122
return dummyEndpoint
116123
}
117124
}

core/src/androidTest/java/io/github/thibaultbee/streampack/core/elements/endpoints/EndpointStateTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import androidx.test.platform.app.InstrumentationRegistry
44
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint
55
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.flv.FlvMuxer
66
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
7+
import kotlinx.coroutines.Dispatchers
78
import kotlinx.coroutines.test.runTest
89
import org.junit.Test
910
import org.junit.runner.RunWith
@@ -33,8 +34,8 @@ class EndpointStateTest(private val endpoint: IEndpointInternal) {
3334
val context = InstrumentationRegistry.getInstrumentation().context
3435

3536
return arrayListOf(
36-
DynamicEndpoint(context),
37-
MediaMuxerEndpoint(context),
37+
DynamicEndpoint(context, Dispatchers.Default, Dispatchers.IO),
38+
MediaMuxerEndpoint(context, Dispatchers.IO),
3839
CompositeEndpoint(FlvMuxer(isForFile = false), FileSink())
3940
)
4041
}

core/src/androidTest/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpointTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import android.net.Uri
55
import androidx.test.platform.app.InstrumentationRegistry
66
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor
77
import junit.framework.TestCase.fail
8+
import kotlinx.coroutines.Dispatchers
89
import kotlinx.coroutines.test.runTest
910
import org.junit.Test
1011

1112
class MediaMuxerEndpointTest {
1213
private val context: Context = InstrumentationRegistry.getInstrumentation().context
13-
private val mediaMuxerEndpoint = MediaMuxerEndpoint(context)
14+
private val mediaMuxerEndpoint = MediaMuxerEndpoint(context, Dispatchers.IO)
1415

1516
@Test
1617
fun releaseMustNotThrow() {

core/src/androidTest/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/SinkStateTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.github.thibaultbee.streampack.core.elements.endpoints.composites.sink
33
import androidx.test.platform.app.InstrumentationRegistry
44
import io.github.thibaultbee.streampack.ext.rtmp.elements.endpoints.composites.sinks.RtmpSink
55
import io.github.thibaultbee.streampack.ext.srt.elements.endpoints.composites.sinks.SrtSink
6+
import kotlinx.coroutines.Dispatchers
67
import kotlinx.coroutines.test.runTest
78
import org.junit.Test
89
import org.junit.runner.RunWith
@@ -31,8 +32,8 @@ class SinkStateTest(private val endpoint: ISinkInternal) {
3132
ContentSink(context),
3233
ChunkedFileOutputStreamSink(1000),
3334
FakeSink(),
34-
SrtSink(),
35-
RtmpSink()
35+
SrtSink(Dispatchers.IO),
36+
RtmpSink(Dispatchers.IO)
3637
)
3738
}
3839
}

core/src/androidTest/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/EncodingPipelineOutputTest.kt

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ import io.github.thibaultbee.streampack.core.elements.encoders.VideoCodecConfig
2626
import io.github.thibaultbee.streampack.core.elements.endpoints.DummyEndpoint
2727
import io.github.thibaultbee.streampack.core.elements.endpoints.DummyEndpointDummyFactory
2828
import io.github.thibaultbee.streampack.core.elements.endpoints.DummyEndpointFactory
29+
import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpointInternal
2930
import io.github.thibaultbee.streampack.core.elements.sources.audio.AudioSourceConfig
3031
import io.github.thibaultbee.streampack.core.elements.sources.video.VideoSourceConfig
32+
import io.github.thibaultbee.streampack.core.elements.utils.extensions.displayRotation
3133
import io.github.thibaultbee.streampack.core.interfaces.startStream
3234
import io.github.thibaultbee.streampack.core.pipelines.outputs.IConfigurableAudioPipelineOutputInternal
3335
import io.github.thibaultbee.streampack.core.pipelines.outputs.IConfigurableVideoPipelineOutputInternal
@@ -69,10 +71,24 @@ class EncodingPipelineOutputTest {
6971
descriptor.uri.toFile().delete()
7072
}
7173

74+
private fun createOutput(
75+
endpointFactory: IEndpointInternal.Factory,
76+
withAudio: Boolean = true,
77+
withVideo: Boolean = true,
78+
): EncodingPipelineOutput {
79+
return EncodingPipelineOutput(
80+
context = context,
81+
withAudio = withAudio,
82+
withVideo = withVideo,
83+
endpointFactory = endpointFactory,
84+
defaultRotation = context.displayRotation,
85+
dispatcherProvider = EncodingOutputDispatcherProvider()
86+
)
87+
}
88+
7289
@Test
7390
fun testOpenClose() = runTest {
74-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
75-
91+
output = createOutput(endpointFactory = DummyEndpointFactory())
7692
output.open(descriptor)
7793
assertTrue(output.isOpenFlow.value)
7894
output.close()
@@ -81,7 +97,7 @@ class EncodingPipelineOutputTest {
8197

8298
@Test
8399
fun testSetAudioCodecConfig() = runTest {
84-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
100+
output = createOutput(endpointFactory = DummyEndpointFactory())
85101
assertNull(output.audioSourceConfigFlow.value)
86102

87103
suspendCoroutine { continuation ->
@@ -101,7 +117,7 @@ class EncodingPipelineOutputTest {
101117

102118
@Test
103119
fun testSetAudioCodecConfigAndReject() = runTest {
104-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
120+
output = createOutput(endpointFactory = DummyEndpointFactory())
105121
output.audioConfigEventListener =
106122
object : IConfigurableAudioPipelineOutputInternal.Listener {
107123
override suspend fun onSetAudioSourceConfig(newAudioSourceConfig: AudioSourceConfig) {
@@ -118,7 +134,7 @@ class EncodingPipelineOutputTest {
118134

119135
@Test
120136
fun testSetVideoCodecConfig() = runTest {
121-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
137+
output = createOutput(endpointFactory = DummyEndpointFactory())
122138
assertNull(output.videoCodecConfigFlow.value)
123139

124140
suspendCoroutine { continuation ->
@@ -138,7 +154,7 @@ class EncodingPipelineOutputTest {
138154

139155
@Test
140156
fun testSetVideoCodecConfigAndReject() = runTest {
141-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
157+
output = createOutput(endpointFactory = DummyEndpointFactory())
142158
output.videoConfigEventListener =
143159
object : IConfigurableVideoPipelineOutputInternal.Listener {
144160
override suspend fun onSetVideoSourceConfig(newVideoSourceConfig: VideoSourceConfig) {
@@ -155,8 +171,7 @@ class EncodingPipelineOutputTest {
155171

156172
@Test
157173
fun testStartStreamWithoutConfig() = runTest {
158-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
159-
174+
output = createOutput(endpointFactory = DummyEndpointFactory())
160175
try {
161176
output.startStream(descriptor)
162177
fail("Should throw an exception")
@@ -167,35 +182,23 @@ class EncodingPipelineOutputTest {
167182

168183
@Test
169184
fun testStartStreamWithAudioConfig() = runTest {
170-
output = EncodingPipelineOutput(
171-
context,
172-
withVideo = false,
173-
endpointFactory = DummyEndpointFactory()
174-
)
185+
output = createOutput(endpointFactory = DummyEndpointFactory(), withVideo = false)
175186

176187
output.setAudioCodecConfig(AudioCodecConfig())
177188
output.startStream(descriptor)
178189
}
179190

180191
@Test
181192
fun testStartStreamWithVideoConfig() = runTest {
182-
output = EncodingPipelineOutput(
183-
context,
184-
withAudio = false,
185-
endpointFactory = DummyEndpointFactory()
186-
)
193+
output = createOutput(endpointFactory = DummyEndpointFactory(), withAudio = false)
187194

188195
output.setVideoCodecConfig(VideoCodecConfig())
189196
output.startStream(descriptor)
190197
}
191198

192199
@Test
193200
fun testStartStream() = runTest {
194-
output = EncodingPipelineOutput(
195-
context,
196-
withAudio = false,
197-
endpointFactory = DummyEndpointFactory()
198-
)
201+
output = createOutput(endpointFactory = DummyEndpointFactory(), withAudio = false)
199202

200203
output.setVideoCodecConfig(VideoCodecConfig()) // at least one config is needed
201204

@@ -223,7 +226,7 @@ class EncodingPipelineOutputTest {
223226

224227
@Test
225228
fun testStartStreamAndReject() = runTest {
226-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
229+
output = createOutput(endpointFactory = DummyEndpointFactory())
227230

228231
output.setVideoCodecConfig(VideoCodecConfig()) // at least one config is needed
229232

@@ -244,11 +247,7 @@ class EncodingPipelineOutputTest {
244247

245248
@Test
246249
fun testStartStreamVideoCodecSurface() = runTest {
247-
output = EncodingPipelineOutput(
248-
context,
249-
withAudio = false,
250-
endpointFactory = DummyEndpointFactory()
251-
)
250+
output = createOutput(endpointFactory = DummyEndpointFactory(), withAudio = false)
252251
assertNull(output.surfaceFlow.value)
253252

254253
output.setVideoCodecConfig(VideoCodecConfig())
@@ -269,27 +268,27 @@ class EncodingPipelineOutputTest {
269268

270269
@Test
271270
fun testClose() = runTest {
272-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
271+
output = createOutput(endpointFactory = DummyEndpointFactory())
273272
output.close()
274273
}
275274

276275
@Test
277276
fun testStopStreamOnly() = runTest {
278-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
277+
output = createOutput(endpointFactory = DummyEndpointFactory())
279278
output.stopStream()
280279
}
281280

282281
@Test
283282
fun testReleaseOnly() = runTest {
284-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
283+
output = createOutput(endpointFactory = DummyEndpointFactory())
285284
output.release()
286285
}
287286

288287
// Tests on inputs
289288

290289
@Test
291290
fun testSetVideoCodecSurface() = runTest {
292-
output = EncodingPipelineOutput(context, endpointFactory = DummyEndpointFactory())
291+
output = createOutput(endpointFactory = DummyEndpointFactory())
293292
assertNull(output.surfaceFlow.value)
294293

295294
output.setVideoCodecConfig(VideoCodecConfig())
@@ -310,10 +309,9 @@ class EncodingPipelineOutputTest {
310309
@Test
311310
fun testQueueAudioFrame() = runTest {
312311
val dummyEndpoint = DummyEndpoint()
313-
output = EncodingPipelineOutput(
314-
context,
312+
output = createOutput(
313+
endpointFactory = DummyEndpointDummyFactory(dummyEndpoint),
315314
withVideo = false,
316-
endpointFactory = DummyEndpointDummyFactory(dummyEndpoint)
317315
)
318316

319317
try {

core/src/main/java/io/github/thibaultbee/streampack/core/elements/encoders/mediacodec/MediaCodecEncoder.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import io.github.thibaultbee.streampack.core.logger.Logger
3535
import kotlinx.coroutines.CoroutineDispatcher
3636
import kotlinx.coroutines.CoroutineScope
3737
import kotlinx.coroutines.SupervisorJob
38-
import kotlinx.coroutines.cancelChildren
38+
import kotlinx.coroutines.cancel
3939
import kotlinx.coroutines.launch
4040
import kotlinx.coroutines.runBlocking
4141
import kotlinx.coroutines.sync.Mutex
@@ -52,9 +52,10 @@ internal class MediaCodecEncoder
5252
internal constructor(
5353
private val encoderConfig: EncoderConfig<*>,
5454
private val listener: IEncoderInternal.IListener,
55-
private val coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default
55+
private val defaultDispatcher: CoroutineDispatcher,
56+
private val processDispatcher: CoroutineDispatcher
5657
) : IEncoderInternal {
57-
private val coroutineScope = CoroutineScope(SupervisorJob() + coroutineDispatcher)
58+
private val coroutineScope = CoroutineScope(SupervisorJob() + defaultDispatcher)
5859
private val mutex = Mutex()
5960

6061
private val mediaCodec: MediaCodec
@@ -257,7 +258,7 @@ internal constructor(
257258
withMutexContext {
258259
releaseUnsafe()
259260
}
260-
coroutineScope.coroutineContext.cancelChildren()
261+
coroutineScope.cancel()
261262
}
262263
}
263264

@@ -321,7 +322,7 @@ internal constructor(
321322
}
322323

323324
private suspend fun withMutexContext(block: suspend () -> Unit) {
324-
withContext(coroutineDispatcher) {
325+
withContext(defaultDispatcher) {
325326
mutex.withLock {
326327
block()
327328
}

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2222
import io.github.thibaultbee.streampack.core.elements.utils.combineStates
2323
import io.github.thibaultbee.streampack.core.elements.utils.extensions.intersect
2424
import io.github.thibaultbee.streampack.core.logger.Logger
25+
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutputDispatcherProvider
2526
import kotlinx.coroutines.flow.StateFlow
2627

2728

@@ -231,31 +232,20 @@ open class CombineEndpoint(protected val endpointInternals: List<IEndpointIntern
231232
/**
232233
* A factory to build a [CombineEndpoint] from a varargs of [IEndpointInternal.Factory].
233234
*/
234-
fun CombineEndpointFactory(context: Context, vararg endpointFactory: IEndpointInternal.Factory) =
235-
CombineEndpointFactory(context, endpointFactory.toList())
235+
fun CombineEndpointFactory(vararg endpointFactory: IEndpointInternal.Factory) =
236+
CombineEndpointFactory(endpointFactory.toList())
236237

237238
/**
238239
* A factory to build a [CombineEndpoint] from a list of [IEndpointInternal.Factory].
239240
*/
240-
fun CombineEndpointFactory(context: Context, endpointFactory: List<IEndpointInternal.Factory>) =
241-
CombineEndpointFactory(
242-
endpointFactory.map { it.create(context) }
243-
)
244-
245-
/**
246-
* A factory to build a [CombineEndpoint] from a vararg of [IEndpointInternal].
247-
*/
248-
fun CombineEndpointFactory(vararg endpoints: IEndpointInternal) =
249-
CombineEndpointFactory(
250-
endpoints.toList()
251-
)
252-
253-
/**
254-
* A factory to build a [CombineEndpoint].
255-
*/
256-
class CombineEndpointFactory(val endpointInternals: List<IEndpointInternal>) :
241+
class CombineEndpointFactory(private val endpointFactory: List<IEndpointInternal.Factory>) :
257242
IEndpointInternal.Factory {
258-
override fun create(context: Context): IEndpointInternal {
259-
return CombineEndpoint()
243+
override fun create(
244+
context: Context,
245+
dispatcherProvider: EncodingPipelineOutputDispatcherProvider
246+
): IEndpointInternal {
247+
return CombineEndpoint(
248+
endpointFactory.map { it.create(context, dispatcherProvider) }
249+
)
260250
}
261251
}

0 commit comments

Comments
 (0)