Skip to content

Commit bec0d32

Browse files
committed
feat(rtmp): modify client and server factory
1 parent 0d55886 commit bec0d32

File tree

16 files changed

+667
-465
lines changed

16 files changed

+667
-465
lines changed

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,20 @@ implementation("io.github.thibaultbee.krtmp:rtmp:0.9.0")
4646

4747
### Client
4848

49-
Creates a RTMP client with the Factory `RtmpClient`:
49+
Use `RtmpConnectionBuilder` to create a RTMP client:
5050

5151
```kotlin
52-
val client = RtmpClient(
52+
val client = RtmpConnectionBuilder.connect(
5353
"rtmp://my.server.com/app/streamkey" // Your RTMP server URL (incl app name and stream key)
5454
)
5555
```
5656

5757
Then prepare your live by sending these messages to the server:
5858

5959
```kotlin
60-
client.connect()
61-
client.createStream()
62-
client.publish(StreamPublishType.LIVE)
60+
client.connect(...) // Send connect message
61+
client.createStream() // Send createStream message
62+
client.publish(StreamPublishType.LIVE) // Send publish message
6363
```
6464

6565
If you already have FLV data, write your video/audio data:
@@ -85,10 +85,10 @@ See [FLV](#flv) for more details to write audio and video frames..
8585

8686
### Server
8787

88-
Use the `RtmpServer` to create a RTMP server:
88+
Use `RtmpConnectionBuilder` to create a RTMP server:
8989

9090
```kotlin
91-
val server = RtmpServer("0.0.0.0:1935") // Listening on port 1935
91+
val server = RtmpConnectionBuilder.bind("0.0.0.0:1935") // Listening on port 1935
9292
```
9393

9494
Then start the server:

rtmp/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ kotlin {
4646
api(libs.ktor.network)
4747
implementation(libs.ktor.network.tls)
4848
implementation(libs.ktor.http)
49-
implementation(libs.ktor.client.core)
49+
api(libs.ktor.client.core)
5050
implementation(libs.ktor.client.cio)
5151
implementation(libs.kotlinx.io.core)
5252
implementation(libs.kotlinx.serialization.core)
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package io.github.thibaultbee.krtmp.rtmp
2+
3+
import io.github.thibaultbee.krtmp.rtmp.client.RtmpClient
4+
import io.github.thibaultbee.krtmp.rtmp.client.RtmpClientCallbackBuilder
5+
import io.github.thibaultbee.krtmp.rtmp.client.RtmpClientSettings
6+
import io.github.thibaultbee.krtmp.rtmp.extensions.clientHandshake
7+
import io.github.thibaultbee.krtmp.rtmp.extensions.isSecureRtmp
8+
import io.github.thibaultbee.krtmp.rtmp.extensions.isTunneledRtmp
9+
import io.github.thibaultbee.krtmp.rtmp.extensions.validateRtmp
10+
import io.github.thibaultbee.krtmp.rtmp.server.RtmpServer
11+
import io.github.thibaultbee.krtmp.rtmp.server.RtmpServerCallbackBuilder
12+
import io.github.thibaultbee.krtmp.rtmp.server.RtmpServerSettings
13+
import io.github.thibaultbee.krtmp.rtmp.util.RtmpURLBuilder
14+
import io.github.thibaultbee.krtmp.rtmp.util.RtmpURLProtocol
15+
import io.github.thibaultbee.krtmp.rtmp.util.extensions.startWithScheme
16+
import io.github.thibaultbee.krtmp.rtmp.util.sockets.ISocket
17+
import io.github.thibaultbee.krtmp.rtmp.util.sockets.http.HttpSocket
18+
import io.github.thibaultbee.krtmp.rtmp.util.sockets.tcp.TcpSocket
19+
import io.ktor.http.URLBuilder
20+
import io.ktor.http.Url
21+
import io.ktor.network.selector.SelectorManager
22+
import io.ktor.network.sockets.InetSocketAddress
23+
import io.ktor.network.sockets.ServerSocket
24+
import io.ktor.network.sockets.SocketAddress
25+
import io.ktor.network.sockets.SocketOptions
26+
import io.ktor.network.sockets.aSocket
27+
import io.ktor.network.tls.tls
28+
29+
/**
30+
* Builder class for creating RTMP connections, both clients and servers.
31+
*
32+
* @param selectorManager the [SelectorManager] to use for socket operations
33+
*/
34+
class RtmpConnectionBuilder(val selectorManager: SelectorManager) {
35+
private val tcpSocketBuilder = aSocket(selectorManager).tcp()
36+
37+
/**
38+
* The socket options used for TCP connections.
39+
*/
40+
val socketOptions: SocketOptions
41+
get() = tcpSocketBuilder.options
42+
43+
/**
44+
* Connects to the given [urlBuilder] and performs the RTMP handshake.
45+
*
46+
* The [urlBuilder] must use the `rtmp`, `rtmps`, `rtmpt` or `rtmpts` protocol.
47+
*
48+
* Don't forget to call [RtmpClient.connect] after this to complete the RTMP connection.
49+
*
50+
* @param urlBuilder the URL to connect to
51+
* @param configure the settings for the RTMP client
52+
* @param message the callback to handle RTMP client events
53+
*/
54+
suspend fun connect(
55+
urlBuilder: URLBuilder,
56+
configure: RtmpClientSettings.() -> Unit = {},
57+
message: RtmpClientCallbackBuilder.() -> Unit = {}
58+
): RtmpClient {
59+
urlBuilder.validateRtmp()
60+
val socket = if (urlBuilder.protocol.isTunneledRtmp) {
61+
HttpSocket(urlBuilder)
62+
} else {
63+
val tcpSocket = tcpSocketBuilder.connect(urlBuilder.host, urlBuilder.port).apply {
64+
if (urlBuilder.protocol.isSecureRtmp) {
65+
tls(selectorManager.coroutineContext)
66+
}
67+
}
68+
TcpSocket(tcpSocket, urlBuilder)
69+
}
70+
71+
return connect(socket, configure, message)
72+
}
73+
74+
/**
75+
* Connects to the given [socket] and performs the RTMP handshake.
76+
*/
77+
private suspend fun connect(
78+
socket: ISocket,
79+
configure: RtmpClientSettings.() -> Unit,
80+
message: RtmpClientCallbackBuilder.() -> Unit
81+
): RtmpClient {
82+
val settings = RtmpClientSettings().apply { configure() }
83+
try {
84+
socket.clientHandshake(settings.clock)
85+
} catch (t: Throwable) {
86+
socket.close()
87+
throw t
88+
}
89+
return RtmpClient(
90+
socket,
91+
settings,
92+
RtmpClientCallbackBuilder().apply { message() }
93+
)
94+
}
95+
96+
/**
97+
* Binds a new [RtmpServer] to the given [localAddress].
98+
*
99+
* @param localAddress the local address to bind to. If null, binds to a random port on all interfaces.
100+
* @param configure the settings for the RTMP server
101+
* @param message the callback to handle RTMP server events
102+
* @return a new [RtmpServer] instance
103+
*/
104+
suspend fun bind(
105+
localAddress: SocketAddress? = null,
106+
configure: RtmpServerSettings.() -> Unit = {},
107+
message: RtmpServerCallbackBuilder.() -> Unit = {}
108+
): RtmpServer {
109+
val serverSocket = tcpSocketBuilder.bind(localAddress)
110+
111+
return bind(serverSocket, configure, message)
112+
}
113+
114+
/**
115+
* Binds a new [RtmpServer] to the given [serverSocket].
116+
*/
117+
private fun bind(
118+
serverSocket: ServerSocket,
119+
settings: RtmpServerSettings.() -> Unit,
120+
messages: RtmpServerCallbackBuilder.() -> Unit
121+
) = RtmpServer(
122+
serverSocket,
123+
RtmpServerSettings().apply { settings() },
124+
RtmpServerCallbackBuilder().apply { messages() }
125+
)
126+
}
127+
128+
/**
129+
* Connects to the given [urlString] and performs the RTMP handshake.
130+
*
131+
* The [urlString] must use the `rtmp`, `rtmps`, `rtmpt` or `rtmpts` protocol.
132+
*
133+
* Don't forget to call [RtmpClient.connect] after this to complete the RTMP connection.
134+
*
135+
* @param urlString the RTMP URL to connect to
136+
* @param configure the settings for the RTMP client
137+
* @param message the callback to handle RTMP client events
138+
*/
139+
suspend fun RtmpConnectionBuilder.connect(
140+
urlString: String,
141+
configure: RtmpClientSettings.() -> Unit = {},
142+
message: RtmpClientCallbackBuilder.() -> Unit = {}
143+
) = connect(RtmpURLBuilder(urlString), configure, message)
144+
145+
/**
146+
* Binds a new [RtmpServer] to the given [urlString].
147+
*
148+
* The [urlString] must be in the format `tcp://host:port` or `host:port`.
149+
*
150+
* @param urlString the URL string to bind to
151+
* @param configure the settings for the RTMP server
152+
* @param message the callback to handle RTMP server events
153+
* @return a new [RtmpServer] instance
154+
*/
155+
suspend fun RtmpConnectionBuilder.bind(
156+
urlString: String,
157+
configure: RtmpServerSettings.() -> Unit = {},
158+
message: RtmpServerCallbackBuilder.() -> Unit = {}
159+
): RtmpServer {
160+
val url = if (urlString.startWithScheme()) {
161+
Url(urlString)
162+
} else {
163+
Url("rtmp://$urlString")
164+
}
165+
val localAddress = InetSocketAddress(
166+
url.host, if (url.specifiedPort == 0) {
167+
RtmpURLProtocol.createOrDefault(url.protocol.name).defaultPort
168+
} else {
169+
url.port
170+
}
171+
)
172+
return bind(localAddress, configure, message)
173+
}

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/client/RtmpClient.kt

Lines changed: 9 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.github.thibaultbee.krtmp.rtmp.client
1717

18-
import io.github.thibaultbee.krtmp.common.logger.KrtmpLogger
1918
import io.github.thibaultbee.krtmp.flv.sources.ByteArrayBackedRawSource
2019
import io.github.thibaultbee.krtmp.flv.tags.FLVData
2120
import io.github.thibaultbee.krtmp.flv.tags.FLVTag
@@ -25,83 +24,31 @@ import io.github.thibaultbee.krtmp.rtmp.connection.RtmpConnection
2524
import io.github.thibaultbee.krtmp.rtmp.connection.RtmpConnectionCallback
2625
import io.github.thibaultbee.krtmp.rtmp.connection.RtmpSettings
2726
import io.github.thibaultbee.krtmp.rtmp.connection.write
28-
import io.github.thibaultbee.krtmp.rtmp.extensions.clientHandshake
2927
import io.github.thibaultbee.krtmp.rtmp.messages.Command
3028
import io.github.thibaultbee.krtmp.rtmp.messages.DataAmf
3129
import io.github.thibaultbee.krtmp.rtmp.messages.Message
3230
import io.github.thibaultbee.krtmp.rtmp.messages.command.ConnectObjectBuilder
3331
import io.github.thibaultbee.krtmp.rtmp.messages.command.StreamPublishType
3432
import io.github.thibaultbee.krtmp.rtmp.util.NetConnectionConnectCodeReconnect
35-
import io.github.thibaultbee.krtmp.rtmp.util.RtmpURLBuilder
3633
import io.github.thibaultbee.krtmp.rtmp.util.sockets.ISocket
37-
import io.github.thibaultbee.krtmp.rtmp.util.sockets.SocketFactory
38-
import io.ktor.http.URLBuilder
39-
import io.ktor.http.Url
4034
import io.ktor.network.sockets.ASocket
4135
import kotlinx.coroutines.CoroutineScope
4236
import kotlinx.io.Buffer
4337
import kotlinx.io.RawSource
4438
import kotlinx.io.Source
4539

4640
/**
47-
* Creates a new [RtmpClient] with the given URL string and settings.
41+
* Creates an RTMP client.
4842
*
49-
* @param urlString the RTMP URL to connect to
50-
* @param callback the callback to handle RTMP client events
51-
* @param settings the settings for the RTMP client
52-
* @return a new [RtmpClient] instance
43+
* @param connection the socket connection to use
44+
* @param settings the RTMP settings to use
45+
* @param callback the callback to handle RTMP events
46+
* @return the created [RtmpClient]
5347
*/
54-
suspend fun RtmpClient(
55-
urlString: String,
56-
callback: DefaultRtmpClientCallback = DefaultRtmpClientCallback(),
57-
settings: RtmpSettings = RtmpSettings
58-
) =
59-
RtmpClient(RtmpURLBuilder(urlString), callback, settings)
60-
61-
/**
62-
* Creates a new [RtmpClient] with the given URL and settings.
63-
*
64-
* @param url the RTMP URL to connect to
65-
* @param callback the callback to handle RTMP client events
66-
* @param settings the settings for the RTMP client
67-
* @return a new [RtmpClient] instance
68-
*/
69-
suspend fun RtmpClient(
70-
url: Url,
71-
callback: DefaultRtmpClientCallback = DefaultRtmpClientCallback(),
72-
settings: RtmpSettings = RtmpSettings
73-
) =
74-
RtmpClient(RtmpURLBuilder(url), callback, settings)
75-
76-
/**
77-
* Creates a new [RtmpClient] with the given [URLBuilder] and settings.
78-
*
79-
* Use [RtmpURLBuilder] to create the [URLBuilder].
80-
*
81-
* @param urlBuilder the [URLBuilder] to connect to
82-
* @param callback the callback to handle RTMP client events
83-
* @param settings the settings for the RTMP client
84-
* @return a new [RtmpClient] instance
85-
*/
86-
suspend fun RtmpClient(
87-
urlBuilder: URLBuilder,
88-
callback: RtmpClientCallback = DefaultRtmpClientCallback(),
89-
settings: RtmpSettings = RtmpSettings,
90-
): RtmpClient {
91-
val connection = SocketFactory().connect(urlBuilder)
92-
try {
93-
connection.clientHandshake(settings.clock)
94-
} catch (t: Throwable) {
95-
connection.close()
96-
throw t
97-
}
98-
return RtmpClient(connection, callback, settings)
99-
}
100-
10148
internal fun RtmpClient(
10249
connection: ISocket,
103-
callback: RtmpClientCallback,
104-
settings: RtmpSettings
50+
settings: RtmpSettings,
51+
callback: RtmpClientCallback
10552
): RtmpClient {
10653
return RtmpClient(
10754
RtmpConnection(
@@ -318,64 +265,8 @@ internal class RtmpClientConnectionCallback(
318265
}
319266

320267
class Factory(private val callback: RtmpClientCallback) : RtmpConnectionCallback.Factory {
321-
override fun create(streamer: RtmpConnection): RtmpConnectionCallback {
322-
return RtmpClientConnectionCallback(streamer, callback)
268+
override fun create(connection: RtmpConnection): RtmpConnectionCallback {
269+
return RtmpClientConnectionCallback(connection, callback)
323270
}
324271
}
325272
}
326-
327-
class DefaultRtmpClientCallback : RtmpClientCallback {
328-
override suspend fun onMessage(message: Message) {
329-
KrtmpLogger.i(TAG, "Received message: $message")
330-
}
331-
332-
override suspend fun onCommand(command: Command) {
333-
KrtmpLogger.i(TAG, "Received command: $command")
334-
}
335-
336-
override suspend fun onData(data: DataAmf) {
337-
KrtmpLogger.i(TAG, "Received data: $data")
338-
}
339-
340-
companion object {
341-
/**
342-
* Default instance of [DefaultRtmpClientCallback].
343-
*/
344-
private const val TAG = "DefaultRtmpClientCallback"
345-
}
346-
}
347-
348-
/**
349-
* Callback interface for RTMP client events.
350-
*/
351-
interface RtmpClientCallback {
352-
/**
353-
* Called when a message is received.
354-
*
355-
* @param message the received message
356-
*/
357-
suspend fun onMessage(message: Message) = Unit
358-
359-
/**
360-
* Called when a command is received.
361-
*
362-
* @param command the received command
363-
*/
364-
suspend fun onCommand(command: Command) = Unit
365-
366-
/**
367-
* Called when data is received.
368-
*
369-
* @param data the received data
370-
*/
371-
suspend fun onData(data: DataAmf) = Unit
372-
373-
/**
374-
* Called when a reconnect request is received.
375-
*
376-
* This is used to handle reconnection logic.
377-
*
378-
* @param command the command containing the reconnect request
379-
*/
380-
suspend fun onReconnectRequest(command: Command.OnStatus) = Unit
381-
}

0 commit comments

Comments
 (0)