Skip to content

Commit 982cf6d

Browse files
committed
refactor(rtmp): write and read message without channels
1 parent 7133b87 commit 982cf6d

File tree

5 files changed

+60
-144
lines changed

5 files changed

+60
-144
lines changed

rtmp/src/commonJvmAndroid/kotlin/io/github/thibaultbee/krtmp/rtmp/client/RtmpClientJvmAndroid.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ suspend fun RtmpClient.writeVideo(buffer: ByteBuffer, timestampMs: Int) =
4444

4545
/**
4646
* Writes the SetDataFrame from a [ByteBuffer].
47-
* It must be called after [publish] and before sending audio or video frames.
47+
* It must be called after [RtmpClient.publish] and before sending audio or video frames.
4848
*
4949
* Expected AMF format is the one set in [RtmpSettings.amfVersion].
5050
*

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/connection/RtmpConnection.kt

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ import kotlinx.coroutines.CoroutineDispatcher
7878
import kotlinx.coroutines.CoroutineScope
7979
import kotlinx.coroutines.Deferred
8080
import kotlinx.coroutines.Dispatchers
81-
import kotlinx.coroutines.awaitAll
8281
import kotlinx.coroutines.isActive
8382
import kotlinx.coroutines.launch
8483
import kotlinx.coroutines.runBlocking
84+
import kotlinx.coroutines.withTimeout
8585
import kotlinx.io.Buffer
8686
import kotlinx.io.IOException
8787
import kotlinx.io.RawSource
@@ -108,14 +108,7 @@ internal class RtmpConnection internal constructor(
108108
) : CoroutineScope by socket, ASocket by socket {
109109
private val callback by lazy { callbackFactory.create(this) }
110110

111-
private val messageWriteChannel = MessageWriteChannel(socket, { message ->
112-
if (settings.tooLateFrameDropTimeoutInMs != null && ((message is Video) || (message is Audio))) {
113-
settings.tooLateFrameDropTimeoutInMs - (settings.clock.nowInMs - message.timestamp)
114-
} else {
115-
null
116-
}
117-
})
118-
111+
private val messageWriteChannel = MessageWriteChannel(socket)
119112
private val messageReadChannel = MessageReadChannel(socket)
120113

121114
private var _transactionId = 1L
@@ -179,7 +172,7 @@ internal class RtmpConnection internal constructor(
179172
"Chunk size must be in range ${RtmpConstants.chunkSizeRange}, but was $chunkSize"
180173
}
181174
val setChunkSize = SetChunkSize(timestampMs, chunkSize)
182-
return writeMessage(setChunkSize).await()
175+
return writeMessage(setChunkSize)
183176
}
184177

185178
/**
@@ -192,7 +185,7 @@ internal class RtmpConnection internal constructor(
192185
val setWindowAcknowledgementSize = WindowAcknowledgementSize(
193186
timestampMs, size
194187
)
195-
writeMessage(setWindowAcknowledgementSize).await()
188+
writeMessage(setWindowAcknowledgementSize)
196189
}
197190

198191
/**
@@ -206,7 +199,7 @@ internal class RtmpConnection internal constructor(
206199
val setPeerBandwidth = SetPeerBandwidth(
207200
timestampMs, size, type
208201
)
209-
writeMessage(setPeerBandwidth).await()
202+
writeMessage(setPeerBandwidth)
210203
}
211204

212205
/**
@@ -222,7 +215,7 @@ internal class RtmpConnection internal constructor(
222215
val userControl = UserControl(
223216
timestampMs, eventType
224217
)
225-
writeMessage(userControl).await()
218+
writeMessage(userControl)
226219
}
227220

228221

@@ -241,7 +234,7 @@ internal class RtmpConnection internal constructor(
241234
val userControl = UserControl(
242235
timestampMs, eventType, data
243236
)
244-
writeMessage(userControl).await()
237+
writeMessage(userControl)
245238
}
246239

247240
/**
@@ -287,7 +280,7 @@ internal class RtmpConnection internal constructor(
287280
userControlStreamBegin,
288281
result.createMessage(amfVersion = settings.amfVersion)
289282
)
290-
).awaitAll()
283+
)
291284
}
292285

293286
/**
@@ -405,7 +398,7 @@ internal class RtmpConnection internal constructor(
405398
)
406399

407400
return try {
408-
writeAmfMessage(playCommand).await()
401+
writeAmfMessage(playCommand)
409402
} catch (t: Throwable) {
410403
throw IOException("Play command failed", t)
411404
}
@@ -430,7 +423,7 @@ internal class RtmpConnection internal constructor(
430423
)
431424
}
432425

433-
writeAmfMessages(messages).awaitAll()
426+
writeAmfMessages(messages)
434427
}
435428

436429
/**
@@ -476,14 +469,14 @@ internal class RtmpConnection internal constructor(
476469
)
477470
)
478471
)
479-
writeAmfMessage(onStatus).await()
472+
writeAmfMessage(onStatus)
480473
}
481474

482475
private fun close(timestampMs: Int) {
483476
try {
484477
val closeCommand = CommandCloseStream(transactionId, timestampMs)
485478
runBlocking {
486-
writeAmfMessage(closeCommand).await()
479+
writeAmfMessage(closeCommand)
487480
}
488481
} catch (t: Throwable) {
489482
KrtmpLogger.i(TAG, "Error sending close command: ${t.message}")
@@ -516,7 +509,7 @@ internal class RtmpConnection internal constructor(
516509
* @param timestampMs the timestamp of the metadata in milliseconds (usually 0)
517510
* @return the deferred that will be completed when the frame is sent
518511
*/
519-
suspend fun writeSetDataFrame(metadata: Metadata, timestampMs: Int): Deferred<Unit> {
512+
suspend fun writeSetDataFrame(metadata: Metadata, timestampMs: Int) {
520513
val messageStreamId = requireNotNull(messageStreamId) {
521514
"You must call createStream() before publish()"
522515
}
@@ -537,7 +530,7 @@ internal class RtmpConnection internal constructor(
537530
* @param timestampMs the timestamp of the metadata in milliseconds (usually 0)
538531
* @return the deferred that will be completed when the frame is sent
539532
*/
540-
suspend fun writeSetDataFrame(onMetadata: ByteArray, timestampMs: Int): Deferred<Unit> {
533+
suspend fun writeSetDataFrame(onMetadata: ByteArray, timestampMs: Int) {
541534
val messageStreamId = requireNotNull(messageStreamId) {
542535
"You must call createStream() before publish()"
543536
}
@@ -569,7 +562,7 @@ internal class RtmpConnection internal constructor(
569562
onMetadataSize: Int,
570563
timestampMs: Int,
571564
amfVersion: AmfVersion = settings.amfVersion
572-
): Deferred<Unit> {
565+
) {
573566
val messageStreamId = requireNotNull(messageStreamId) {
574567
"You must call createStream() before publish()"
575568
}
@@ -590,13 +583,13 @@ internal class RtmpConnection internal constructor(
590583
* @param timestampMs the timestamp of the frame in milliseconds
591584
* @return the deferred that will be completed when the frame is sent
592585
*/
593-
suspend fun writeAudio(source: RawSource, sourceSize: Int, timestampMs: Int): Deferred<Unit> {
586+
suspend fun writeAudio(source: RawSource, sourceSize: Int, timestampMs: Int) {
594587
val messageStreamId = requireNotNull(messageStreamId) {
595588
"You must call createStream() before publish()"
596589
}
597590

598591
val audio = Audio(timestampMs, messageStreamId, source, sourceSize)
599-
return writeMessage(audio)
592+
return withTimeoutWriteIfNeeded(audio)
600593
}
601594

602595
/**
@@ -609,13 +602,13 @@ internal class RtmpConnection internal constructor(
609602
* @param timestampMs the timestamp of the frame in milliseconds
610603
* @return the deferred that will be completed when the frame is sent
611604
*/
612-
suspend fun writeVideo(source: RawSource, sourceSize: Int, timestampMs: Int): Deferred<Unit> {
605+
suspend fun writeVideo(source: RawSource, sourceSize: Int, timestampMs: Int) {
613606
val messageStreamId = requireNotNull(messageStreamId) {
614607
"You must call createStream() before publish()"
615608
}
616609

617610
val video = Video(timestampMs, messageStreamId, source, sourceSize)
618-
return writeMessage(video)
611+
return withTimeoutWriteIfNeeded(video)
619612
}
620613

621614
private suspend fun writeAmfMessagesWithResponse(
@@ -625,7 +618,7 @@ internal class RtmpConnection internal constructor(
625618
return commandChannels.waitForResponse(id)
626619
}
627620

628-
private suspend fun writeAmfMessages(amfMessages: List<AmfMessage>): List<Deferred<Unit>> {
621+
private suspend fun writeAmfMessages(amfMessages: List<AmfMessage>) {
629622
val messages = amfMessages.map { it.createMessage(settings.amfVersion) }
630623
return writeMessages(messages)
631624
}
@@ -637,14 +630,30 @@ internal class RtmpConnection internal constructor(
637630
return writeMessageWithResponse(message, id)
638631
}
639632

640-
suspend fun writeAmfMessage(amfMessage: AmfMessage): Deferred<Unit> {
633+
suspend fun writeAmfMessage(amfMessage: AmfMessage) {
641634
val message = amfMessage.createMessage(settings.amfVersion)
642635
return writeMessage(message)
643636
}
644637

645-
private suspend fun writeMessages(messages: List<Message>): List<Deferred<Unit>> =
646-
messages.map {
647-
messageWriteChannel.send(it)
638+
private suspend fun withTimeoutWriteIfNeeded(message: Message) {
639+
val timeoutInMs = if (settings.tooLateFrameDropTimeoutInMs != null) {
640+
settings.tooLateFrameDropTimeoutInMs - (settings.clock.nowInMs - message.timestamp)
641+
} else {
642+
null
643+
}
644+
if (timeoutInMs != null) {
645+
withTimeout(timeoutInMs) {
646+
writeMessage(message)
647+
}
648+
} else {
649+
writeMessage(message)
650+
}
651+
}
652+
653+
654+
private suspend fun writeMessages(messages: List<Message>) =
655+
messages.forEach {
656+
messageWriteChannel.write(it)
648657
}
649658

650659
/**
@@ -655,7 +664,7 @@ internal class RtmpConnection internal constructor(
655664
* @return the response command
656665
*/
657666
suspend fun writeMessageWithResponse(message: Message, id: Any = transactionId): Command {
658-
writeMessage(message).await()
667+
writeMessage(message)
659668
return commandChannels.waitForResponse(id)
660669
}
661670

@@ -665,7 +674,7 @@ internal class RtmpConnection internal constructor(
665674
* @param message the message to write
666675
*/
667676
suspend fun writeMessage(message: Message) =
668-
messageWriteChannel.send(message)
677+
messageWriteChannel.write(message)
669678

670679
private suspend fun handleRtmpMessages() {
671680
try {
@@ -793,7 +802,7 @@ internal class RtmpConnection internal constructor(
793802
}
794803

795804
private suspend fun readMessage(): Message {
796-
return messageReadChannel.receive()
805+
return messageReadChannel.read()
797806
}
798807

799808
companion object {
@@ -864,20 +873,18 @@ internal suspend fun RtmpConnection.write(array: ByteArray) =
864873
* @param source the frame to write
865874
* @return a [Deferred] that completes when the frame is sent
866875
*/
867-
internal suspend fun RtmpConnection.write(source: Source): Deferred<Unit> {
876+
internal suspend fun RtmpConnection.write(source: Source) {
868877
/**
869878
* Dropping FLV header that is not needed. It starts with 'F', 'L' and 'V'.
870879
* Just check the first byte to simplify.
871880
*/
872881
val peek = source.peek()
873-
val isHeader = try {
882+
try {
874883
peek.readString(3) == "FLV"
875-
} catch (_: Throwable) {
876-
false
877-
}
878-
if (isHeader) {
879884
// Skip header
880885
FLVHeader.decode(source)
886+
} catch (_: Throwable) {
887+
// Not a FLV header, continue
881888
}
882889

883890
source.readInt() // skip previous tag size
@@ -903,7 +910,7 @@ internal suspend fun RtmpConnection.write(source: Source): Deferred<Unit> {
903910
* @param timestampMs the timestamp of the frame in milliseconds
904911
* @return a [Deferred] that completes when the frame is sent
905912
*/
906-
internal suspend fun RtmpConnection.write(data: FLVData, timestampMs: Int): Deferred<Unit> {
913+
internal suspend fun RtmpConnection.write(data: FLVData, timestampMs: Int) {
907914
val rawSource = data.asRawSource(settings.amfVersion, false)
908915
val rawSourceSize = data.getSize(settings.amfVersion)
909916

@@ -941,7 +948,7 @@ internal suspend fun RtmpConnection.write(tag: FLVTag) = write(tag.data, tag.tim
941948
*
942949
* @param tag the FLV tag to write
943950
*/
944-
internal suspend fun RtmpConnection.write(tag: FLVTagRawBody): Deferred<Unit> {
951+
internal suspend fun RtmpConnection.write(tag: FLVTagRawBody) {
945952
return when (tag.type) {
946953
FLVTag.Type.AUDIO -> {
947954
writeAudio(tag.body, tag.bodySize, tag.timestampMs)

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/util/Processable.kt

Lines changed: 0 additions & 31 deletions
This file was deleted.

rtmp/src/commonMain/kotlin/io/github/thibaultbee/krtmp/rtmp/util/channels/MessageReadChannel.kt

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,45 +20,26 @@ import io.github.thibaultbee.krtmp.rtmp.messages.Message
2020
import io.github.thibaultbee.krtmp.rtmp.messages.SetChunkSize
2121
import io.github.thibaultbee.krtmp.rtmp.util.MessageHistory
2222
import io.github.thibaultbee.krtmp.rtmp.util.sockets.ReadableMessageSocket
23-
import io.ktor.network.sockets.isClosed
24-
import kotlinx.coroutines.channels.Channel
25-
import kotlinx.coroutines.launch
26-
import kotlinx.io.EOFException
23+
import kotlinx.coroutines.sync.Mutex
24+
import kotlinx.coroutines.sync.withLock
2725

2826
/**
2927
* Received RTMP messages through a [ReadableMessageSocket].
3028
*
31-
* It uses a [Channel] to queue messages and a coroutine to send them sequentially.
32-
*
3329
* It also manages the chunk size and message history for proper chunking.
3430
*
35-
* @param sendChannel The channel to queue messages.
3631
* @param socket The socket to send messages through.
3732
*/
3833
internal class MessageReadChannel(
39-
private val socket: ReadableMessageSocket,
40-
private val sendChannel: Channel<Message> = Channel(Channel.UNLIMITED),
34+
private val socket: ReadableMessageSocket
4135
) {
4236
private val messageHistory = MessageHistory()
37+
private val messageMutex = Mutex()
4338

4439
var chunkSize: Int = RtmpConstants.DEFAULT_CHUNK_SIZE
4540
private set
4641

47-
init {
48-
socket.launch {
49-
socket.socketContext
50-
while (!socket.isClosed) {
51-
val message = try {
52-
readMessage()
53-
} catch (_: EOFException) {
54-
break
55-
}
56-
sendChannel.send(message)
57-
}
58-
}
59-
}
60-
61-
private suspend fun readMessage(): Message {
42+
suspend fun read(): Message = messageMutex.withLock {
6243
val message = socket.read(chunkSize) { chunkStreamId ->
6344
messageHistory.get(chunkStreamId)
6445
}
@@ -69,11 +50,7 @@ internal class MessageReadChannel(
6950
return message
7051
}
7152

72-
suspend fun receive() =
73-
sendChannel.receive()
74-
7553
fun close() {
76-
sendChannel.cancel()
7754
messageHistory.clear()
7855
}
7956
}

0 commit comments

Comments
 (0)