Skip to content

Commit 176dbba

Browse files
authored
Syncrhonizer tidying up (#842)
Changes aimed to reduce duplication a bit. Extracted 2 functions which are relatively independent, wanted to do more but that requires either having functions with a lot of params of some sort of context which doesn't make it really easier to read (high coupling with caller). Smaller tidying up changes: replacing mono with time and interface with any
1 parent 2ec7166 commit 176dbba

File tree

2 files changed

+48
-62
lines changed

2 files changed

+48
-62
lines changed

pkg/synchronizer/synchronizer.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"sync"
1919
"time"
2020

21-
"github.com/livekit/protocol/utils/mono"
2221
"github.com/pion/rtcp"
2322
)
2423

@@ -318,7 +317,7 @@ func (s *Synchronizer) getExternalMediaDeadline() (time.Duration, bool) {
318317
maxDelay := s.config.MaxMediaRunningTimeDelay
319318
s.RUnlock()
320319

321-
now := mono.Now()
320+
now := time.Now()
322321

323322
if startTime.IsZero() && cb != nil {
324323
if mediaRunningTime, ok := cb(); ok {

pkg/synchronizer/track.go

Lines changed: 47 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer
127127
rtcpSenderReportRebaseEnabled: s.config.RTCPSenderReportRebaseEnabled,
128128
oldPacketThreshold: s.config.OldPacketThreshold,
129129
enableStartGate: s.config.EnableStartGate,
130-
nextPTSAdjustmentAt: mono.Now(),
130+
nextPTSAdjustmentAt: time.Now(),
131131
propagationDelayEstimator: latency.NewOWDEstimator(latency.OWDEstimatorParamsDefault),
132132
maxMediaRunningTimeDelay: s.config.MaxMediaRunningTimeDelay,
133133
lastPTSAdjustedLogBucket: math.MaxInt64,
@@ -249,15 +249,7 @@ func (t *TrackSynchronizer) GetPTS(pkt jitter.ExtPacket) (time.Duration, error)
249249
}
250250

251251
func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Duration, error) {
252-
t.Lock()
253-
sync := t.sync
254-
t.Unlock()
255-
256-
var deadline time.Duration
257-
var hasDeadline bool
258-
if sync != nil {
259-
deadline, hasDeadline = sync.getExternalMediaDeadline()
260-
}
252+
deadline, hasDeadline := t.getSynchronizerDeadline()
261253

262254
t.Lock()
263255
defer t.Unlock()
@@ -331,24 +323,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
331323
pts = estimatedPTS
332324
}
333325

334-
if t.shouldAdjustPTS(pts) {
335-
prevCurrentPTSOffset := t.currentPTSOffset
336-
if t.currentPTSOffset > t.desiredPTSOffset {
337-
t.currentPTSOffset = max(t.currentPTSOffset-t.maxDriftAdjustment, t.desiredPTSOffset)
338-
t.totalPTSAdjustmentNegative += prevCurrentPTSOffset - t.currentPTSOffset
339-
} else if t.currentPTSOffset < t.desiredPTSOffset {
340-
t.currentPTSOffset = min(t.currentPTSOffset+t.maxDriftAdjustment, t.desiredPTSOffset)
341-
t.totalPTSAdjustmentPositive += t.currentPTSOffset - prevCurrentPTSOffset
342-
}
343-
344-
// throttle further adjustment till a window proportional to this adjustment elapses
345-
throttle := time.Duration(0)
346-
if t.driftAdjustmentWindowPercent > 0.0 {
347-
throttle = time.Duration(math.Abs(float64(t.currentPTSOffset-prevCurrentPTSOffset)) * 100.0 / t.driftAdjustmentWindowPercent)
348-
}
349-
t.nextPTSAdjustmentAt = time.Now().Add(throttle)
350-
t.logPTSAdjustmentSampled(ts, pts, estimatedPTS, prevCurrentPTSOffset, throttle)
351-
}
326+
t.maybeAdjustPTSOffset(ts, pts, estimatedPTS)
352327

353328
adjusted, pts := t.normalizePTSToMediaPipelineTimeline(pts, ts, now, deadline, hasDeadline)
354329

@@ -387,17 +362,9 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
387362
}
388363

389364
func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duration, error) {
390-
// Get sync reference and deadline BEFORE main lock to prevent deadlock
365+
// Get deadline from synchronizer BEFORE main lock to prevent deadlock
391366
// with Synchronizer.End() which holds Synchronizer lock while calling into tracks
392-
t.Lock()
393-
sync := t.sync
394-
t.Unlock()
395-
396-
var deadline time.Duration
397-
var hasDeadline bool
398-
if sync != nil {
399-
deadline, hasDeadline = sync.getExternalMediaDeadline()
400-
}
367+
deadline, hasDeadline := t.getSynchronizerDeadline()
401368

402369
t.Lock()
403370
defer t.Unlock()
@@ -474,24 +441,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
474441
pts = estimatedPTS
475442
}
476443

