Skip to content

Commit 913e4a7

Browse files
committed
make all metrics atomics
1 parent 6d45da3 commit 913e4a7

File tree

10 files changed

+67
-67
lines changed

10 files changed

+67
-67
lines changed

common/src/main/java/com/pedro/common/StreamBlockingQueue.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import java.util.concurrent.PriorityBlockingQueue
55
import java.util.concurrent.atomic.AtomicBoolean
66
import kotlin.math.max
77

8-
class StreamBlockingQueue(private val size: Int) {
8+
class StreamBlockingQueue(var size: Int) {
99

1010
private val queue = PriorityBlockingQueue<MediaFrame>(size) { p0, p1 ->
1111
p0.info.timestamp.compare(p1.info.timestamp)

common/src/main/java/com/pedro/common/base/BaseSender.kt

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import kotlinx.coroutines.delay
1414
import kotlinx.coroutines.isActive
1515
import kotlinx.coroutines.launch
1616
import java.nio.ByteBuffer
17+
import java.util.concurrent.atomic.AtomicLong
1718

1819
abstract class BaseSender(
1920
protected val connectChecker: ConnectChecker,
@@ -23,23 +24,21 @@ abstract class BaseSender(
2324
@Volatile
2425
protected var running = false
2526
private var cacheSize = 400
26-
@Volatile
27-
protected var queue = StreamBlockingQueue(cacheSize)
28-
protected var audioFramesSent: Long = 0
29-
protected var videoFramesSent: Long = 0
30-
var droppedAudioFrames: Long = 0
31-
protected set
32-
var droppedVideoFrames: Long = 0
33-
protected set
27+
28+
protected val queue = StreamBlockingQueue(cacheSize)
29+
30+
protected val audioFramesSent = AtomicLong(0)
31+
protected val videoFramesSent = AtomicLong(0)
32+
private val droppedAudioFrames = AtomicLong(0)
33+
private val droppedVideoFrames = AtomicLong(0)
34+
3435
private val bitrateManager: BitrateManager = BitrateManager(connectChecker)
3536
protected var isEnableLogs = true
3637
private var job: Job? = null
3738
protected val scope = CoroutineScope(Dispatchers.IO)
38-
@Volatile
39-
var bytesSend = 0L
40-
protected set
41-
@Volatile
42-
protected var bytesSendPerSecond = 0L
39+
40+
protected val bytesSend = AtomicLong(0)
41+
protected val bytesSendPerSecond = AtomicLong(0)
4342

4443
abstract fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?)
4544
abstract fun setAudioInfo(sampleRate: Int, isStereo: Boolean)
@@ -51,11 +50,11 @@ abstract class BaseSender(
5150
when (mediaFrame.type) {
5251
MediaFrame.Type.VIDEO -> {
5352
Log.i(TAG, "Video frame discarded")
54-
droppedVideoFrames++
53+
droppedVideoFrames.incrementAndGet()
5554
}
5655
MediaFrame.Type.AUDIO -> {
5756
Log.i(TAG, "Audio frame discarded")
58-
droppedAudioFrames++
57+
droppedAudioFrames.incrementAndGet()
5958
}
6059
}
6160
}
@@ -69,8 +68,8 @@ abstract class BaseSender(
6968
val bitrateTask = async {
7069
while (scope.isActive && running) {
7170
//bytes to bits
72-
bitrateManager.calculateBitrate(bytesSendPerSecond * 8)
73-
bytesSendPerSecond = 0
71+
bitrateManager.calculateBitrate(bytesSendPerSecond.get() * 8)
72+
bytesSendPerSecond.set(0)
7473
delay(timeMillis = 1000)
7574
}
7675
}
@@ -104,9 +103,7 @@ abstract class BaseSender(
104103
if (newSize < queue.getSize() - queue.remainingCapacity()) {
105104
throw RuntimeException("Can't fit current cache inside new cache size")
106105
}
107-
val tempQueue = StreamBlockingQueue(newSize)
108-
queue.drainTo(tempQueue)
109-
queue = tempQueue
106+
queue.size = newSize
110107
}
111108

112109
fun getCacheSize(): Int = cacheSize
@@ -117,24 +114,30 @@ abstract class BaseSender(
117114
queue.clear()
118115
}
119116

120-
fun getSentAudioFrames(): Long = audioFramesSent
117+
fun getSentAudioFrames(): Long = audioFramesSent.get()
118+
119+
fun getSentVideoFrames(): Long = videoFramesSent.get()
120+
121+
fun getDroppedAudioFrames(): Long = droppedAudioFrames.get()
122+
123+
fun getDroppedVideoFrames(): Long = droppedVideoFrames.get()
121124

122-
fun getSentVideoFrames(): Long = videoFramesSent
125+
fun getBytesSend(): Long = bytesSend.get()
123126

124127
fun resetSentAudioFrames() {
125-
audioFramesSent = 0
128+
audioFramesSent.set(0)
126129
}
127130

128131
fun resetSentVideoFrames() {
129-
videoFramesSent = 0
132+
videoFramesSent.set(0)
130133
}
131134

132135
fun resetDroppedAudioFrames() {
133-
droppedAudioFrames = 0
136+
droppedAudioFrames.set(0)
134137
}
135138

136139
fun resetDroppedVideoFrames() {
137-
droppedVideoFrames = 0
140+
droppedVideoFrames.set(0)
138141
}
139142

140143
fun setLogs(enable: Boolean) {
@@ -152,6 +155,6 @@ abstract class BaseSender(
152155
}
153156

154157
fun resetBytesSend() {
155-
bytesSend = 0
158+
bytesSend.set(0)
156159
}
157160
}

rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,9 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
9494
private var publishPermitted = false
9595

9696
val droppedAudioFrames: Long
97-
get() = rtmpSender.droppedAudioFrames
97+
get() = rtmpSender.getDroppedAudioFrames()
9898
val droppedVideoFrames: Long
99-
get() = rtmpSender.droppedVideoFrames
99+
get() = rtmpSender.getDroppedVideoFrames()
100100

101101
val cacheSize: Int
102102
get() = rtmpSender.getCacheSize()
@@ -105,7 +105,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
105105
val sentVideoFrames: Long
106106
get() = rtmpSender.getSentVideoFrames()
107107
val bytesSend: Long
108-
get() = rtmpSender.bytesSend
108+
get() = rtmpSender.getBytesSend()
109109
var socketType = SocketType.KTOR
110110
var socketTimeout = StreamSocket.DEFAULT_TIMEOUT
111111
var shouldFailOnRead = false

rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,26 +79,26 @@ class RtmpSender(
7979
val error = runCatching {
8080
val mediaFrame = runInterruptible { queue.take() }
8181
getFlvPacket(mediaFrame) { flvPacket ->
82-
var size = 0
82+
var size = 0L
8383
if (flvPacket.type == FlvType.VIDEO) {
84-
videoFramesSent++
84+
videoFramesSent.incrementAndGet()
8585
socket?.let { socket ->
86-
size = commandsManager.sendVideoPacket(flvPacket, socket)
86+
size = commandsManager.sendVideoPacket(flvPacket, socket).toLong()
8787
if (isEnableLogs) {
8888
Log.i(TAG, "wrote Video packet, size $size")
8989
}
9090
}
9191
} else {
92-
audioFramesSent++
92+
audioFramesSent.incrementAndGet()
9393
socket?.let { socket ->
94-
size = commandsManager.sendAudioPacket(flvPacket, socket)
94+
size = commandsManager.sendAudioPacket(flvPacket, socket).toLong()
9595
if (isEnableLogs) {
9696
Log.i(TAG, "wrote Audio packet, size $size")
9797
}
9898
}
9999
}
100-
bytesSend += size
101-
bytesSendPerSecond += size
100+
bytesSend.addAndGet(size)
101+
bytesSendPerSecond.addAndGet(size)
102102
}
103103
}.exceptionOrNull()
104104
if (error != null) {

rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspClient.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ class RtspClient(private val connectChecker: ConnectChecker) {
8181
private var checkServerAlive = false
8282

8383
val droppedAudioFrames: Long
84-
get() = rtspSender.droppedAudioFrames
84+
get() = rtspSender.getDroppedAudioFrames()
8585
val droppedVideoFrames: Long
86-
get() = rtspSender.droppedVideoFrames
86+
get() = rtspSender.getDroppedVideoFrames()
8787

8888
val cacheSize: Int
8989
get() = rtspSender.getCacheSize()
@@ -92,7 +92,7 @@ class RtspClient(private val connectChecker: ConnectChecker) {
9292
val sentVideoFrames: Long
9393
get() = rtspSender.getSentVideoFrames()
9494
val bytesSend: Long
95-
get() = rtspSender.bytesSend
95+
get() = rtspSender.getBytesSend()
9696
var socketType = SocketType.KTOR
9797
var socketTimeout = StreamSocket.DEFAULT_TIMEOUT
9898

rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -107,26 +107,23 @@ class RtspSender(
107107
val error = runCatching {
108108
val mediaFrame = runInterruptible { queue.take() }
109109
getRtpPackets(mediaFrame) { rtpFrames ->
110-
var size = 0
110+
var size = 0L
111111
var isVideo = false
112112
rtpFrames.forEach { rtpFrame ->
113113
rtpSocket?.sendFrame(rtpFrame)
114114
//4 is tcp header length
115-
val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length
116-
bytesSend += packetSize
117-
bytesSendPerSecond += packetSize
115+
val packetSize = (if (isTcp) rtpFrame.length + 4 else rtpFrame.length).toLong()
116+
bytesSend.addAndGet(packetSize)
117+
bytesSendPerSecond.addAndGet(packetSize)
118118
size += packetSize
119119
isVideo = rtpFrame.isVideoFrame()
120-
if (isVideo) {
121-
videoFramesSent++
122-
} else {
123-
audioFramesSent++
124-
}
120+
if (isVideo) videoFramesSent.incrementAndGet()
121+
else audioFramesSent.incrementAndGet()
125122
if (baseSenderReport?.update(rtpFrame) == true) {
126123
//4 is tcp header length
127-
val reportSize = if (isTcp) RtpConstants.REPORT_PACKET_LENGTH + 4 else RtpConstants.REPORT_PACKET_LENGTH
128-
bytesSend += reportSize
129-
bytesSendPerSecond += reportSize
124+
val reportSize = (if (isTcp) RtpConstants.REPORT_PACKET_LENGTH + 4 else RtpConstants.REPORT_PACKET_LENGTH).toLong()
125+
bytesSend.addAndGet(reportSize)
126+
bytesSendPerSecond.addAndGet(reportSize)
130127
if (isEnableLogs) Log.i(TAG, "wrote report")
131128
}
132129
}

srt/src/main/java/com/pedro/srt/srt/SrtClient.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ class SrtClient(private val connectChecker: ConnectChecker) {
8989
private var reTries = 0
9090

9191
val droppedAudioFrames: Long
92-
get() = srtSender.droppedAudioFrames
92+
get() = srtSender.getDroppedAudioFrames()
9393
val droppedVideoFrames: Long
94-
get() = srtSender.droppedVideoFrames
94+
get() = srtSender.getDroppedVideoFrames()
9595

9696
val cacheSize: Int
9797
get() = srtSender.getCacheSize()
@@ -100,7 +100,7 @@ class SrtClient(private val connectChecker: ConnectChecker) {
100100
val sentVideoFrames: Long
101101
get() = srtSender.getSentVideoFrames()
102102
val bytesSend: Long
103-
get() = srtSender.bytesSend
103+
get() = srtSender.getBytesSend()
104104
var rtt = 0 //in micro
105105
private set
106106
var packetsLost = 0

srt/src/main/java/com/pedro/srt/srt/SrtSender.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ class SrtSender(
127127
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer, chunkSize)
128128
val bytesPsi = sendPackets(psiPackets, MpegType.PSI)
129129
val bytes = sendPackets(mpegTsPackets, mpegTsPackets[0].type)
130-
bytesSend += bytesPsi + bytes
131-
bytesSendPerSecond += bytesPsi + bytes
130+
bytesSend.addAndGet(bytesPsi + bytes)
131+
bytesSendPerSecond.addAndGet(bytesPsi + bytes)
132132
}
133133
}.exceptionOrNull()
134134
if (error != null) {
@@ -158,8 +158,8 @@ class SrtSender(
158158
size += commandsManager.writeData(mpegTsPacket, socket)
159159
bytesSend += size
160160
}
161-
if (type == MpegType.VIDEO) videoFramesSent++
162-
else if (type == MpegType.AUDIO) audioFramesSent++
161+
if (type == MpegType.VIDEO) videoFramesSent.incrementAndGet()
162+
else if (type == MpegType.AUDIO) audioFramesSent.incrementAndGet()
163163
if (isEnableLogs) {
164164
Log.i(TAG, "wrote ${type.name} packet, size $bytesSend")
165165
}

udp/src/main/java/com/pedro/udp/UdpClient.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ class UdpClient(private val connectChecker: ConnectChecker) {
6767
private var reTries = 0
6868

6969
val droppedAudioFrames: Long
70-
get() = udpSender.droppedAudioFrames
70+
get() = udpSender.getDroppedAudioFrames()
7171
val droppedVideoFrames: Long
72-
get() = udpSender.droppedVideoFrames
72+
get() = udpSender.getDroppedVideoFrames()
7373

7474
val cacheSize: Int
7575
get() = udpSender.getCacheSize()
@@ -78,7 +78,7 @@ class UdpClient(private val connectChecker: ConnectChecker) {
7878
val sentVideoFrames: Long
7979
get() = udpSender.getSentVideoFrames()
8080
val bytesSend: Long
81-
get() = udpSender.bytesSend
81+
get() = udpSender.getBytesSend()
8282
var socketType = SocketType.KTOR
8383
var socketTimeout = StreamSocket.DEFAULT_TIMEOUT
8484

udp/src/main/java/com/pedro/udp/UdpSender.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ class UdpSender(
109109
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer, chunkSize)
110110
val bytesPsi = sendPackets(psiPackets, MpegType.PSI)
111111
val bytes = sendPackets(mpegTsPackets, mpegTsPackets[0].type)
112-
bytesSend += bytesPsi + bytes
113-
bytesSendPerSecond += bytesPsi + bytes
112+
bytesSend.addAndGet(bytesPsi + bytes)
113+
bytesSendPerSecond.addAndGet(bytesPsi + bytes)
114114
}
115115
}.exceptionOrNull()
116116
if (error != null) {
@@ -140,8 +140,8 @@ class UdpSender(
140140
size += commandManager.writeData(mpegTsPacket, socket)
141141
bytesSend += size
142142
}
143-
if (type == MpegType.VIDEO) videoFramesSent++
144-
else if (type == MpegType.AUDIO) audioFramesSent++
143+
if (type == MpegType.VIDEO) videoFramesSent.incrementAndGet()
144+
else if (type == MpegType.AUDIO) audioFramesSent.incrementAndGet()
145145
if (isEnableLogs) {
146146
Log.i(TAG, "wrote ${type.name} packet, size $bytesSend")
147147
}

0 commit comments

Comments
 (0)