Skip to content

Commit c816378

Browse files
committed
feat(core): introduces encoding output audio/video configuration invalidation in case one want to set a non-compatible configuration
1 parent 8bea211 commit c816378

File tree

10 files changed

+216
-63
lines changed

10 files changed

+216
-63
lines changed

core/src/androidTest/java/io/github/thibaultbee/streampack/core/pipelines/outputs/StubPipelineOutput.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ internal class StubAudioSyncConfigurableEncodingPipelineOutputInternal :
216216
_audioCodecConfigFlow.emit(audioCodecConfig)
217217
}
218218

219+
override suspend fun invalidateAudioCodecConfig() {
220+
_audioCodecConfigFlow.emit(null)
221+
}
222+
219223
override val endpoint: IEndpoint
220224
get() = TODO("Not yet implemented")
221225

@@ -261,6 +265,10 @@ internal class StubVideoSurfaceConfigurableEncodingPipelineOutputInternal :
261265
_videoCodecConfigFlow.emit(videoCodecConfig)
262266
}
263267

268+
override suspend fun invalidateVideoCodecConfig() {
269+
_videoCodecConfigFlow.emit(null)
270+
}
271+
264272
override val endpoint: IEndpoint
265273
get() = TODO("Not yet implemented")
266274

core/src/androidTest/java/io/github/thibaultbee/streampack/core/pipelines/utils/SourceConfigUtilsTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class SourceConfigUtilsTest {
8181
SourceConfigUtils.buildVideoSourceConfig(videoSourceConfigs)
8282
fail("All video source configs must have the same fps")
8383
} catch (e: IllegalArgumentException) {
84-
assertEquals("All video source configs must have the same fps", e.message)
84+
assertEquals("All video source configs must have the same fps but [30, 25]", e.message)
8585
}
8686
}
8787
}

core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ConfigExtensions.kt

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@ fun AudioCodecConfig.isCompatibleWith(sourceConfig: AudioSourceConfig): Boolean
4444
&& (byteFormat == sourceConfig.byteFormat)
4545
}
4646

47+
/**
48+
* Whether [AudioCodecConfig] is compatible with [AudioCodecConfig].
49+
*
50+
* @param codecConfig [AudioCodecConfig] to compare with
51+
* @return `true` if [AudioCodecConfig] is compatible with [AudioCodecConfig], `false` otherwise
52+
*/
53+
fun AudioCodecConfig.isCompatibleWith(codecConfig: AudioCodecConfig): Boolean {
54+
return (channelConfig == codecConfig.channelConfig)
55+
&& (sampleRate == codecConfig.sampleRate)
56+
&& (byteFormat == codecConfig.byteFormat)
57+
}
58+
4759
/**
4860
* Converts [AudioCodecConfig] to [AudioSourceConfig].
4961
*
@@ -67,15 +79,25 @@ fun VideoSourceConfig.isCompatibleWith(sourceConfig: VideoSourceConfig): Boolean
6779
}
6880

6981
/**
70-
* Whether [VideoCodecConfig] is compatible with [VideoCodecConfig].
82+
* Whether [VideoCodecConfig] is compatible with [VideoSourceConfig].
7183
*
72-
* @param sourceConfig [VideoCodecConfig] to compare with
73-
* @return `true` if [VideoCodecConfig] is compatible with [VideoCodecConfig], `false` otherwise
84+
* @param sourceConfig [VideoSourceConfig] to compare with
85+
* @return `true` if [VideoSourceConfig] is compatible with [VideoSourceConfig], `false` otherwise
7486
*/
7587
fun VideoCodecConfig.isCompatibleWith(sourceConfig: VideoSourceConfig): Boolean {
7688
return (fps == sourceConfig.fps) && (dynamicRangeProfile == sourceConfig.dynamicRangeProfile)
7789
}
7890

