Skip to content

Commit 7bd7ae9

Browse files
committed
feat(core): synchronize output surface with input changed
1 parent 5a86b83 commit 7bd7ae9

File tree

3 files changed

+106
-53
lines changed

3 files changed

+106
-53
lines changed

core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import io.github.thibaultbee.streampack.core.elements.processing.video.utils.ext
2626
import io.github.thibaultbee.streampack.core.elements.processing.video.utils.extensions.toRectF
2727
import io.github.thibaultbee.streampack.core.elements.utils.RotationValue
2828
import io.github.thibaultbee.streampack.core.elements.utils.extensions.rotate
29+
import io.github.thibaultbee.streampack.core.logger.Logger
2930
import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceDescriptor
3031

3132
fun SurfaceOutput(

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import io.github.thibaultbee.streampack.core.pipelines.outputs.IPipelineOutput
4848
import io.github.thibaultbee.streampack.core.pipelines.outputs.IVideoCallbackPipelineOutputInternal
4949
import io.github.thibaultbee.streampack.core.pipelines.outputs.IVideoPipelineOutputInternal
5050
import io.github.thibaultbee.streampack.core.pipelines.outputs.IVideoSurfacePipelineOutputInternal
51+
import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceDescriptor
5152
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutput
5253
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableAudioVideoEncodingPipelineOutput
5354
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput
@@ -118,7 +119,9 @@ open class StreamerPipeline(
118119
override val audioInput: IAudioInput? = _audioInput
119120

120121
private val _videoInput = if (withVideo) {
121-
VideoInput(context, surfaceProcessorFactory, dispatcherProvider)
122+
VideoInput(context, surfaceProcessorFactory, dispatcherProvider) {
123+
getOutputSurfaces()
124+
}
122125
} else {
123126
null
124127
}
@@ -159,20 +162,6 @@ open class StreamerPipeline(
159162
require(withAudio || withVideo) { "At least one of audio or video must be set" }
160163

161164
_videoInput?.let { input ->
162-
coroutineScope.launch {
163-
input.inputConfigChanged.collect {
164-
if (isReleaseRequested.get()) {
165-
Logger.w(TAG, "Pipeline is released, dropping video input config changed")
166-
return@collect
167-
}
168-
169-
try {
170-
resetSurfaceProcessorOutputSurface()
171-
} catch (t: Throwable) {
172-
Logger.e(TAG, "Error while resetting surface processor output surfaces: $t")
173-
}
174-
}
175-
}
176165
coroutineScope.launch {
177166
input.isStreamingFlow.collect { isStreaming ->
178167
if (isReleaseRequested.get()) {
@@ -289,33 +278,21 @@ open class StreamerPipeline(
289278
* Updates the transformation of the surface output.
290279
* To be called when the source info provider or [isMirroringRequired] is updated.
291280
*/
292-
private suspend fun resetSurfaceProcessorOutputSurface() {
293-
Logger.d(TAG, "Resetting surface processor output surface")
294-
safeOutputCall { outputs ->
281+
private suspend fun getOutputSurfaces(): List<Triple<SurfaceDescriptor, Boolean, () -> Boolean>> {
282+
return safeOutputCall { outputs ->
295283
outputs.keys.filterIsInstance<IVideoSurfacePipelineOutputInternal>()
296-
.filter { it.surfaceFlow.value != null }.forEach {
297-
resetSurfaceProcessorOutputSurface(it)
284+
.mapNotNull {
285+
it.surfaceFlow.value?.let { surfaceDescriptor ->
286+
Triple(
287+
surfaceDescriptor,
288+
isMirroringRequired(),
289+
it::isStreaming
290+
)
291+
}
298292
}
299293
}
300294
}
301295

302-
/**
303-
* Updates the transformation of the surface output.
304-
*/
305-
private suspend fun resetSurfaceProcessorOutputSurface(
306-
videoOutput: IVideoSurfacePipelineOutputInternal
307-
) {
308-
Logger.i(TAG, "Updating transformation")
309-
videoOutput.surfaceFlow.value?.let { surfaceDescriptor ->
310-
_videoInput?.removeOutputSurface(surfaceDescriptor.surface)
311-
_videoInput?.addOutputSurface(
312-
surfaceDescriptor,
313-
isMirroringRequired(),
314-
videoOutput::isStreaming
315-
)
316-
}
317-
}
318-
319296
/**
320297
* Whether the output surface needs to be mirrored.
321298
*/

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import io.github.thibaultbee.streampack.core.elements.processing.video.ISurfaceP
2323
import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.ISurfaceOutput
2424
import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.SurfaceOutput
2525
import io.github.thibaultbee.streampack.core.elements.processing.video.source.DefaultSourceInfoProvider
26+
import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider
2627
import io.github.thibaultbee.streampack.core.elements.sources.video.IPreviewableSource
2728
import io.github.thibaultbee.streampack.core.elements.sources.video.ISurfaceSourceInternal
2829
import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSource
@@ -37,12 +38,10 @@ import io.github.thibaultbee.streampack.core.pipelines.IVideoDispatcherProvider
3738
import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceDescriptor
3839
import kotlinx.coroutines.CoroutineScope
3940
import kotlinx.coroutines.cancelChildren
40-
import kotlinx.coroutines.flow.MutableSharedFlow
4141
import kotlinx.coroutines.flow.MutableStateFlow
42-
import kotlinx.coroutines.flow.SharedFlow
4342
import kotlinx.coroutines.flow.StateFlow
44-
import kotlinx.coroutines.flow.asSharedFlow
4543
import kotlinx.coroutines.flow.asStateFlow
44+
import kotlinx.coroutines.flow.drop
4645
import kotlinx.coroutines.launch
4746
import kotlinx.coroutines.sync.Mutex
4847
import kotlinx.coroutines.sync.withLock
@@ -160,7 +159,8 @@ internal class VideoInput(
160159
private val context: Context,
161160
private val surfaceProcessorFactory: ISurfaceProcessorInternal.Factory,
162161
private val dispatcherProvider: IVideoDispatcherProvider,
163-
dynamicRangeProfileHint: DynamicRangeProfile = DynamicRangeProfile.sdr
162+
dynamicRangeProfileHint: DynamicRangeProfile = DynamicRangeProfile.sdr,
163+
private val onUpdateOutputSurface: suspend () -> List<Triple<SurfaceDescriptor, Boolean, () -> Boolean>>
164164
) : IVideoInput {
165165
private val coroutineScope = CoroutineScope(dispatcherProvider.default)
166166
private var isStreamingJob = ConflatedJob()
@@ -186,9 +186,6 @@ internal class VideoInput(
186186
private val source: IVideoSourceInternal?
187187
get() = sourceInternalFlow.value
188188

189-
private val _inputConfigChanged = MutableSharedFlow<Unit>()
190-
val inputConfigChanged: SharedFlow<Unit> = _inputConfigChanged.asSharedFlow()
191-
192189
// CONFIG
193190
private val _sourceConfigFlow = MutableStateFlow<VideoSourceConfig?>(null)
194191

@@ -258,8 +255,18 @@ internal class VideoInput(
258255
)
259256

260257
infoProviderJob += coroutineScope.launch {
261-
newVideoSource.infoProviderFlow.collect {
262-
_inputConfigChanged.emit(Unit)
258+
/**
259+
* First emission is the current value, we skip it.
260+
*/
261+
newVideoSource.infoProviderFlow.drop(1).collect { infoProvider ->
262+
try {
263+
updateOutputSurfaces(infoProvider)
264+
} catch (t: Throwable) {
265+
Logger.w(
266+
TAG,
267+
"setVideoSource: Can't update output surfaces after info provider changed: ${t.message}"
268+
)
269+
}
263270
}
264271
}
265272

@@ -275,6 +282,8 @@ internal class VideoInput(
275282
)
276283
}
277284

285+
updateOutputSurfacesUnsafe(newVideoSource.infoProviderFlow.value)
286+
278287
try {
279288
newVideoSource.startStream()
280289
} catch (t: Throwable) {
@@ -284,6 +293,8 @@ internal class VideoInput(
284293
)
285294
throw t
286295
}
296+
} else {
297+
updateOutputSurfacesUnsafe(newVideoSource.infoProviderFlow.value)
287298
}
288299

289300
isStreamingJob += coroutineScope.launch {
@@ -297,9 +308,16 @@ internal class VideoInput(
297308

298309
// Gets and resets output surface from previous video source.
299310
if (previousVideoSource is ISurfaceSourceInternal) {
300-
val surface = previousVideoSource.getOutput()
301-
previousVideoSource.resetOutput()
302-
surface?.let { processor.removeInputSurface(surface) }
311+
try {
312+
val surface = previousVideoSource.getOutput()
313+
previousVideoSource.resetOutput()
314+
surface?.let { processor.removeInputSurface(surface) }
315+
} catch (t: Throwable) {
316+
Logger.w(
317+
TAG,
318+
"setVideoSource: Can't reset previous video source output surface: ${t.message}"
319+
)
320+
}
303321
}
304322

305323
val isPreviewing =
@@ -373,7 +391,7 @@ internal class VideoInput(
373391
videoSourceInternal.resetOutput()
374392
}
375393
currentSurfaceProcessor.removeInputSurface(it)
376-
_inputConfigChanged.emit(Unit)
394+
updateOutputSurfacesUnsafe()
377395
addSourceSurface(
378396
videoConfig,
379397
currentSurfaceProcessor,
@@ -408,15 +426,21 @@ internal class VideoInput(
408426
videoSourceConfig.dynamicRangeProfile,
409427
dispatcherProvider
410428
)
429+
// Re-adds source surface
411430
addSourceSurface(videoSourceConfig, newSurfaceProcessor)
412431

413432
// Re-adds output surfaces
414-
_inputConfigChanged.emit(Unit)
433+
addOutputSurfacesUnsafe()
415434

416435
return newSurfaceProcessor
417436
}
418437

419-
override suspend fun takeSnapshot(@IntRange(from = 0, to = 359) rotationDegrees: Int): Bitmap {
438+
override suspend fun takeSnapshot(
439+
@IntRange(
440+
from = 0,
441+
to = 359
442+
) rotationDegrees: Int
443+
): Bitmap {
420444
if (isReleaseRequested.get()) {
421445
throw IllegalStateException("Input is released")
422446
}
@@ -438,16 +462,18 @@ internal class VideoInput(
438462
*
439463
* Use it for additional processing.
440464
*
465+
* @param infoProvider the source info provider
441466
* @param surfaceDescriptor the encoder surface
442467
* @param isMirroringRequired whether mirroring is required
443468
* @param isStreaming a lambda to check if the surface is streaming
444469
*/
445470
private fun buildSurfaceOutput(
471+
infoProvider: ISourceInfoProvider?,
446472
surfaceDescriptor: SurfaceDescriptor,
447473
isMirroringRequired: Boolean,
448474
isStreaming: () -> Boolean
449475
): ISurfaceOutput {
450-
val infoProvider = source?.infoProviderFlow?.value ?: DefaultSourceInfoProvider()
476+
val infoProvider = infoProvider ?: DefaultSourceInfoProvider()
451477
val sourceResolution = infoProvider.getSurfaceSize(
452478
sourceConfig!!.resolution // build surface output is only called after config is set
453479
)
@@ -472,6 +498,7 @@ internal class VideoInput(
472498
sourceMutex.withLock {
473499
addOutputSurfaceUnsafe(
474500
buildSurfaceOutput(
501+
source?.infoProviderFlow?.value,
475502
surfaceDescriptor,
476503
isMirroringRequired,
477504
isStreaming
@@ -488,6 +515,54 @@ internal class VideoInput(
488515
processor.addOutputSurface(output)
489516
}
490517

518+
519+
private suspend fun addOutputSurfaces(infoProvider: ISourceInfoProvider? = source?.infoProviderFlow?.value) {
520+
if (isReleaseRequested.get()) {
521+
throw IllegalStateException("Input is released")
522+
}
523+
524+
sourceMutex.withLock {
525+
addOutputSurfacesUnsafe(infoProvider)
526+
}
527+
}
528+
529+
private suspend fun addOutputSurfacesUnsafe(infoProvider: ISourceInfoProvider? = source?.infoProviderFlow?.value) {
530+
if (isReleaseRequested.get()) {
531+
throw IllegalStateException("Input is released")
532+
}
533+
534+
val surfaces = onUpdateOutputSurface()
535+
surfaces.forEach { (surfaceDescriptor, isMirroringRequired, isStreaming) ->
536+
addOutputSurfaceUnsafe(
537+
buildSurfaceOutput(
538+
infoProvider,
539+
surfaceDescriptor,
540+
isMirroringRequired,
541+
isStreaming
542+
)
543+
)
544+
}
545+
}
546+
547+
private suspend fun updateOutputSurfacesUnsafe(infoProvider: ISourceInfoProvider? = source?.infoProviderFlow?.value) {
548+
if (isReleaseRequested.get()) {
549+
throw IllegalStateException("Input is released")
550+
}
551+
552+
processor.removeAllOutputSurfaces()
553+
addOutputSurfacesUnsafe(infoProvider)
554+
}
555+
556+
private suspend fun updateOutputSurfaces(infoProvider: ISourceInfoProvider? = source?.infoProviderFlow?.value) {
557+
if (isReleaseRequested.get()) {
558+
throw IllegalStateException("Input is released")
559+
}
560+
561+
sourceMutex.withLock {
562+
updateOutputSurfacesUnsafe(infoProvider)
563+
}
564+
}
565+
491566
internal fun removeOutputSurface(output: Surface) {
492567
processor.removeOutputSurface(output)
493568
}

0 commit comments

Comments
 (0)