Skip to content

Commit d16332d

Browse files
committed
chunk psi packets
1 parent 964c3b7 commit d16332d

File tree

8 files changed

+53
-31
lines changed

8 files changed

+53
-31
lines changed

srt/src/main/java/com/pedro/srt/mpeg2ts/packets/AacPacket.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.pedro.srt.mpeg2ts.Pes
2525
import com.pedro.srt.mpeg2ts.PesType
2626
import com.pedro.srt.mpeg2ts.psi.PsiManager
2727
import com.pedro.srt.srt.packets.data.PacketPosition
28+
import com.pedro.srt.utils.chunkPackets
2829
import java.nio.ByteBuffer
2930

3031
/**
@@ -54,7 +55,7 @@ class AacPacket(
5455
fixedBuffer.get(payload, headerSize, length)
5556

5657
val pes = Pes(psiManager.getAudioPid().toInt(), false, PesType.AUDIO, mediaFrame.info.timestamp, ByteBuffer.wrap(payload))
57-
val mpeg2tsPackets = chunkPackets(mpegTsPacketizer.write(listOf(pes))).map { buffer ->
58+
val mpeg2tsPackets = mpegTsPacketizer.write(listOf(pes)).chunkPackets(chunkSize).map { buffer ->
5859
MpegTsPacket(buffer, MpegType.AUDIO, PacketPosition.SINGLE, isKey = false)
5960
}
6061
if (mpeg2tsPackets.isNotEmpty()) callback(mpeg2tsPackets)

srt/src/main/java/com/pedro/srt/mpeg2ts/packets/BasePacket.kt

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ abstract class BasePacket(
4949
) {
5050

5151
protected val mpegTsPacketizer = MpegTsPacketizer(psiManager)
52-
private var chunkSize = limitSize / MpegTsPacketizer.packetSize //max number of ts packets per srtpacket
52+
protected var chunkSize = limitSize / MpegTsPacketizer.packetSize //max number of ts packets per srtpacket
5353

5454
abstract suspend fun createAndSendPacket(
5555
mediaFrame: MediaFrame,
@@ -67,16 +67,4 @@ abstract class BasePacket(
6767
this.limitSize = limitSize
6868
chunkSize = limitSize / MpegTsPacketizer.packetSize
6969
}
70-
71-
protected fun chunkPackets(mpeg2tsPackets: List<ByteArray>): List<ByteArray> {
72-
val chunked = mpeg2tsPackets.chunked(chunkSize)
73-
val packets = mutableListOf<ByteArray>()
74-
chunked.forEach { chunks ->
75-
val size = chunks.sumOf { it.size }
76-
val buffer = ByteBuffer.allocate(size)
77-
chunks.forEach { buffer.put(it) }
78-
packets.add(buffer.toByteArray())
79-
}
80-
return packets
81-
}
8270
}

srt/src/main/java/com/pedro/srt/mpeg2ts/packets/H26XPacket.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import com.pedro.srt.mpeg2ts.Pes
2727
import com.pedro.srt.mpeg2ts.PesType
2828
import com.pedro.srt.mpeg2ts.psi.PsiManager
2929
import com.pedro.srt.srt.packets.data.PacketPosition
30+
import com.pedro.srt.utils.chunkPackets
3031
import com.pedro.srt.utils.startWith
3132
import java.nio.ByteBuffer
3233

@@ -78,7 +79,7 @@ class H26XPacket(
7879
validBuffer.get(payload, 0, validBuffer.remaining())
7980

8081
val pes = Pes(psiManager.getVideoPid().toInt(), isKeyFrame, PesType.VIDEO, mediaFrame.info.timestamp, ByteBuffer.wrap(payload))
81-
val mpeg2tsPackets = chunkPackets(mpegTsPacketizer.write(listOf(pes))).map { buffer ->
82+
val mpeg2tsPackets = mpegTsPacketizer.write(listOf(pes)).chunkPackets(chunkSize).map { buffer ->
8283
MpegTsPacket(buffer, MpegType.VIDEO, PacketPosition.SINGLE, isKeyFrame)
8384
}
8485
if (mpeg2tsPackets.isNotEmpty()) callback(mpeg2tsPackets)

srt/src/main/java/com/pedro/srt/mpeg2ts/packets/OpusPacket.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import com.pedro.srt.mpeg2ts.Pes
2424
import com.pedro.srt.mpeg2ts.PesType
2525
import com.pedro.srt.mpeg2ts.psi.PsiManager
2626
import com.pedro.srt.srt.packets.data.PacketPosition
27+
import com.pedro.srt.utils.chunkPackets
2728
import com.pedro.srt.utils.toByteArray
2829
import java.nio.ByteBuffer
2930

@@ -49,7 +50,7 @@ class OpusPacket(
4950
System.arraycopy(header, 0, payload, 0, header.size)
5051

5152
val pes = Pes(psiManager.getAudioPid().toInt(), true, PesType.PRIVATE_STREAM_1, mediaFrame.info.timestamp, ByteBuffer.wrap(payload))
52-
val mpeg2tsPackets = chunkPackets(mpegTsPacketizer.write(listOf(pes))).map { buffer ->
53+
val mpeg2tsPackets = mpegTsPacketizer.write(listOf(pes)).chunkPackets(chunkSize).map { buffer ->
5354
MpegTsPacket(buffer, MpegType.AUDIO, PacketPosition.SINGLE, true)
5455
}
5556
if (mpeg2tsPackets.isNotEmpty()) callback(mpeg2tsPackets)

srt/src/main/java/com/pedro/srt/mpeg2ts/psi/PsiManager.kt

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import com.pedro.srt.mpeg2ts.MpegTsPayload
2222
import com.pedro.srt.mpeg2ts.MpegType
2323
import com.pedro.srt.mpeg2ts.service.Mpeg2TsService
2424
import com.pedro.srt.srt.packets.data.PacketPosition
25+
import com.pedro.srt.utils.chunkPackets
2526
import kotlin.random.Random
2627

2728
/**
@@ -52,23 +53,20 @@ class PsiManager(
5253
service = service
5354
)
5455

55-
fun checkSendInfo(isKey: Boolean = false, mpegTsPacketizer: MpegTsPacketizer): List<MpegTsPacket> {
56+
fun checkSendInfo(isKey: Boolean = false, mpegTsPacketizer: MpegTsPacketizer, chunkSize: Int): List<MpegTsPacket> {
5657
val pmt = service.pmt ?: return arrayListOf()
5758
val psiPackets = mutableListOf<MpegTsPayload>()
58-
if (sdtCount >= sdtPeriod && patCount >= patPeriod) {
59-
psiPackets.addAll(listOf(pmt, sdt, pat))
60-
sdtCount = 0
61-
patCount = 0
62-
} else if (patCount >= patPeriod || isKey) {
59+
if (patCount >= patPeriod || isKey) {
6360
psiPackets.addAll(listOf(pat, pmt))
6461
patCount = 0
65-
} else if (sdtCount >= sdtPeriod) {
62+
}
63+
if (sdtCount >= sdtPeriod) {
6664
psiPackets.add(sdt)
6765
sdtCount = 0
6866
}
6967
sdtCount++
7068
patCount++
71-
return mpegTsPacketizer.write(psiPackets, increasePsiContinuity = true).map { b ->
69+
return mpegTsPacketizer.write(psiPackets, increasePsiContinuity = true).chunkPackets(chunkSize).map { b ->
7270
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
7371
}
7472
}

srt/src/main/java/com/pedro/srt/srt/SrtSender.kt

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ import com.pedro.srt.mpeg2ts.packets.AacPacket
3131
import com.pedro.srt.mpeg2ts.packets.BasePacket
3232
import com.pedro.srt.mpeg2ts.packets.H26XPacket
3333
import com.pedro.srt.mpeg2ts.packets.OpusPacket
34+
import com.pedro.srt.mpeg2ts.psi.Psi
3435
import com.pedro.srt.mpeg2ts.psi.PsiManager
3536
import com.pedro.srt.mpeg2ts.service.Mpeg2TsService
3637
import com.pedro.srt.srt.packets.SrtPacket
3738
import com.pedro.srt.srt.packets.data.PacketPosition
3839
import com.pedro.srt.utils.SrtSocket
40+
import com.pedro.srt.utils.chunkPackets
3941
import com.pedro.srt.utils.toCodec
4042
import kotlinx.coroutines.isActive
4143
import kotlinx.coroutines.runInterruptible
@@ -55,7 +57,11 @@ class SrtSender(
5557
upgradePatVersion()
5658
upgradeSdtVersion()
5759
}
58-
private val limitSize = commandsManager.MTU - SrtPacket.headerSize
60+
private val limitSize: Int
61+
get() {
62+
return commandsManager.MTU - SrtPacket.headerSize
63+
}
64+
5965
private val mpegTsPacketizer = MpegTsPacketizer(psiManager)
6066
private var audioPacket: BasePacket = AacPacket(limitSize, psiManager)
6167
private val videoPacket = H26XPacket(limitSize, psiManager)
@@ -86,11 +92,17 @@ class SrtSender(
8692
}
8793

8894
override suspend fun onRun() {
95+
val limitSize = this.limitSize
96+
val chunkSize = limitSize / MpegTsPacketizer.packetSize
97+
audioPacket.setLimitSize(limitSize)
98+
videoPacket.setLimitSize(limitSize)
99+
89100
setTrackConfig(!commandsManager.videoDisabled, !commandsManager.audioDisabled)
90101
//send config
91-
val psiList = mutableListOf(psiManager.getSdt(), psiManager.getPat())
102+
val psiList = mutableListOf<Psi>(psiManager.getPat())
92103
psiManager.getPmt()?.let { psiList.add(0, it) }
93-
val psiPacketsConfig = mpegTsPacketizer.write(psiList).map { buffer ->
104+
psiList.add(psiManager.getSdt())
105+
val psiPacketsConfig = mpegTsPacketizer.write(psiList).chunkPackets(chunkSize).map { buffer ->
94106
MpegTsPacket(buffer, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
95107
}
96108
sendPackets(psiPacketsConfig, MpegType.PSI)
@@ -99,7 +111,7 @@ class SrtSender(
99111
val mediaFrame = runInterruptible { queue.poll(1, TimeUnit.SECONDS) }
100112
getMpegTsPackets(mediaFrame) { mpegTsPackets ->
101113
val isKey = mpegTsPackets[0].isKey
102-
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer)
114+
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer, chunkSize)
103115
bytesSend += sendPackets(psiPackets, MpegType.PSI)
104116
bytesSend += sendPackets(mpegTsPackets, mpegTsPackets[0].type)
105117
}

srt/src/main/java/com/pedro/srt/utils/Extensions.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.pedro.srt.utils
1818

1919
import com.pedro.common.AudioCodec
2020
import com.pedro.common.VideoCodec
21+
import com.pedro.common.toByteArray
2122
import com.pedro.srt.mpeg2ts.Codec
2223
import java.io.InputStream
2324
import java.io.OutputStream
@@ -64,4 +65,16 @@ fun AudioCodec.toCodec(): Codec {
6465
AudioCodec.OPUS -> Codec.OPUS
6566
else -> throw IllegalArgumentException("Unsupported codec: $name")
6667
}
68+
}
69+
70+
fun List<ByteArray>.chunkPackets(size: Int): List<ByteArray> {
71+
val chunked = this.chunked(size)
72+
val packets = mutableListOf<ByteArray>()
73+
chunked.forEach { chunks ->
74+
val chunkSize = chunks.sumOf { it.size }
75+
val buffer = ByteBuffer.allocate(chunkSize)
76+
chunks.forEach { buffer.put(it) }
77+
packets.add(buffer.toByteArray())
78+
}
79+
return packets
6780
}

udp/src/main/java/com/pedro/udp/UdpSender.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ import com.pedro.srt.mpeg2ts.packets.AacPacket
3131
import com.pedro.srt.mpeg2ts.packets.BasePacket
3232
import com.pedro.srt.mpeg2ts.packets.H26XPacket
3333
import com.pedro.srt.mpeg2ts.packets.OpusPacket
34+
import com.pedro.srt.mpeg2ts.psi.Psi
3435
import com.pedro.srt.mpeg2ts.psi.PsiManager
3536
import com.pedro.srt.mpeg2ts.service.Mpeg2TsService
3637
import com.pedro.srt.srt.packets.data.PacketPosition
3738
import com.pedro.srt.utils.Constants
39+
import com.pedro.srt.utils.chunkPackets
3840
import com.pedro.srt.utils.toCodec
3941
import com.pedro.udp.utils.UdpSocket
4042
import kotlinx.coroutines.isActive
@@ -86,11 +88,17 @@ class UdpSender(
8688
}
8789

8890
override suspend fun onRun() {
91+
val limitSize = this.limitSize
92+
val chunkSize = limitSize / MpegTsPacketizer.packetSize
93+
audioPacket.setLimitSize(limitSize)
94+
videoPacket.setLimitSize(limitSize)
95+
8996
setTrackConfig(!commandManager.videoDisabled, !commandManager.audioDisabled)
9097
//send config
91-
val psiList = mutableListOf(psiManager.getSdt(), psiManager.getPat())
98+
val psiList = mutableListOf<Psi>(psiManager.getPat())
9299
psiManager.getPmt()?.let { psiList.add(0, it) }
93-
val psiPacketsConfig = mpegTsPacketizer.write(psiList).map { b ->
100+
psiList.add(psiManager.getSdt())
101+
val psiPacketsConfig = mpegTsPacketizer.write(psiList).chunkPackets(chunkSize).map { b ->
94102
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
95103
}
96104
sendPackets(psiPacketsConfig, MpegType.PSI)
@@ -99,7 +107,7 @@ class UdpSender(
99107
val mediaFrame = runInterruptible { queue.poll(1, TimeUnit.SECONDS) }
100108
getMpegTsPackets(mediaFrame) { mpegTsPackets ->
101109
val isKey = mpegTsPackets[0].isKey
102-
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer)
110+
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer, chunkSize)
103111
bytesSend += sendPackets(psiPackets, MpegType.PSI)
104112
bytesSend += sendPackets(mpegTsPackets, mpegTsPackets[0].type)
105113
}

0 commit comments

Comments
 (0)