Skip to content

Commit 2dcc660

Browse files
committed
Fix some race conditions
1 parent 7483e8d commit 2dcc660

File tree

3 files changed

+14
-11
lines changed

3 files changed

+14
-11
lines changed

receiver/receiver.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"net/http"
1717
"os"
1818
"path/filepath"
19+
"sync/atomic"
1920
"time"
2021

2122
"github.com/pion/interceptor"
@@ -181,7 +182,7 @@ type trackInfo struct {
181182
}
182183

183184
type trackStats struct {
184-
rtpPacketsReceived int
185+
rtpPacketsReceived atomic.Int64
185186
framesAssembled int
186187
keyframesReceived int
187188
startTime time.Time
@@ -297,7 +298,7 @@ func (r *Receiver) startStatsGoroutine(ctx context.Context, bytesReceivedChan ch
297298
rate := bits / delta.Seconds()
298299
mBitPerSecond := rate / float64(vnet.MBit)
299300
r.log.Infof("throughput: %.2f Mb/s | RTP packets: %d | Frames: %d | Keyframes: %d",
300-
mBitPerSecond, stats.rtpPacketsReceived, stats.framesAssembled, stats.keyframesReceived)
301+
mBitPerSecond, stats.rtpPacketsReceived.Load(), stats.framesAssembled, stats.keyframesReceived)
301302
bytesReceived = 0
302303
last = now
303304
case newBytesReceived := <-bytesReceivedChan:
@@ -351,7 +352,7 @@ func (r *Receiver) processPackets(ctx context.Context, trackRemote *webrtc.Track
351352
}
352353

353354
bytesReceivedChan <- packet.MarshalSize()
354-
stats.rtpPacketsReceived++
355+
stats.rtpPacketsReceived.Add(1)
355356

356357
r.processVP8Packet(packet, trackInfo, frameAssembler, videoWidth, videoHeight, stats)
357358
}

syncodec/statistical_codec.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ func (c *StatisticalCodec) GetTargetBitrate() int {
225225
// greater than c.rMax, bitrate will be set to c.rMax. If r is lower than
226226
// c.rMin, bitrate will be set to c.rMin.
227227
func (c *StatisticalCodec) SetTargetBitrate(r int) {
228+
c.targetBitrateLock.Lock()
229+
defer c.targetBitrateLock.Unlock()
230+
228231
if r < c.targetBitrateBps {
229232
c.targetBitrateBps = maximum(r, c.rMin)
230233

@@ -244,10 +247,14 @@ func (c *StatisticalCodec) nextFrame() Frame {
244247
}
245248
}
246249

247-
bytesPerFrame := c.targetBitrateBps / (8.0 * c.fps)
250+
c.targetBitrateLock.Lock()
251+
bps := c.targetBitrateBps
252+
c.targetBitrateLock.Unlock()
253+
254+
bytesPerFrame := bps / (8.0 * c.fps)
248255

249256
if c.remainingBurstFrames > 0 {
250-
size := (c.targetBitrateBps * c.burstFrameCount) / (c.burstFrameSize + (c.burstFrameCount - 1))
257+
size := (bps * c.burstFrameCount) / (c.burstFrameSize + (c.burstFrameCount - 1))
251258

252259
return Frame{
253260
Content: make([]byte, size),

vnet/main.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,7 @@ func (r *Runner) runVariableAvailableCapacitySingleFlow() error {
162162
if err != nil {
163163
return fmt.Errorf("setup simple flow: %w", err)
164164
}
165-
defer func(flow Flow) {
166-
err = flow.Close()
167-
if err != nil {
168-
r.logger.Errorf("flow close: %v", err)
169-
}
170-
}(flow)
165+
defer flow.Close()
171166

172167
ctx, cancel := context.WithCancel(context.Background())
173168
defer cancel()

0 commit comments

Comments
 (0)