1+ package io.github.thibaultbee.krtmp.rtmp
2+
3+ import io.github.thibaultbee.krtmp.rtmp.client.RtmpClient
4+ import io.github.thibaultbee.krtmp.rtmp.client.RtmpClientCallbackBuilder
5+ import io.github.thibaultbee.krtmp.rtmp.client.RtmpClientSettings
6+ import io.github.thibaultbee.krtmp.rtmp.extensions.clientHandshake
7+ import io.github.thibaultbee.krtmp.rtmp.extensions.isSecureRtmp
8+ import io.github.thibaultbee.krtmp.rtmp.extensions.isTunneledRtmp
9+ import io.github.thibaultbee.krtmp.rtmp.extensions.validateRtmp
10+ import io.github.thibaultbee.krtmp.rtmp.server.RtmpServer
11+ import io.github.thibaultbee.krtmp.rtmp.server.RtmpServerCallbackBuilder
12+ import io.github.thibaultbee.krtmp.rtmp.server.RtmpServerSettings
13+ import io.github.thibaultbee.krtmp.rtmp.util.RtmpURLBuilder
14+ import io.github.thibaultbee.krtmp.rtmp.util.RtmpURLProtocol
15+ import io.github.thibaultbee.krtmp.rtmp.util.extensions.startWithScheme
16+ import io.github.thibaultbee.krtmp.rtmp.util.sockets.ISocket
17+ import io.github.thibaultbee.krtmp.rtmp.util.sockets.http.HttpSocket
18+ import io.github.thibaultbee.krtmp.rtmp.util.sockets.tcp.TcpSocket
19+ import io.ktor.http.URLBuilder
20+ import io.ktor.http.Url
21+ import io.ktor.network.selector.SelectorManager
22+ import io.ktor.network.sockets.InetSocketAddress
23+ import io.ktor.network.sockets.ServerSocket
24+ import io.ktor.network.sockets.SocketAddress
25+ import io.ktor.network.sockets.SocketOptions
26+ import io.ktor.network.sockets.aSocket
27+ import io.ktor.network.tls.tls
28+
29+ /* *
30+ * Builder class for creating RTMP connections, both clients and servers.
31+ *
32+ * @param selectorManager the [SelectorManager] to use for socket operations
33+ */
34+ class RtmpConnectionBuilder (val selectorManager : SelectorManager ) {
35+ private val tcpSocketBuilder = aSocket(selectorManager).tcp()
36+
37+ /* *
38+ * The socket options used for TCP connections.
39+ */
40+ val socketOptions: SocketOptions
41+ get() = tcpSocketBuilder.options
42+
43+ /* *
44+ * Connects to the given [urlBuilder] and performs the RTMP handshake.
45+ *
46+ * The [urlBuilder] must use the `rtmp`, `rtmps`, `rtmpt` or `rtmpts` protocol.
47+ * @param urlBuilder the URL to connect to
48+ * @param configure the settings for the RTMP client
49+ * @param message the callback to handle RTMP client events
50+ */
51+ suspend fun connect (
52+ urlBuilder : URLBuilder ,
53+ configure : RtmpClientSettings .() -> Unit = {},
54+ message : RtmpClientCallbackBuilder .() -> Unit = {}
55+ ): RtmpClient {
56+ urlBuilder.validateRtmp()
57+ val socket = if (urlBuilder.protocol.isTunneledRtmp) {
58+ HttpSocket (urlBuilder)
59+ } else {
60+ val tcpSocket = tcpSocketBuilder.connect(urlBuilder.host, urlBuilder.port).apply {
61+ if (urlBuilder.protocol.isSecureRtmp) {
62+ tls(selectorManager.coroutineContext)
63+ }
64+ }
65+ TcpSocket (tcpSocket, urlBuilder)
66+ }
67+
68+ return connect(socket, configure, message)
69+ }
70+
71+ /* *
72+ * Connects to the given [socket] and performs the RTMP handshake.
73+ */
74+ private suspend fun connect (
75+ socket : ISocket ,
76+ configure : RtmpClientSettings .() -> Unit ,
77+ message : RtmpClientCallbackBuilder .() -> Unit
78+ ): RtmpClient {
79+ val settings = RtmpClientSettings ().apply { configure() }
80+ try {
81+ socket.clientHandshake(settings.clock)
82+ } catch (t: Throwable ) {
83+ socket.close()
84+ throw t
85+ }
86+ return RtmpClient (
87+ socket,
88+ settings,
89+ RtmpClientCallbackBuilder ().apply { message() }
90+ )
91+ }
92+
93+ /* *
94+ * Binds a new [RtmpServer] to the given [localAddress].
95+ *
96+ * @param localAddress the local address to bind to. If null, binds to a random port on all interfaces.
97+ * @param configure the settings for the RTMP server
98+ * @param message the callback to handle RTMP server events
99+ * @return a new [RtmpServer] instance
100+ */
101+ suspend fun bind (
102+ localAddress : SocketAddress ? = null,
103+ configure : RtmpServerSettings .() -> Unit = {},
104+ message : RtmpServerCallbackBuilder .() -> Unit = {}
105+ ): RtmpServer {
106+ val serverSocket = tcpSocketBuilder.bind(localAddress)
107+
108+ return bind(serverSocket, configure, message)
109+ }
110+
111+ /* *
112+ * Binds a new [RtmpServer] to the given [serverSocket].
113+ */
114+ private fun bind (
115+ serverSocket : ServerSocket ,
116+ settings : RtmpServerSettings .() -> Unit ,
117+ messages : RtmpServerCallbackBuilder .() -> Unit
118+ ) = RtmpServer (
119+ serverSocket,
120+ RtmpServerSettings ().apply { settings() },
121+ RtmpServerCallbackBuilder ().apply { messages() }
122+ )
123+ }
124+
125+ /* *
126+ * Connects to the given [urlString] and performs the RTMP handshake.
127+ *
128+ * The [urlString] must use the `rtmp`, `rtmps`, `rtmpt` or `rtmpts` protocol.
129+ *
130+ * @param urlString the RTMP URL to connect to
131+ * @param configure the settings for the RTMP client
132+ * @param message the callback to handle RTMP client events
133+ */
134+ suspend fun RtmpConnectionBuilder.connect (
135+ urlString : String ,
136+ configure : RtmpClientSettings .() -> Unit = {},
137+ message : RtmpClientCallbackBuilder .() -> Unit = {}
138+ ) = connect(RtmpURLBuilder (urlString), configure, message)
139+
140+ /* *
141+ * Binds a new [RtmpServer] to the given [urlString].
142+ *
143+ * The [urlString] must be in the format `tcp://host:port` or `host:port`.
144+ *
145+ * @param urlString the URL string to bind to
146+ * @param configure the settings for the RTMP server
147+ * @param message the callback to handle RTMP server events
148+ * @return a new [RtmpServer] instance
149+ */
150+ suspend fun RtmpConnectionBuilder.bind (
151+ urlString : String ,
152+ configure : RtmpServerSettings .() -> Unit = {},
153+ message : RtmpServerCallbackBuilder .() -> Unit = {}
154+ ): RtmpServer {
155+ val url = if (urlString.startWithScheme()) {
156+ Url (urlString)
157+ } else {
158+ Url (" rtmp://$urlString " )
159+ }
160+ val localAddress = InetSocketAddress (
161+ url.host, if (url.specifiedPort == 0 ) {
162+ RtmpURLProtocol .createOrDefault(url.protocol.name).defaultPort
163+ } else {
164+ url.port
165+ }
166+ )
167+ return bind(localAddress, configure, message)
168+ }
0 commit comments