Skip to content

Commit 8e2894a

Browse files
committed
fix(core): add mutex system for composite endpoint
1 parent 9358036 commit 8e2894a

File tree

1 file changed

+17
-7
lines changed
  • core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites

1 file changed

+17
-7
lines changed

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks
2828
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.SinkConfiguration
2929
import kotlinx.coroutines.flow.StateFlow
3030
import kotlinx.coroutines.runBlocking
31+
import kotlinx.coroutines.sync.Mutex
32+
import kotlinx.coroutines.sync.withLock
3133

3234
/**
3335
* An [IEndpointInternal] implementation that combines a [IMuxerInternal] and a [ISinkInternal].
@@ -41,7 +43,7 @@ class CompositeEndpoint(
4143
* The video and audio configurations.
4244
* It is used to configure the sink.
4345
*/
44-
private val configurations = mutableListOf<CodecConfig>()
46+
private val mutex = Mutex()
4547

4648
override val info by lazy { EndpointInfo(muxer.info) }
4749
override fun getInfo(type: MediaDescriptor.Type) = info
@@ -77,16 +79,24 @@ class CompositeEndpoint(
7779
) = muxer.write(frame, streamPid)
7880

7981
override fun addStreams(streamConfigs: List<CodecConfig>): Map<CodecConfig, Int> {
80-
val streamIds = muxer.addStreams(streamConfigs)
81-
return streamIds
82+
mutex.tryLock()
83+
return try {
84+
muxer.addStreams(streamConfigs)
85+
} finally {
86+
mutex.unlock()
87+
}
8288
}
8389

8490
override fun addStream(streamConfig: CodecConfig): Int {
85-
val streamId = muxer.addStream(streamConfig)
86-
return streamId
91+
mutex.tryLock()
92+
return try {
93+
muxer.addStream(streamConfig)
94+
} finally {
95+
mutex.unlock()
96+
}
8797
}
8898

89-
override suspend fun startStream() {
99+
override suspend fun startStream() = mutex.withLock {
90100
sink.configure(SinkConfiguration(muxer.streamConfigs))
91101
sink.startStream()
92102
muxer.startStream()
@@ -97,7 +107,7 @@ class CompositeEndpoint(
97107
*
98108
* It also clears registered streams and resets the bitrate.
99109
*/
100-
override suspend fun stopStream() {
110+
override suspend fun stopStream() = mutex.withLock {
101111
muxer.stopStream()
102112
sink.stopStream()
103113
}

0 commit comments

Comments
 (0)