91+
/**
92+
* Whether [VideoCodecConfig] is compatible with [VideoCodecConfig].
93+
*
94+
* @param codecConfig [VideoCodecConfig] to compare with
95+
* @return `true` if [VideoCodecConfig] is compatible with [VideoCodecConfig], `false` otherwise
96+
*/
97+
fun VideoCodecConfig.isCompatibleWith(codecConfig: VideoCodecConfig): Boolean {
98+
return (fps == codecConfig.fps) && (dynamicRangeProfile == codecConfig.dynamicRangeProfile)
99+
}
100+
79101
/**
80102
* Converts [VideoCodecConfig] to [VideoSourceConfig].
81103
*

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import kotlinx.coroutines.cancel
6262
import kotlinx.coroutines.flow.MutableStateFlow
6363
import kotlinx.coroutines.flow.StateFlow
6464
import kotlinx.coroutines.flow.asStateFlow
65+
import kotlinx.coroutines.flow.drop
6566
import kotlinx.coroutines.joinAll
6667
import kotlinx.coroutines.launch
6768
import kotlinx.coroutines.runBlocking
@@ -517,7 +518,7 @@ open class StreamerPipeline(
517518
)
518519
}
519520
if (output is IConfigurableAudioPipelineOutputInternal) {
520-
addEncodingAudioOutput(output)
521+
jobs += addConfigurableAudioOutput(output)
521522
}
522523
} else {
523524
Logger.w(TAG, "Pipeline has no audio")
@@ -530,7 +531,7 @@ open class StreamerPipeline(
530531
jobs += it
531532
}
532533
if (output is IConfigurableVideoPipelineOutputInternal) {
533-
addEncodingVideoOutput(output)
534+
addConfigurableVideoOutput(output)
534535
}
535536
} else {
536537
Logger.w(TAG, "Pipeline has no video")
@@ -569,11 +570,29 @@ open class StreamerPipeline(
569570
output.audioFrameRequestedListener = audioInput.frameRequestedListener
570571
}
571572

572-
private fun addEncodingAudioOutput(
573+
private fun addConfigurableAudioOutput(
573574
output: IConfigurableAudioPipelineOutputInternal
574-
) {
575+
): Job {
575576
require(output.audioSourceConfigFlow.value == null) { "Output $output already have an audio source config" }
576577

578+
// Catch the config invalidation
579+
val job = coroutineScope.launch {
580+
output.audioSourceConfigFlow.drop(1).collect { sourceConfig ->
581+
if (sourceConfig == null) {
582+
withContextInputMutex {
583+
try {
584+
setAudioSourceConfig(buildAudioSourceConfig(output))
585+
} catch (t: Throwable) {
586+
Logger.e(
587+
TAG,
588+
"Error while setting audio source config after invalidation for output $output: $t"
589+
)
590+
}
591+
}
592+
}
593+
}
594+
}
595+
577596
// Apply future audio source config
578597
require(output.audioConfigEventListener == null) { "Output $output already have an audio listener" }
579598
output.audioConfigEventListener =
@@ -583,6 +602,8 @@ open class StreamerPipeline(
583602
setAudioSourceConfig(buildAudioSourceConfig(output, newAudioSourceConfig))
584603
}
585604
}
605+
606+
return job
586607
}
587608

588609
private fun addVideoSurfaceOutputIfNeeded(
@@ -655,10 +676,27 @@ open class StreamerPipeline(
655676
return SourceConfigUtils.buildVideoSourceConfig(videoSourceConfigs)
656677
}
657678

658-
private fun addEncodingVideoOutput(
679+
private fun addConfigurableVideoOutput(
659680
output: IConfigurableVideoPipelineOutputInternal
660681
) {
661682
require(output.videoSourceConfigFlow.value == null) { "Output $output already have a video source config" }
683+
// Catch the config invalidation
684+
coroutineScope.launch {
685+
output.videoSourceConfigFlow.drop(1).collect { sourceConfig ->
686+
if (sourceConfig == null) {
687+
withContextInputMutex {
688+
try {
689+
setVideoSourceConfig(buildVideoSourceConfig(output))
690+
} catch (t: Throwable) {
691+
Logger.e(
692+
TAG,
693+
"Error while setting video source config after invalidation for output $output: $t"
694+
)
695+
}
696+
}
697+
}
698+
}
699+
}
662700

663701
// Apply future video source config
664702
require(output.videoConfigEventListener == null) { "Output $output already have a video listener" }
@@ -676,7 +714,7 @@ open class StreamerPipeline(
676714
*
677715
* It will stop the stream.
678716
*/
679-
private suspend fun detachOutput(output: IPipelineOutput) {
717+
private fun detachOutput(output: IPipelineOutput) {
680718
Logger.i(TAG, "Detaching output $output")
681719

682720
// Clean streamer output

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@ import io.github.thibaultbee.streampack.core.pipelines.IVideoDispatcherProvider
3737
import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceDescriptor
3838
import kotlinx.coroutines.CoroutineScope
3939
import kotlinx.coroutines.cancelChildren
40+
import kotlinx.coroutines.flow.MutableSharedFlow
4041
import kotlinx.coroutines.flow.MutableStateFlow
42+
import kotlinx.coroutines.flow.SharedFlow
4143
import kotlinx.coroutines.flow.StateFlow
44+
import kotlinx.coroutines.flow.asSharedFlow
4245
import kotlinx.coroutines.flow.asStateFlow
4346
import kotlinx.coroutines.launch
4447
import kotlinx.coroutines.sync.Mutex
@@ -183,8 +186,8 @@ internal class VideoInput(
183186
private val source: IVideoSourceInternal?
184187
get() = sourceInternalFlow.value
185188

186-
private val _inputConfigChanged = MutableStateFlow(Unit)
187-
val inputConfigChanged: StateFlow<Unit> = _inputConfigChanged.asStateFlow()
189+
private val _inputConfigChanged = MutableSharedFlow<Unit>()
190+
val inputConfigChanged: SharedFlow<Unit> = _inputConfigChanged.asSharedFlow()
188191

189192
// CONFIG
190193
private val _sourceConfigFlow = MutableStateFlow<VideoSourceConfig?>(null)
@@ -466,16 +469,18 @@ internal class VideoInput(
466469
throw IllegalStateException("Input is released")
467470
}
468471

469-
addOutputSurface(
470-
buildSurfaceOutput(
471-
surfaceDescriptor,
472-
isMirroringRequired,
473-
isStreaming
472+
sourceMutex.withLock {
473+
addOutputSurfaceUnsafe(
474+
buildSurfaceOutput(
475+
surfaceDescriptor,
476+
isMirroringRequired,
477+
isStreaming
478+
)
474479
)
475-
)
480+
}
476481
}
477482

478-
internal fun addOutputSurface(output: ISurfaceOutput) {
483+
internal fun addOutputSurfaceUnsafe(output: ISurfaceOutput) {
479484
if (isReleaseRequested.get()) {
480485
throw IllegalStateException("Input is released")
481486
}

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/EncodingPipelineOutput.kt

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -307,18 +307,6 @@ internal class EncodingPipelineOutput(
307307
}
308308
}
309309

310-
override suspend fun queueAudioFrame(frame: RawFrame) = mutex.withLock {
311-
val input = try {
312-
requireNotNull(audioInput) { "Audio input is null" }
313-
} catch (t: Throwable) {
314-
frame.close()
315-
throw t
316-
}
317-
input.queueInputFrame(
318-
frame
319-
)
320-
}
321-
322310
private suspend fun setAudioCodecConfigUnsafe(audioCodecConfig: AudioCodecConfig) {
323311
require(!isStreaming) { "Can't change audio configuration while streaming" }
324312

@@ -383,6 +371,36 @@ internal class EncodingPipelineOutput(
383371
return audioEncoder
384372
}
385373

374+
override suspend fun invalidateAudioCodecConfig() {
375+
if (!withAudio) {
376+
return
377+
}
378+
withContextMutex {
379+
require(!isStreaming) { "Can't invalidate audio configuration while streaming" }
380+
381+
_audioCodecConfigFlow.emit(null)
382+
try {
383+
audioEncoderInternal?.release()
384+
} catch (t: Throwable) {
385+
Logger.w(TAG, "Can't release audio encoder: ${t.message}")
386+
}
387+
audioEncoderInternal = null
388+
}
389+
}
390+
391+
override suspend fun queueAudioFrame(frame: RawFrame) = mutex.withLock {
392+
val input = try {
393+
requireNotNull(audioInput) { "Audio input is null" }
394+
} catch (t: Throwable) {
395+
frame.close()
396+
throw t
397+
}
398+
input.queueInputFrame(
399+
frame
400+
)
401+
}
402+
403+
386404
private val _videoCodecConfigFlow = MutableStateFlow<VideoCodecConfig?>(null)
387405
override val videoCodecConfigFlow = _videoCodecConfigFlow.asStateFlow()
388406

@@ -490,6 +508,24 @@ internal class EncodingPipelineOutput(
490508
return videoEncoder
491509
}
492510

511+
override suspend fun invalidateVideoCodecConfig() {
512+
if (!withVideo) {
513+
return
514+
}
515+
withContextMutex {
516+
require(!isStreaming) { "Can't invalidate video configuration while streaming" }
517+
518+
_videoCodecConfigFlow.emit(null)
519+
try {
520+
videoEncoderInternal?.release()
521+
} catch (t: Throwable) {
522+
Logger.w(TAG, "Can't release video encoder: ${t.message}")
523+
}
524+
videoEncoderInternal = null
525+
_surfaceFlow.emit(null)
526+
}
527+
}
528+
493529
/**
494530
* Opens the output endpoint.
495531
*

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/IEncodingPipelineOutput.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ interface IConfigurableAudioEncodingPipelineOutput : IEncodingPipelineOutput,
7373
* @throws [Throwable] if configuration can not be applied.
7474
*/
7575
suspend fun setAudioCodecConfig(audioCodecConfig: AudioCodecConfig)
76+
77+
/**
78+
* Invalidates current audio codec configuration.
79+
*
80+
* You should call [setAudioCodecConfig] after this to set a new configuration.
81+
*/
82+
suspend fun invalidateAudioCodecConfig()
7683
}
7784

7885
/**
@@ -98,6 +105,13 @@ interface IConfigurableVideoEncodingPipelineOutput : IEncodingPipelineOutput,
98105
* @throws [Throwable] if configuration can not be applied.
99106
*/
100107
suspend fun setVideoCodecConfig(videoCodecConfig: VideoCodecConfig)
108+
109+
/**
110+
* Invalidates current video codec configuration.
111+
*
112+
* You should call [setVideoCodecConfig] after this to set a new configuration.
113+
*/
114+
suspend fun invalidateVideoCodecConfig()
101115
}
102116

103117
interface IConfigurableAudioVideoEncodingPipelineOutput :

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/utils/SourceConfigUtils.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ object SourceConfigUtils {
3131
require(videoSourceConfigs.isNotEmpty()) { "No video source config provided" }
3232
val maxResolution =
3333
videoSourceConfigs.map { it.resolution }.maxWith(compareBy({ it.width }, { it.height }))
34-
val fps = videoSourceConfigs.first().fps
35-
require(videoSourceConfigs.all { it.fps == fps }) { "All video source configs must have the same fps" }
36-
val dynamicRangeProfile = videoSourceConfigs.first().dynamicRangeProfile
37-
require(videoSourceConfigs.all { it.dynamicRangeProfile == dynamicRangeProfile }) { "All video source configs must have the same dynamic range profile" }
38-
return VideoSourceConfig(maxResolution, fps, dynamicRangeProfile)
34+
val fps = videoSourceConfigs.map { it.fps }.distinct()
35+
require(fps.distinct().size == 1) { "All video source configs must have the same fps but $fps" }
36+
val dynamicRangeProfiles = videoSourceConfigs.map { it.dynamicRangeProfile }.distinct()
37+
require(dynamicRangeProfiles.size == 1) { "All video source configs must have the same dynamic range profile but $dynamicRangeProfiles" }
38+
return VideoSourceConfig(maxResolution, fps.first(), dynamicRangeProfiles.first())
3939
}
4040
}

0 commit comments

Comments
 (0)