Skip to content

Commit 96ebac5

Browse files
committed
feat(rtmp): write/read RTMP messages from channels
1 parent afe8743 commit 96ebac5

File tree

13 files changed

+457
-220
lines changed

13 files changed

+457
-220
lines changed

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/RtmpConnectionBuilder.kt

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ class RtmpConnectionBuilder(val selectorManager: SelectorManager) {
4646
* The [urlBuilder] must use the `rtmp`, `rtmps`, `rtmpt` or `rtmpts` protocol.
4747
*
4848
* @param urlBuilder the URL to connect to
49-
* @param configure the settings for the RTMP client
49+
* @param settings the settings for the RTMP client
5050
* @param message the callback to handle RTMP client events
5151
*/
5252
suspend fun connect(
5353
urlBuilder: URLBuilder,
54-
configure: RtmpClientSettings.() -> Unit = {},
54+
settings: RtmpClientSettings = RtmpClientSettings(),
5555
message: RtmpClientCallbackBuilder.() -> Unit = {}
5656
): RtmpClient {
5757
urlBuilder.validateRtmp()
@@ -66,18 +66,17 @@ class RtmpConnectionBuilder(val selectorManager: SelectorManager) {
6666
TcpSocket(tcpSocket, urlBuilder)
6767
}
6868

69-
return connect(socket, configure, message)
69+
return connect(socket, settings, message)
7070
}
7171

7272
/**
7373
* Connects to the given [socket] and performs the RTMP handshake.
7474
*/
7575
private suspend fun connect(
7676
socket: ISocket,
77-
configure: RtmpClientSettings.() -> Unit,
77+
settings: RtmpClientSettings,
7878
message: RtmpClientCallbackBuilder.() -> Unit
7979
): RtmpClient {
80-
val settings = RtmpClientSettings().apply { configure() }
8180
try {
8281
socket.clientHandshake(settings.clock)
8382
} catch (t: Throwable) {
@@ -103,30 +102,30 @@ class RtmpConnectionBuilder(val selectorManager: SelectorManager) {
103102
* Binds a new [RtmpServer] to the given [localAddress].
104103
*
105104
* @param localAddress the local address to bind to. If null, binds to a random port on all interfaces.
106-
* @param configure the settings for the RTMP server
105+
* @param settings the settings for the RTMP server
107106
* @param message the callback to handle RTMP server events
108107
* @return a new [RtmpServer] instance
109108
*/
110109
suspend fun bind(
111110
localAddress: SocketAddress? = null,
112-
configure: RtmpServerSettings.() -> Unit = {},
111+
settings: RtmpServerSettings = RtmpServerSettings(),
113112
message: RtmpServerCallbackBuilder.() -> Unit = {}
114113
): RtmpServer {
115114
val serverSocket = tcpSocketBuilder.bind(localAddress)
116115

117-
return bind(serverSocket, configure, message)
116+
return bind(serverSocket, settings, message)
118117
}
119118

120119
/**
121120
* Binds a new [RtmpServer] to the given [serverSocket].
122121
*/
123122
private fun bind(
124123
serverSocket: ServerSocket,
125-
settings: RtmpServerSettings.() -> Unit,
124+
settings: RtmpServerSettings,
126125
messages: RtmpServerCallbackBuilder.() -> Unit
127126
) = RtmpServer(
128127
serverSocket,
129-
RtmpServerSettings().apply { settings() },
128+
settings,
130129
RtmpServerCallbackBuilder().apply { messages() }
131130
)
132131
}
@@ -137,28 +136,28 @@ class RtmpConnectionBuilder(val selectorManager: SelectorManager) {
137136
* The [urlString] must use the `rtmp`, `rtmps`, `rtmpt` or `rtmpts` protocol.
138137
*
139138
* @param urlString the RTMP URL to connect to
140-
* @param configure the settings for the RTMP client
139+
* @param settings the settings for the RTMP client
141140
* @param message the callback to handle RTMP client events
142141
*/
143142
suspend fun RtmpConnectionBuilder.connect(
144143
urlString: String,
145-
configure: RtmpClientSettings.() -> Unit = {},
144+
settings: RtmpClientSettings = RtmpClientSettings(),
146145
message: RtmpClientCallbackBuilder.() -> Unit = {}
147-
) = connect(RtmpURLBuilder(urlString), configure, message)
146+
) = connect(RtmpURLBuilder(urlString), settings, message)
148147

149148
/**
150149
* Binds a new [RtmpServer] to the given [urlString].
151150
*
152151
* The [urlString] must be in the format `tcp://host:port` or `host:port`.
153152
*
154153
* @param urlString the URL string to bind to
155-
* @param configure the settings for the RTMP server
154+
* @param settings the settings for the RTMP server
156155
* @param message the callback to handle RTMP server events
157156
* @return a new [RtmpServer] instance
158157
*/
159158
suspend fun RtmpConnectionBuilder.bind(
160159
urlString: String,
161-
configure: RtmpServerSettings.() -> Unit = {},
160+
settings: RtmpServerSettings = RtmpServerSettings(),
162161
message: RtmpServerCallbackBuilder.() -> Unit = {}
163162
): RtmpServer {
164163
val url = if (urlString.startWithScheme()) {
@@ -173,5 +172,5 @@ suspend fun RtmpConnectionBuilder.bind(
173172
url.port
174173
}
175174
)
176-
return bind(localAddress, configure, message)
175+
return bind(localAddress, settings, message)
177176
}

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/RtmpConstants.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,11 @@ object RtmpConstants {
3030
* The range of valid chunk sizes for RTMP connections.
3131
*/
3232
internal val chunkSizeRange = MIN_CHUNK_SIZE..MAX_CHUNK_SIZE
33+
34+
/**
35+
* The default chunk size used for RTMP connections.
36+
* This value is used when the client does not specify a chunk size during the handshake.
37+
* The default value is 128 bytes, which is the minimum chunk size allowed by the RTMP protocol.
38+
*/
39+
const val DEFAULT_CHUNK_SIZE = MIN_CHUNK_SIZE // bytes
3340
}

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/client/RtmpClient.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ class RtmpClient internal constructor(
120120
*
121121
* @param metadata the on metadata to send
122122
* @param timestampMs the timestamp of the metadata in milliseconds (usually 0)
123+
* @return the deferred that will be completed when the frame is sent
123124
*/
124125
suspend fun writeSetDataFrame(metadata: Metadata, timestampMs: Int = 0) =
125126
connection.writeSetDataFrame(metadata, timestampMs)
@@ -133,6 +134,7 @@ class RtmpClient internal constructor(
133134
* @param onMetadata the on metadata to send
134135
* @param onMetadataSize the size of the on metadata
135136
* @param timestampMs the timestamp of the metadata in milliseconds (usually 0)
137+
* @return the deferred that will be completed when the frame is sent
136138
*/
137139
suspend fun writeSetDataFrame(
138140
onMetadata: RawSource,
@@ -149,6 +151,7 @@ class RtmpClient internal constructor(
149151
* @param source the audio frame to write
150152
* @param sourceSize the size of the audio frame
151153
* @param timestampMs the timestamp of the frame in milliseconds
154+
* @return the deferred that will be completed when the frame is sent
152155
*/
153156
suspend fun writeAudio(source: RawSource, sourceSize: Int, timestampMs: Int) =
154157
connection.writeAudio(source, sourceSize, timestampMs)
@@ -161,6 +164,7 @@ class RtmpClient internal constructor(
161164
* @param source the video frame to write
162165
* @param sourceSize the size of the video frame
163166
* @param timestampMs the timestamp of the frame in milliseconds
167+
* @return the deferred that will be completed when the frame is sent
164168
*/
165169
suspend fun writeVideo(source: RawSource, sourceSize: Int, timestampMs: Int) =
166170
connection.writeVideo(source, sourceSize, timestampMs)
@@ -171,6 +175,7 @@ class RtmpClient internal constructor(
171175
* The frame must be in the FLV format.
172176
*
173177
* @param source the frame to write
178+
* @return the deferred that will be completed when the frame is sent
174179
*/
175180
suspend fun write(source: Source) = connection.write(source)
176181

@@ -179,20 +184,23 @@ class RtmpClient internal constructor(
179184
*
180185
* @param data the frame to write
181186
* @param timestampMs the timestamp of the frame in milliseconds
187+
* @return the deferred that will be completed when the frame is sent
182188
*/
183189
suspend fun write(data: FLVData, timestampMs: Int) = connection.write(data, timestampMs)
184190

185191
/**
186192
* Writes a [FLVTag].
187193
*
188194
* @param tag the FLV tag to write
195+
* @return the deferred that will be completed when the tag is sent
189196
*/
190197
suspend fun write(tag: FLVTag) = connection.write(tag)
191198

192199
/**
193200
* Writes a [FLVTagRawBody].
194201
*
195202
* @param rawTag the FLV tag to write
203+
* @return the deferred that will be completed when the tag is sent
196204
*/
197205
suspend fun write(rawTag: FLVTagRawBody) = connection.write(rawTag)
198206

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/client/RtmpClientSettings.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ import io.github.thibaultbee.krtmp.rtmp.util.RtmpClock
2626
* @param writeWindowAcknowledgementSize RTMP acknowledgement window size in bytes
2727
* @param amfVersion AMF version
2828
* @param clock Clock used to timestamp RTMP messages. You should use the same clock for your video and audio timestamps.
29+
* @param tooLateFrameDropTimeoutInMs the timeout after which a frame will be dropped (from frame timestamps). Make sure frame timestamps are on on the same clock as [clock]. If null is provided, frames will never be dropped. Default is null
2930
* @param connectInfo Lambda to configure the connect command object.
3031
*/
3132
class RtmpClientSettings(
3233
writeWindowAcknowledgementSize: Int = Int.MAX_VALUE,
3334
amfVersion: AmfVersion = AmfVersion.AMF0,
3435
clock: RtmpClock = RtmpClock.Default(),
35-
var connectInfo: ConnectObjectBuilder.() -> Unit = {}
36-
) : RtmpSettings(writeWindowAcknowledgementSize, amfVersion, clock, false, 0L)
36+
tooLateFrameDropTimeoutInMs: Long? = null,
37+
val connectInfo: ConnectObjectBuilder.() -> Unit = {}
38+
) : RtmpSettings(writeWindowAcknowledgementSize, amfVersion, clock, tooLateFrameDropTimeoutInMs)

0 commit comments

Comments
 (0)