Skip to content

Commit 61f6d1a

Browse files
committed
fix(core): fix dynamic endpoint isOpenFlow
1 parent 038f5da commit 61f6d1a

File tree

1 file changed

+9
-1
lines changed
  • core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints

1 file changed

+9
-1
lines changed

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
2727
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.TSServiceInfo
2828
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink
2929
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
30+
import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob
3031
import io.github.thibaultbee.streampack.core.logger.Logger
3132
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
3233
import kotlinx.coroutines.CoroutineDispatcher
@@ -68,6 +69,7 @@ open class DynamicEndpoint(
6869
private var srtEndpoint: IEndpointInternal? = null
6970
private var rtmpEndpoint: IEndpointInternal? = null
7071

72+
private val isOpenJob = ConflatedJob()
7173
private val isOpenFlows = endpointFlow.map { it?.isOpenFlow }
7274
private val _isOpenFlow = MutableStateFlow(false)
7375
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow.asStateFlow()
@@ -86,7 +88,13 @@ open class DynamicEndpoint(
8688
init {
8789
coroutineScope.launch {
8890
isOpenFlows.collect { isOpenFlow ->
89-
_isOpenFlow.emit(isOpenFlow?.value == true)
91+
if (isOpenFlow == null) {
92+
isOpenJob.cancel()
93+
} else {
94+
isOpenJob += isOpenFlow.collect { isOpen ->
95+
_isOpenFlow.emit(isOpen)
96+
}
97+
}
9098
}
9199
}
92100
}

0 commit comments

Comments
 (0)