From 84e3ce17bdb2381eb497db07948f5d19a5044ae3 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Fri, 28 Nov 2025 16:26:52 +0530 Subject: [PATCH] Use OWD estimator from mediatransportutil. --- go.mod | 2 +- go.sum | 4 +- pkg/synchronizer/owd_estimator.go | 196 ------------------------------ pkg/synchronizer/track.go | 5 +- 4 files changed, 6 insertions(+), 201 deletions(-) delete mode 100644 pkg/synchronizer/owd_estimator.go diff --git a/go.mod b/go.mod index d5af8064..efdc06c7 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/media-sdk v0.0.0-20251106223430-dd8f5e0de2cf - github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 + github.com/livekit/mediatransportutil v0.0.0-20251128105421-19c7a7b81c22 github.com/livekit/protocol v1.43.1-0.20251111125113-f20b33cc16b9 github.com/magefile/mage v1.15.0 github.com/pion/dtls/v3 v3.0.7 diff --git a/go.sum b/go.sum index dcc12d15..9ff3e365 100644 --- a/go.sum +++ b/go.sum @@ -157,8 +157,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/media-sdk v0.0.0-20251106223430-dd8f5e0de2cf h1:gvun6axx2Mrh8+NvuQBLQUoiG5MBubI435XoX68VGL8= github.com/livekit/media-sdk v0.0.0-20251106223430-dd8f5e0de2cf/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8= -github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo= -github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= +github.com/livekit/mediatransportutil v0.0.0-20251128105421-19c7a7b81c22 h1:dzCBxOGLLWVtQhL7OYK2EGN+5Q+23Mq/jfz4vQisirA= +github.com/livekit/mediatransportutil v0.0.0-20251128105421-19c7a7b81c22/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= github.com/livekit/protocol v1.43.1-0.20251111125113-f20b33cc16b9 h1:ECXv1c9S/1qXytKfDGe5m5AJagEbSgea/ZE0DspkwVY= github.com/livekit/protocol v1.43.1-0.20251111125113-f20b33cc16b9/go.mod h1:TpqU2qCI1ES4Lk7PAWSgYO4RaexfVXb54ZO2hXv0Bmc= github.com/livekit/psrpc v0.7.1-0.20251105165826-1016ad610a7e h1:K6GKMFGJW8U2RoEyi8Zar7pPwUB7RMsn3AEJXf/Mgfc= diff --git a/pkg/synchronizer/owd_estimator.go b/pkg/synchronizer/owd_estimator.go deleted file mode 100644 index bdd95ee7..00000000 --- a/pkg/synchronizer/owd_estimator.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package synchronizer - -import ( - "time" - - "go.uber.org/zap/zapcore" -) - -type OWDEstimatorParams struct { - PropagationDelayFallFactor float64 - PropagationDelayRiseFactor float64 - - PropagationDelaySpikeAdaptationFactor float64 - - PropagationDelayDeltaThresholdMin time.Duration - PropagationDelayDeltaThresholdMaxFactor int64 - PropagationDelayDeltaHighResetNumReports int - PropagationDelayDeltaHighResetWait time.Duration - PropagationDelayDeltaLongTermAdaptationThreshold time.Duration -} - -var OWDEstimatorParamsDefault = OWDEstimatorParams{ - // OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receiver. - // As they operate on different clock domains, it is not possible to get exact propagation delay easily. - // So, this module is an estimator using a simple approach explained below. It should not be used for - // things that require high accuracy. - // - // One example is RTCP Sender Reports getting re-based to SFU time base so that all subscriber side - // can have the same time base (i. e. SFU time base). To convert publisher side - // RTCP Sender Reports to SFU timebase, a propagation delay is maintained. - // propagation_delay = time_of_report_reception - ntp_timestamp_in_report - // - // Propagation delay is adapted continuously. If it falls, adapt quickly to the - // lower value as that could be the real propagation delay. If it rises, adapt slowly - // as it might be a temporary change or slow drift. See below for handling of high deltas - // which could be a result of a path change. - PropagationDelayFallFactor: 0.9, - PropagationDelayRiseFactor: 0.1, - - PropagationDelaySpikeAdaptationFactor: 0.5, - - // To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset. - // Reset at whichever of the below happens later. - // 1. 10 seconds of persistent high delta. - // 2. at least 2 consecutive reports with high delta. - // - // A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding - // a factor of the long term estimate is considered a sharp increase. That will trigger the start of the - // path change condition and if it persists, propagation delay will be reset. - PropagationDelayDeltaThresholdMin: 10 * time.Millisecond, - PropagationDelayDeltaThresholdMaxFactor: 2, - PropagationDelayDeltaHighResetNumReports: 2, - PropagationDelayDeltaHighResetWait: 10 * time.Second, - PropagationDelayDeltaLongTermAdaptationThreshold: 50 * time.Millisecond, -} - -type OWDEstimator struct { - params OWDEstimatorParams - - initialized bool - initialAdjustmentDone bool - lastSenderClockTimeNs int64 - lastPropagationDelayNs int64 - lastDeltaPropagationDelayNs int64 - estimatedPropagationDelayNs int64 - longTermDeltaPropagationDelayNs int64 - propagationDelayDeltaHighCount int - propagationDelayDeltaHighStartTime time.Time - propagationDelaySpikeNs int64 -} - -func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator { - return &OWDEstimator{ - params: params, - } -} - -func (o *OWDEstimator) MarshalLogObject(e zapcore.ObjectEncoder) error { - if o != nil { - e.AddTime("lastSenderClockTimeNs", time.Unix(0, o.lastSenderClockTimeNs)) - e.AddDuration("lastPropagationDelayNs", time.Duration(o.lastPropagationDelayNs)) - e.AddDuration("lastDeltaPropagationDelayNs", time.Duration(o.lastDeltaPropagationDelayNs)) - e.AddDuration("estimatedPropagationDelayNs", time.Duration(o.estimatedPropagationDelayNs)) - e.AddDuration("longTermDeltaPropagationDelayNs", time.Duration(o.longTermDeltaPropagationDelayNs)) - e.AddInt("propagationDelayDeltaHighCount", o.propagationDelayDeltaHighCount) - e.AddTime("propagationDelayDeltaHighStartTime", o.propagationDelayDeltaHighStartTime) - e.AddDuration("propagationDelaySpikeNs", time.Duration(o.propagationDelaySpikeNs)) - } - return nil -} - -func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64) (int64, bool) { - resetDelta := func() { - o.propagationDelayDeltaHighCount = 0 - o.propagationDelayDeltaHighStartTime = time.Time{} - o.propagationDelaySpikeNs = 0 - } - - initPropagationDelay := func(pd int64) { - o.estimatedPropagationDelayNs = pd - o.longTermDeltaPropagationDelayNs = 0 - resetDelta() - } - - o.lastPropagationDelayNs = receiverClockTimeNs - senderClockTimeNs - if !o.initialized { - o.initialized = true - o.lastSenderClockTimeNs = senderClockTimeNs - initPropagationDelay(o.lastPropagationDelayNs) - return o.estimatedPropagationDelayNs, true - } - - stepChange := false - o.lastDeltaPropagationDelayNs = o.lastPropagationDelayNs - o.estimatedPropagationDelayNs - // check for path changes, i. e. a step jump increase in propagation delay observed over time - if o.lastDeltaPropagationDelayNs > o.params.PropagationDelayDeltaThresholdMin.Nanoseconds() { // ignore small changes for path change consideration - if o.longTermDeltaPropagationDelayNs != 0 && - o.lastDeltaPropagationDelayNs > o.longTermDeltaPropagationDelayNs*o.params.PropagationDelayDeltaThresholdMaxFactor { - o.propagationDelayDeltaHighCount++ - if o.propagationDelayDeltaHighStartTime.IsZero() { - o.propagationDelayDeltaHighStartTime = time.Now() - } - if o.propagationDelaySpikeNs == 0 { - o.propagationDelaySpikeNs = o.lastPropagationDelayNs - } else { - o.propagationDelaySpikeNs += int64(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelayNs-o.propagationDelaySpikeNs)) - } - - if o.propagationDelayDeltaHighCount >= o.params.PropagationDelayDeltaHighResetNumReports && time.Since(o.propagationDelayDeltaHighStartTime) >= o.params.PropagationDelayDeltaHighResetWait { - stepChange = true - initPropagationDelay(o.propagationDelaySpikeNs) - } - } else { - resetDelta() - } - } else { - resetDelta() - - factor := o.params.PropagationDelayFallFactor - if o.lastPropagationDelayNs > o.estimatedPropagationDelayNs { - factor = o.params.PropagationDelayRiseFactor - } - o.estimatedPropagationDelayNs += int64(factor * float64(o.lastPropagationDelayNs-o.estimatedPropagationDelayNs)) - } - - if o.lastDeltaPropagationDelayNs < o.params.PropagationDelayDeltaLongTermAdaptationThreshold.Nanoseconds() { - if o.longTermDeltaPropagationDelayNs == 0 { - o.longTermDeltaPropagationDelayNs = o.lastDeltaPropagationDelayNs - } else { - // do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late - // if the spike is in fact a path change, it will persist and handled by path change detection above - sinceLast := senderClockTimeNs - o.lastSenderClockTimeNs - adaptationFactor := min(1.0, float64(sinceLast)/float64(o.params.PropagationDelayDeltaHighResetWait)) - o.longTermDeltaPropagationDelayNs += int64(adaptationFactor * float64(o.lastDeltaPropagationDelayNs-o.longTermDeltaPropagationDelayNs)) - } - } - if o.longTermDeltaPropagationDelayNs < 0 { - o.longTermDeltaPropagationDelayNs = 0 - } - o.lastSenderClockTimeNs = senderClockTimeNs - return o.estimatedPropagationDelayNs, stepChange -} - -func (o *OWDEstimator) InitialAdjustment(adjustmentNs int64) int64 { - if o.initialAdjustmentDone { - return o.estimatedPropagationDelayNs - } - - o.initialAdjustmentDone = true - // one time adjustment at init - // example: when this is used to measure one-way-delay of RTCP sender reports, - // it is possible that the first sender report is delayed and experiences more - // than existing propagation delay. This allows adjustment of initial estimate. - if adjustmentNs < 0 && -adjustmentNs < o.estimatedPropagationDelayNs { - o.estimatedPropagationDelayNs += adjustmentNs - } - return o.estimatedPropagationDelayNs -} - -func (o *OWDEstimator) EstimatedPropagationDelay() int64 { - return o.estimatedPropagationDelayNs -} diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index 0bc26e17..9d38b58c 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -28,6 +28,7 @@ import ( "github.com/livekit/media-sdk/jitter" "github.com/livekit/mediatransportutil" + "github.com/livekit/mediatransportutil/pkg/latency" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" "github.com/livekit/protocol/utils/rtputil" @@ -101,7 +102,7 @@ type TrackSynchronizer struct { nextPTSAdjustmentAt time.Time - propagationDelayEstimator *OWDEstimator + propagationDelayEstimator *latency.OWDEstimator totalStartTimeAdjustment time.Duration startTimeAdjustResidual time.Duration initialized bool @@ -127,7 +128,7 @@ func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer oldPacketThreshold: s.config.OldPacketThreshold, enableStartGate: s.config.EnableStartGate, nextPTSAdjustmentAt: mono.Now(), - propagationDelayEstimator: NewOWDEstimator(OWDEstimatorParamsDefault), + propagationDelayEstimator: latency.NewOWDEstimator(latency.OWDEstimatorParamsDefault), maxMediaRunningTimeDelay: s.config.MaxMediaRunningTimeDelay, lastPTSAdjustedLogBucket: math.MaxInt64, }