Skip to content

Commit 0a4e5f1

Browse files
committed
refactor(*): introduce streamer pipeline to enable multiple output
1 parent e5e0c1d commit 0a4e5f1

File tree

63 files changed

+4537
-1140
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+4537
-1140
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright (C) 2025 Thibault B.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.thibaultbee.streampack.core.elements.endpoints
17+
18+
import android.util.Log
19+
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
20+
import io.github.thibaultbee.streampack.core.elements.data.Frame
21+
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
22+
import kotlinx.coroutines.flow.MutableStateFlow
23+
import kotlinx.coroutines.flow.StateFlow
24+
import kotlinx.coroutines.runBlocking
25+
26+
class DummyEndpoint : IEndpointInternal {
27+
private val _isOpenFlow = MutableStateFlow(false)
28+
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow
29+
30+
private val _frameFlow = MutableStateFlow<Frame?>(null)
31+
val frameFlow: StateFlow<Frame?> = _frameFlow
32+
33+
var numOfAudioFramesWritten = 0
34+
private set
35+
var numOfVideoFramesWritten = 0
36+
private set
37+
val numOfFramesWritten: Int
38+
get() = numOfAudioFramesWritten + numOfVideoFramesWritten
39+
40+
private val _isStreamingFlow = MutableStateFlow(false)
41+
val isStreamingFlow: StateFlow<Boolean> = _isStreamingFlow
42+
43+
private val _configFlow = MutableStateFlow<CodecConfig?>(null)
44+
val configFlow: StateFlow<CodecConfig?> = _configFlow
45+
46+
override val info: IEndpoint.IEndpointInfo
47+
get() = TODO("Not yet implemented")
48+
49+
override fun getInfo(type: MediaDescriptor.Type): IEndpoint.IEndpointInfo {
50+
TODO("Not yet implemented")
51+
}
52+
53+
override val metrics: Any
54+
get() = TODO("Not yet implemented")
55+
56+
override suspend fun open(descriptor: MediaDescriptor) {
57+
_isOpenFlow.emit(true)
58+
}
59+
60+
override suspend fun close() {
61+
_isOpenFlow.emit(false)
62+
}
63+
64+
override suspend fun write(frame: Frame, streamPid: Int) {
65+
Log.i(TAG, "write: $frame")
66+
_frameFlow.emit(frame)
67+
when {
68+
frame.isAudio -> numOfAudioFramesWritten++
69+
frame.isVideo -> numOfVideoFramesWritten++
70+
}
71+
}
72+
73+
override fun addStreams(streamConfigs: List<CodecConfig>): Map<CodecConfig, Int> {
74+
runBlocking {
75+
streamConfigs.forEach { _configFlow.emit(it) }
76+
}
77+
return streamConfigs.associateWith { it.hashCode() }
78+
}
79+
80+
override fun addStream(streamConfig: CodecConfig): Int {
81+
runBlocking {
82+
_configFlow.emit(streamConfig)
83+
}
84+
return streamConfig.hashCode()
85+
}
86+
87+
override suspend fun startStream() {
88+
_isStreamingFlow.emit(true)
89+
}
90+
91+
override suspend fun stopStream() {
92+
_isStreamingFlow.emit(false)
93+
}
94+
95+
companion object {
96+
private const val TAG = "DummyEndpoint"
97+
}
98+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.github.thibaultbee.streampack.core.elements.sources
2+
3+
import android.media.MediaFormat
4+
import io.github.thibaultbee.streampack.core.elements.data.Frame
5+
import io.github.thibaultbee.streampack.core.elements.sources.audio.AudioSourceConfig
6+
import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSourceInternal
7+
import kotlinx.coroutines.flow.MutableStateFlow
8+
import kotlinx.coroutines.flow.StateFlow
9+
import java.nio.ByteBuffer
10+
11+
class StubAudioSource : IAudioSourceInternal {
12+
override var isMuted: Boolean
13+
get() = TODO("Not yet implemented")
14+
set(value) {}
15+
16+
private val _isStreamingFlow = MutableStateFlow(false)
17+
val isStreamingFlow: StateFlow<Boolean> = _isStreamingFlow
18+
19+
private val _configurationFlow = MutableStateFlow<AudioSourceConfig?>(null)
20+
val configurationFlow: StateFlow<AudioSourceConfig?> = _configurationFlow
21+
22+
override fun getAudioFrame(inputBuffer: ByteBuffer?): Frame {
23+
return Frame(
24+
inputBuffer ?: ByteBuffer.allocate(8192),
25+
0,
26+
format = MediaFormat().apply {
27+
setString(
28+
MediaFormat.KEY_MIME,
29+
MediaFormat.MIMETYPE_AUDIO_RAW
30+
)
31+
})
32+
}
33+
34+
override fun startStream() {
35+
_isStreamingFlow.value = true
36+
}
37+
38+
override fun stopStream() {
39+
_isStreamingFlow.value = false
40+
}
41+
42+
override fun configure(config: AudioSourceConfig) {
43+
_configurationFlow.value = config
44+
}
45+
46+
47+
override fun release() {
48+
49+
}
50+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.github.thibaultbee.streampack.core.elements.sources
2+
3+
import io.github.thibaultbee.streampack.core.elements.processing.video.source.DefaultSourceInfoProvider
4+
import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider
5+
import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSourceInternal
6+
import io.github.thibaultbee.streampack.core.elements.sources.video.VideoSourceConfig
7+
import kotlinx.coroutines.flow.MutableStateFlow
8+
import kotlinx.coroutines.flow.StateFlow
9+
10+
class StubVideoSource : IVideoSourceInternal {
11+
override val infoProviderFlow: StateFlow<ISourceInfoProvider> =
12+
MutableStateFlow(DefaultSourceInfoProvider())
13+
14+
private val _isStreamingFlow = MutableStateFlow(false)
15+
override val isStreamingFlow: StateFlow<Boolean> = _isStreamingFlow
16+
17+
private val _configurationFlow = MutableStateFlow<VideoSourceConfig?>(null)
18+
val configurationFlow: StateFlow<VideoSourceConfig?> = _configurationFlow
19+
20+
override suspend fun startStream() {
21+
_isStreamingFlow.emit(true)
22+
}
23+
24+
override suspend fun stopStream() {
25+
_isStreamingFlow.emit(false)
26+
}
27+
28+
override fun configure(config: VideoSourceConfig) {
29+
_configurationFlow.value = config
30+
}
31+
32+
override fun release() {
33+
34+
}
35+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright (C) 2025 Thibault B.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.thibaultbee.streampack.core.pipelines
17+
18+
import org.junit.Assert.*
19+
20+
class StreamerPipelineTest {
21+
22+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright (C) 2025 Thibault B.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.thibaultbee.streampack.core.pipelines.outputs
17+
18+
import android.util.Size
19+
import io.github.thibaultbee.streampack.core.elements.data.Frame
20+
import io.github.thibaultbee.streampack.core.logger.Logger
21+
import io.github.thibaultbee.streampack.core.utils.SurfaceUtils
22+
import kotlinx.coroutines.flow.MutableStateFlow
23+
import kotlinx.coroutines.flow.StateFlow
24+
25+
class StubAudioAsyncPipelineOutput :
26+
StubPipelineOutput(hasAudio = true, hasVideo = false),
27+
IAudioSyncPipelineOutputInternal {
28+
29+
private val _audioFrameFlow = MutableStateFlow<Frame?>(null)
30+
val audioFrameFlow: StateFlow<Frame?> = _audioFrameFlow
31+
32+
override fun queueAudioFrame(frame: Frame) {
33+
_audioFrameFlow.value = frame
34+
}
35+
}
36+
37+
class StubVideoSurfacePipelineOutput(resolution: Size) :
38+
StubPipelineOutput(hasAudio = false, hasVideo = true),
39+
IVideoSurfacePipelineOutputInternal {
40+
41+
override var targetRotation: Int = 0
42+
private val _surfaceFlow =
43+
MutableStateFlow<SurfaceWithSize?>(
44+
SurfaceWithSize(
45+
SurfaceUtils.createSurface(resolution),
46+
resolution
47+
)
48+
)
49+
override val surfaceFlow: StateFlow<SurfaceWithSize?> = _surfaceFlow
50+
override var videoSourceTimestampOffset: Long = 0L
51+
}
52+
53+
class StubAudioSyncVideoSurfacePipelineOutput(resolution: Size) :
54+
StubPipelineOutput(hasAudio = true, hasVideo = true),
55+
IAudioSyncPipelineOutputInternal, IVideoSurfacePipelineOutputInternal {
56+
57+
override var targetRotation: Int = 0
58+
private val _surfaceFlow =
59+
MutableStateFlow<SurfaceWithSize?>(
60+
SurfaceWithSize(
61+
SurfaceUtils.createSurface(resolution),
62+
resolution
63+
)
64+
)
65+
override val surfaceFlow: StateFlow<SurfaceWithSize?> = _surfaceFlow
66+
override var videoSourceTimestampOffset: Long = 0L
67+
68+
private val _audioFrameFlow = MutableStateFlow<Frame?>(null)
69+
val audioFrameFlow: StateFlow<Frame?> = _audioFrameFlow
70+
71+
override fun queueAudioFrame(frame: Frame) {
72+
_audioFrameFlow.value = frame
73+
}
74+
}
75+
76+
abstract class StubPipelineOutput(override val hasAudio: Boolean, override val hasVideo: Boolean) :
77+
IPipelineOutput {
78+
79+
private val _throwable = MutableStateFlow<Throwable?>(null)
80+
override val throwableFlow: StateFlow<Throwable?> = _throwable
81+
82+
private val _isStreaming = MutableStateFlow(false)
83+
override val isStreamingFlow: StateFlow<Boolean> = _isStreaming
84+
85+
override suspend fun startStream() {
86+
Logger.i(TAG, "Start stream")
87+
_isStreaming.emit(true)
88+
}
89+
90+
override suspend fun stopStream() {
91+
Logger.i(TAG, "Stop stream")
92+
_isStreaming.emit(false)
93+
}
94+
95+
override suspend fun release() {
96+
Logger.i(TAG, "Release")
97+
_isStreaming.emit(false)
98+
}
99+
100+
companion object {
101+
private const val TAG = "DummyPipelineOutput"
102+
}
103+
}
104+
105+
abstract class StubPipelineOutputInternal(hasAudio: Boolean, hasVideo: Boolean) :
106+
StubPipelineOutput(hasAudio, hasVideo),
107+
ISyncStartStreamPipelineOutputInternal {
108+
109+
override var streamerListener: ISyncStartStreamPipelineOutputInternal.Listener? = null
110+
111+
override suspend fun startStream() {
112+
super.startStream()
113+
streamerListener?.onStartStream()
114+
}
115+
}

0 commit comments

Comments
 (0)