Skip to content

Commit 3c4e836

Browse files
committed
chore(core): ts: use byte buffer pool when allocating TS buffer
1 parent 8e2894a commit 3c4e836

File tree

13 files changed

+63
-21
lines changed

13 files changed

+63
-21
lines changed

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/TsMuxer.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,14 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
3333
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.utils.av.OpusControlHeader
3434
import io.github.thibaultbee.streampack.core.elements.utils.av.audio.aac.ADTSFrameWriter
3535
import io.github.thibaultbee.streampack.core.elements.utils.av.audio.aac.LATMFrameWriter
36+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
3637
import java.nio.ByteBuffer
3738
import java.util.MissingFormatArgumentException
3839
import kotlin.random.Random
3940

4041
class TsMuxer : IMuxerInternal {
42+
private val byteBufferPool = ByteBufferPool(true)
43+
4144
override val info by lazy { TSMuxerInfo }
4245
private val tsServices = mutableListOf<Service>()
4346
private val tsPes = mutableListOf<Pes>()
@@ -56,10 +59,10 @@ class TsMuxer : IMuxerInternal {
5659

5760
private val tsId = Random.nextInt(Byte.MIN_VALUE.toInt(), Byte.MAX_VALUE.toInt()).toShort()
5861
private var pat = Pat(
59-
listener, tsServices, tsId, packetCount = 0
62+
byteBufferPool, listener, tsServices, tsId, packetCount = 0
6063
)
6164
private var sdt = Sdt(
62-
listener, tsServices, tsId, packetCount = 0
65+
byteBufferPool, listener, tsServices, tsId, packetCount = 0
6366
)
6467

6568
override val streamConfigs: List<CodecConfig>
@@ -339,11 +342,12 @@ class TsMuxer : IMuxerInternal {
339342
service.pmt = service.pmt?.apply {
340343
versionNumber = (versionNumber + 1).toByte()
341344
streams = service.streams
342-
} ?: Pmt(listener, service, service.streams, getNewPid())
345+
} ?: Pmt(byteBufferPool, listener, service, service.streams, getNewPid())
343346

344347
// Init PES
345348
newStreams.forEach {
346349
Pes(
350+
byteBufferPool,
347351
listener,
348352
it,
349353
service.pcrPid == it.pid,
@@ -419,10 +423,12 @@ class TsMuxer : IMuxerInternal {
419423
tsServices.forEach {
420424
removeStreams(it)
421425
}
426+
byteBufferPool.clear()
422427
}
423428

424429
override fun release() {
425430
tsServices.clear()
431+
byteBufferPool.close()
426432
}
427433

428434
/**

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/packets/Pat.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@ package io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxe
1818
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.IMuxerInternal
1919
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.ITSElement
2020
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.Service
21+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2122
import java.nio.ByteBuffer
2223
import kotlin.experimental.or
2324

2425
class Pat(
26+
byteBufferPool: ByteBufferPool,
2527
listener: IMuxerInternal.IMuxerListener? = null,
2628
private val services: List<Service>,
2729
tsId: Short,
2830
versionNumber: Byte = 0,
2931
var packetCount: Int = 0,
3032
) : Psi(
33+
byteBufferPool,
3134
listener,
3235
PID,
3336
TID,

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/packets/Pes.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
2323
import io.github.thibaultbee.streampack.core.elements.utils.TimeUtils
2424
import io.github.thibaultbee.streampack.core.elements.utils.extensions.isAudio
2525
import io.github.thibaultbee.streampack.core.elements.utils.extensions.isVideo
26+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2627

2728
class Pes(
29+
byteBufferPool: ByteBufferPool,
2830
muxerListener: IMuxerInternal.IMuxerListener? = null,
2931
val stream: Stream,
3032
private val hasPcr: Boolean,
31-
) : TS(muxerListener, stream.pid) {
33+
) : TS(byteBufferPool, muxerListener, stream.pid) {
3234
fun write(frame: Frame) {
3335
val programClockReference = if (hasPcr) {
3436
TimeUtils.currentTime()
@@ -48,7 +50,13 @@ class Pes(
4850
dts = frame.dtsInUs
4951
)
5052

51-
write(frame.buffer, adaptationField.toByteBuffer(), header.toByteBuffer(), true, frame.ptsInUs)
53+
write(
54+
frame.buffer,
55+
adaptationField.toByteBuffer(),
56+
header.toByteBuffer(),
57+
true,
58+
frame.ptsInUs
59+
)
5260
}
5361

5462
enum class StreamId(val value: Short) {

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/packets/Pmt.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
2424
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.Stream
2525
import io.github.thibaultbee.streampack.core.elements.utils.extensions.put
2626
import io.github.thibaultbee.streampack.core.elements.utils.extensions.putShort
27+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2728
import java.nio.ByteBuffer
2829

2930
class Pmt(
31+
byteBufferPool: ByteBufferPool,
3032
listener: IMuxerInternal.IMuxerListener? = null,
3133
private val service: Service,
3234
var streams: List<Stream>,
3335
pid: Short,
3436
versionNumber: Byte = 0,
3537
) : Psi(
38+
byteBufferPool,
3639
listener,
3740
pid,
3841
TID,

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/packets/Psi.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.packets
1717

18+
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.IMuxerInternal
1819
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.utils.CRC32
1920
import io.github.thibaultbee.streampack.core.elements.utils.extensions.put
21+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2022
import java.nio.ByteBuffer
2123

2224
open class Psi(
23-
listener: io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.IMuxerInternal.IMuxerListener? = null,
25+
byteBufferPool: ByteBufferPool,
26+
listener: IMuxerInternal.IMuxerListener? = null,
2427
pid: Short,
2528
private val tableId: Byte,
2629
private val sectionSyntaxIndicator: Boolean = false,
@@ -29,7 +32,7 @@ open class Psi(
2932
var versionNumber: Byte = 0,
3033
private val sectionNumber: Byte = 0,
3134
private val lastSectionNumber: Byte = 0,
32-
) : TS(listener, pid) {
35+
) : TS(byteBufferPool, listener, pid) {
3336
companion object {
3437
const val CRC_SIZE = 4
3538
const val PSI_HEADER_SIZE = 9 // contains pointer_field

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/packets/Sdt.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,26 @@
1515
*/
1616
package io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.packets
1717

18+
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.IMuxerInternal
1819
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.ITSElement
1920
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.Service
2021
import io.github.thibaultbee.streampack.core.elements.utils.extensions.put
2122
import io.github.thibaultbee.streampack.core.elements.utils.extensions.putShort
2223
import io.github.thibaultbee.streampack.core.elements.utils.extensions.putString
24+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2325
import java.nio.ByteBuffer
2426

2527

2628
class Sdt(
27-
listener: io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.IMuxerInternal.IMuxerListener? = null,
29+
byteBufferPool: ByteBufferPool,
30+
listener: IMuxerInternal.IMuxerListener? = null,
2831
private val services: List<Service>,
2932
tsId: Short,
3033
private val originalNetworkId: Short = 0xff01.toShort(),
3134
versionNumber: Byte = 0,
3235
var packetCount: Int = 0,
3336
) : Psi(
37+
byteBufferPool = byteBufferPool,
3438
listener,
3539
PID,
3640
TID,

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/packets/TS.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,19 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
2020
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.utils.MuxerConst
2121
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.utils.TSOutputCallback
2222
import io.github.thibaultbee.streampack.core.elements.utils.extensions.toInt
23+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2324
import java.nio.ByteBuffer
2425
import java.security.InvalidParameterException
2526

2627
open class TS(
28+
byteBufferPool: ByteBufferPool,
2729
listener: IMuxerInternal.IMuxerListener? = null,
2830
val pid: Short,
2931
private val transportErrorIndicator: Boolean = false,
3032
private val transportPriority: Boolean = false,
3133
private val transportScramblingControl: Byte = 0, // Not scrambled
3234
private var continuityCounter: Byte = 0,
33-
) : TSOutputCallback(listener) {
35+
) : TSOutputCallback(byteBufferPool, listener) {
3436

3537
companion object {
3638
const val SYNC_BYTE: Byte = 0x47
@@ -52,7 +54,7 @@ open class TS(
5254

5355
var packetIndicator = 0
5456

55-
val buffer = ByteBuffer.allocateDirect(PACKET_SIZE * MuxerConst.MAX_OUTPUT_PACKET_NUMBER)
57+
val buffer = byteBufferPool.get(PACKET_SIZE * MuxerConst.MAX_OUTPUT_PACKET_NUMBER)
5658

5759
while (payload?.hasRemaining() == true || adaptationFieldIndicator) {
5860
buffer.limit(buffer.position() + PACKET_SIZE)

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/utils/TSOutputCallback.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@ package io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxe
1717

1818
import io.github.thibaultbee.streampack.core.elements.data.SrtPacket
1919
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.IMuxerInternal
20+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2021

21-
open class TSOutputCallback(var listener: IMuxerInternal.IMuxerListener? = null) {
22+
open class TSOutputCallback(
23+
protected val byteBufferPool: ByteBufferPool,
24+
var listener: IMuxerInternal.IMuxerListener? = null
25+
) {
2226
protected fun writePacket(packet: SrtPacket) {
2327
packet.buffer.rewind()
2428
listener?.onOutputFrame(packet)
29+
byteBufferPool.put(packet.buffer)
2530
}
2631
}

core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ class ByteBufferPool(private val isDirect: Boolean) : IBufferPool<ByteBuffer>, C
4646
buffers.tailMap(
4747
capacity,
4848
true
49-
).values
50-
.mapNotNull(ArrayDeque<ByteBuffer>::removeFirstOrNull)
51-
.firstOrNull()
49+
).values.firstNotNullOfOrNull(ArrayDeque<ByteBuffer>::removeFirstOrNull)
5250
}
5351
return if (buffer != null) {
5452
buffer.clear().limit(capacity)

core/src/test/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/ts/packets/PesTest.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
2424
import io.github.thibaultbee.streampack.core.elements.utils.FakeFrames
2525
import io.github.thibaultbee.streampack.core.elements.utils.MockUtils
2626
import io.github.thibaultbee.streampack.core.elements.utils.ResourcesUtils
27+
import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool
2728
import org.junit.Test
2829
import java.io.File
2930
import java.nio.ByteBuffer
@@ -63,6 +64,7 @@ class PesTest {
6364

6465
val expectedBuffers = readFrames(TEST_SAMPLES_DIR + "pes-video1")
6566
Pes(
67+
ByteBufferPool(true),
6668
AssertEqualsBuffersMockMuxerListener(expectedBuffers),
6769
Stream(
6870
VideoCodecConfig(
@@ -91,6 +93,7 @@ class PesTest {
9193

9294
val expectedBuffers = readFrames(TEST_SAMPLES_DIR + "pes-audio1")
9395
Pes(
96+
ByteBufferPool(true),
9497
AssertEqualsBuffersMockMuxerListener(expectedBuffers),
9598
Stream(AudioCodecConfig(), 256),
9699
true
@@ -114,6 +117,7 @@ class PesTest {
114117

115118
val expectedBuffers = readFrames(TEST_SAMPLES_DIR + "pes-audio2")
116119
Pes(
120+
ByteBufferPool(true),
117121
AssertEqualsBuffersMockMuxerListener(expectedBuffers),
118122
Stream(AudioCodecConfig(), 256),
119123
true

0 commit comments

Comments
 (0)