477-
if t.shouldAdjustPTS(pts) {
478-
prevCurrentPTSOffset := t.currentPTSOffset
479-
if t.currentPTSOffset > t.desiredPTSOffset {
480-
t.currentPTSOffset = max(t.currentPTSOffset-t.maxDriftAdjustment, t.desiredPTSOffset)
481-
t.totalPTSAdjustmentNegative += prevCurrentPTSOffset - t.currentPTSOffset
482-
} else if t.currentPTSOffset < t.desiredPTSOffset {
483-
t.currentPTSOffset = min(t.currentPTSOffset+t.maxDriftAdjustment, t.desiredPTSOffset)
484-
t.totalPTSAdjustmentPositive += t.currentPTSOffset - prevCurrentPTSOffset
485-
}
486-
487-
// throttle further adjustment till a window proportional to this adjustment elapses
488-
throttle := time.Duration(0)
489-
if t.driftAdjustmentWindowPercent > 0.0 {
490-
throttle = time.Duration(math.Abs(float64(t.currentPTSOffset-prevCurrentPTSOffset)) * 100.0 / t.driftAdjustmentWindowPercent)
491-
}
492-
t.nextPTSAdjustmentAt = mono.Now().Add(throttle)
493-
t.logPTSAdjustmentSampled(ts, pts, estimatedPTS, prevCurrentPTSOffset, throttle)
494-
}
444+
t.maybeAdjustPTSOffset(ts, pts, estimatedPTS)
495445

496446
adjusted, pts := t.normalizePTSToMediaPipelineTimeline(pts, ts, now, deadline, hasDeadline)
497447

@@ -756,8 +706,8 @@ func (t *TrackSynchronizer) maybeAdjustStartTime(asr *augmentedSenderReport) int
756706
adjustedStartTimeNano := now - samplesDuration.Nanoseconds()
757707
requestedAdjustment := startTimeNano - adjustedStartTimeNano
758708

759-
getLoggingFields := func() []interface{} {
760-
return []interface{}{
709+
getLoggingFields := func() []any {
710+
return []any{
761711
"nowTime", time.Unix(0, now),
762712
"before", time.Unix(0, startTimeNano),
763713
"after", time.Unix(0, adjustedStartTimeNano),
@@ -837,7 +787,7 @@ func (t *TrackSynchronizer) acceptableSRDrift(drift time.Duration) bool {
837787
}
838788

839789
func (t *TrackSynchronizer) shouldAdjustPTS(newPTS time.Duration) bool {
840-
if mono.Now().Before(t.nextPTSAdjustmentAt) {
790+
if time.Now().Before(t.nextPTSAdjustmentAt) {
841791
return false
842792
}
843793

@@ -860,8 +810,31 @@ func (t *TrackSynchronizer) shouldAdjustPTS(newPTS time.Duration) bool {
860810
return adjustmentEnabled && (t.currentPTSOffset != t.desiredPTSOffset)
861811
}
862812

813+
func (t *TrackSynchronizer) maybeAdjustPTSOffset(ts uint32, pts, estimatedPTS time.Duration) {
814+
if !t.shouldAdjustPTS(pts) {
815+
return
816+
}
817+
818+
prevCurrentPTSOffset := t.currentPTSOffset
819+
if t.currentPTSOffset > t.desiredPTSOffset {
820+
t.currentPTSOffset = max(t.currentPTSOffset-t.maxDriftAdjustment, t.desiredPTSOffset)
821+
t.totalPTSAdjustmentNegative += prevCurrentPTSOffset - t.currentPTSOffset
822+
} else if t.currentPTSOffset < t.desiredPTSOffset {
823+
t.currentPTSOffset = min(t.currentPTSOffset+t.maxDriftAdjustment, t.desiredPTSOffset)
824+
t.totalPTSAdjustmentPositive += t.currentPTSOffset - prevCurrentPTSOffset
825+
}
826+
827+
// throttle further adjustment till a window proportional to this adjustment elapses
828+
throttle := time.Duration(0)
829+
if t.driftAdjustmentWindowPercent > 0.0 {
830+
throttle = time.Duration(math.Abs(float64(t.currentPTSOffset-prevCurrentPTSOffset)) * 100.0 / t.driftAdjustmentWindowPercent)
831+
}
832+
t.nextPTSAdjustmentAt = time.Now().Add(throttle)
833+
t.logPTSAdjustmentSampled(ts, pts, estimatedPTS, prevCurrentPTSOffset, throttle)
834+
}
835+
863836
func (t *TrackSynchronizer) isPacketTooOld(packetTime time.Time) bool {
864-
return t.oldPacketThreshold != 0 && mono.Now().Sub(packetTime) > t.oldPacketThreshold
837+
return t.oldPacketThreshold != 0 && time.Since(packetTime) > t.oldPacketThreshold
865838
}
866839

867840
// avoid applying small changes to start time as it will cause subsequent PTSes
@@ -886,6 +859,20 @@ func (t *TrackSynchronizer) applyQuantizedStartTimeAdvance(deltaTotal time.Durat
886859
return 0
887860
}
888861

862+
func (t *TrackSynchronizer) getSynchronizer() *Synchronizer {
863+
t.Lock()
864+
defer t.Unlock()
865+
return t.sync
866+
}
867+
868+
func (t *TrackSynchronizer) getSynchronizerDeadline() (time.Duration, bool) {
869+
sync := t.getSynchronizer()
870+
if sync == nil {
871+
return 0, false
872+
}
873+
return sync.getExternalMediaDeadline()
874+
}
875+
889876
func (t *TrackSynchronizer) updateGapHistogram(gap uint16) {
890877
if gap < 2 {
891878
return

0 commit comments

Comments
 (0)