Skip to content

Commit e030b05

Browse files
committed
Clean up delay based bwe and use less goroutines
Reduces the number of goroutines that is spawned for a process that always runs sequentially anyway. Additionally cleans up and moves some code to more appropriate files.
1 parent 9d4ff17 commit e030b05

13 files changed

+348
-332
lines changed

pkg/gcc/arrival_group_accumulator.go

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,49 @@ func newArrivalGroupAccumulator() *arrivalGroupAccumulator {
2020
}
2121
}
2222

23-
func (a *arrivalGroupAccumulator) run(in <-chan cc.Acknowledgment) <-chan arrivalGroup {
24-
out := make(chan arrivalGroup)
25-
go func() {
26-
init := false
27-
group := arrivalGroup{}
28-
for next := range in {
29-
if !init {
23+
func (a *arrivalGroupAccumulator) run(in <-chan cc.Acknowledgment, agWriter func(arrivalGroup)) {
24+
init := false
25+
group := arrivalGroup{}
26+
for next := range in {
27+
if !init {
28+
group.add(next)
29+
init = true
30+
continue
31+
}
32+
if next.Arrival.Before(group.arrival) {
33+
// ignore out of order arrivals
34+
continue
35+
}
36+
if next.Departure.After(group.departure) {
37+
if interDepartureTimePkt(group, next) <= a.interDepartureThreshold {
3038
group.add(next)
31-
init = true
32-
continue
33-
}
34-
if next.Arrival.Before(group.arrival) {
35-
// ignore out of order arrivals
3639
continue
3740
}
38-
if next.Departure.After(group.departure) {
39-
if interDepartureTimePkt(group, next) <= a.interDepartureThreshold {
40-
group.add(next)
41-
continue
42-
}
43-
44-
if interArrivalTimePkt(group, next) <= a.interArrivalThreshold &&
45-
interGroupDelayVariationPkt(group, next) < a.interGroupDelayVariationTreshold {
46-
group.add(next)
47-
continue
48-
}
49-
50-
out <- group
51-
group = arrivalGroup{}
41+
42+
if interArrivalTimePkt(group, next) <= a.interArrivalThreshold &&
43+
interGroupDelayVariationPkt(group, next) < a.interGroupDelayVariationTreshold {
5244
group.add(next)
45+
continue
5346
}
47+
48+
agWriter(group)
49+
group = arrivalGroup{}
50+
group.add(next)
5451
}
55-
close(out)
56-
}()
57-
return out
52+
}
53+
}
54+
55+
func interArrivalTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
56+
return b.Arrival.Sub(a.arrival)
57+
}
58+
59+
func interDepartureTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
60+
if len(a.packets) == 0 {
61+
return 0
62+
}
63+
return b.Departure.Sub(a.packets[len(a.packets)-1].Departure)
64+
}
65+
66+
func interGroupDelayVariationPkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
67+
return b.Arrival.Sub(a.arrival) - b.Departure.Sub(a.departure)
5868
}

pkg/gcc/arrival_group_accumulator_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,13 @@ func TestArrivalGroupAccumulator(t *testing.T) {
160160
t.Run(tc.name, func(t *testing.T) {
161161
aga := newArrivalGroupAccumulator()
162162
in := make(chan cc.Acknowledgment)
163-
out := aga.run(in)
163+
out := make(chan arrivalGroup)
164+
go func() {
165+
defer close(out)
166+
aga.run(in, func(ag arrivalGroup) {
167+
out <- ag
168+
})
169+
}()
164170
go func() {
165171
for _, as := range tc.log {
166172
in <- as

pkg/gcc/delay_based_bwe.go

Lines changed: 33 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@ import (
77
"github.com/pion/interceptor/internal/cc"
88
)
99

10-
const (
11-
decreaseEMAAlpha = 0.95
12-
beta = 0.85
13-
)
14-
1510
// DelayStats contains some internal statistics of the delay based congestion
1611
// controller
1712
type DelayStats struct {
@@ -26,23 +21,15 @@ type DelayStats struct {
2621
RTT time.Duration
2722
}
2823

29-
type estimator interface {
30-
updateEstimate(measurement time.Duration) time.Duration
31-
}
32-
33-
type estimatorFunc func(time.Duration) time.Duration
34-
35-
func (f estimatorFunc) updateEstimate(d time.Duration) time.Duration {
36-
return f(d)
37-
}
38-
3924
type now func() time.Time
4025

4126
type delayController struct {
4227
ackPipe chan<- cc.Acknowledgment
4328
ackRatePipe chan<- cc.Acknowledgment
4429
ackRTTPipe chan<- []cc.Acknowledgment
4530

31+
*arrivalGroupAccumulator
32+
4633
onUpdateCallback func(DelayStats)
4734

4835
wg sync.WaitGroup
@@ -61,28 +48,39 @@ func newDelayController(c delayControllerConfig) *delayController {
6148
ackRTTPipe := make(chan []cc.Acknowledgment)
6249

6350
delayController := &delayController{
64-
ackPipe: ackPipe,
65-
ackRatePipe: ackRatePipe,
66-
ackRTTPipe: ackRTTPipe,
67-
wg: sync.WaitGroup{},
51+
ackPipe: ackPipe,
52+
ackRatePipe: ackRatePipe,
53+
ackRTTPipe: ackRTTPipe,
54+
arrivalGroupAccumulator: nil,
55+
onUpdateCallback: nil,
56+
wg: sync.WaitGroup{},
6857
}
6958

59+
rateController := newRateController(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate, func(ds DelayStats) {
60+
if delayController.onUpdateCallback != nil {
61+
delayController.onUpdateCallback(ds)
62+
}
63+
})
64+
overuseDetector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond, rateController.onDelayStats)
65+
slopeEstimator := newSlopeEstimator(newKalman(), overuseDetector.onDelayStats)
66+
arrivalGroupAccumulator := newArrivalGroupAccumulator()
67+
7068
rc := newRateCalculator(500 * time.Millisecond)
7169
re := newRTTEstimator()
7270

73-
reaceivedRate := rc.run(ackRatePipe)
74-
rtt := re.run(ackRTTPipe)
75-
76-
arrivalGroupAccumulator := newArrivalGroupAccumulator()
77-
slopeEstimator := newSlopeEstimator(newKalman())
78-
overuseDetector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond)
79-
rateController := newRateController(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate)
80-
81-
arrival := arrivalGroupAccumulator.run(ackPipe)
82-
estimate := slopeEstimator.run(arrival)
83-
state := overuseDetector.run(estimate)
84-
delayStats := rateController.run(state, reaceivedRate, rtt)
85-
delayController.loop(delayStats)
71+
delayController.wg.Add(3)
72+
go func() {
73+
defer delayController.wg.Done()
74+
arrivalGroupAccumulator.run(ackPipe, slopeEstimator.onArrivalGroup)
75+
}()
76+
go func() {
77+
defer delayController.wg.Done()
78+
rc.run(ackRatePipe, rateController.onReceivedRate)
79+
}()
80+
go func() {
81+
defer delayController.wg.Done()
82+
re.run(ackRTTPipe, rateController.onRTT)
83+
}()
8684

8785
return delayController
8886
}
@@ -99,42 +97,12 @@ func (d *delayController) updateDelayEstimate(acks []cc.Acknowledgment) {
9997
d.ackRTTPipe <- acks
10098
}
10199

102-
func (d *delayController) loop(in chan DelayStats) {
103-
d.wg.Add(1)
104-
defer d.wg.Done()
105-
106-
go func() {
107-
for next := range in {
108-
if d.onUpdateCallback != nil {
109-
d.onUpdateCallback(next)
110-
}
111-
}
112-
}()
113-
}
114-
115100
func (d *delayController) Close() error {
101+
defer d.wg.Wait()
102+
116103
close(d.ackPipe)
117104
close(d.ackRTTPipe)
118105
close(d.ackRatePipe)
119-
d.wg.Wait()
120-
return nil
121-
}
122-
123-
func interArrivalTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
124-
return b.Arrival.Sub(a.arrival)
125-
}
126106

127-
func interDepartureTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
128-
if len(a.packets) == 0 {
129-
return 0
130-
}
131-
return b.Departure.Sub(a.packets[len(a.packets)-1].Departure)
132-
}
133-
134-
func interGroupDelayVariationPkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
135-
return b.Arrival.Sub(a.arrival) - b.Departure.Sub(a.departure)
136-
}
137-
138-
func interGroupDelayVariation(a, b arrivalGroup) time.Duration {
139-
return b.arrival.Sub(a.arrival) - b.departure.Sub(a.departure)
107+
return nil
140108
}

pkg/gcc/overuse_detector.go

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,69 +11,69 @@ type threshold interface {
1111
type overuseDetector struct {
1212
threshold threshold
1313
overuseTime time.Duration
14+
15+
dsWriter func(DelayStats)
16+
17+
lastEstimate time.Duration
18+
lastUpdate time.Time
19+
increasingDuration time.Duration
20+
increasingCounter int
1421
}
1522

16-
func newOveruseDetector(thresh threshold, overuseTime time.Duration) *overuseDetector {
23+
func newOveruseDetector(thresh threshold, overuseTime time.Duration, dsw func(DelayStats)) *overuseDetector {
1724
return &overuseDetector{
18-
threshold: thresh,
19-
overuseTime: overuseTime,
25+
threshold: thresh,
26+
overuseTime: overuseTime,
27+
dsWriter: dsw,
28+
lastEstimate: 0,
29+
lastUpdate: time.Now(),
30+
increasingDuration: 0,
31+
increasingCounter: 0,
2032
}
2133
}
2234

23-
func (d *overuseDetector) run(in <-chan DelayStats) <-chan DelayStats {
24-
out := make(chan DelayStats)
25-
go func() {
26-
lastEstimate := 0 * time.Millisecond
27-
lastUpdate := time.Now()
28-
var increasingDuration time.Duration
29-
var increasingCounter int
30-
31-
for next := range in {
32-
now := time.Now()
33-
delta := now.Sub(lastUpdate)
34-
lastUpdate = now
35+
func (d *overuseDetector) onDelayStats(ds DelayStats) {
36+
now := time.Now()
37+
delta := now.Sub(d.lastUpdate)
38+
d.lastUpdate = now
3539

36-
thresholdUse, estimate, currentThreshold := d.threshold.compare(next.Estimate, next.lastReceiveDelta)
40+
thresholdUse, estimate, currentThreshold := d.threshold.compare(ds.Estimate, ds.lastReceiveDelta)
3741

38-
use := usageNormal
39-
if thresholdUse == usageOver {
40-
if increasingDuration == 0 {
41-
increasingDuration = delta / 2
42-
} else {
43-
increasingDuration += delta
44-
}
45-
increasingCounter++
46-
if increasingDuration > d.overuseTime && increasingCounter > 1 {
47-
if estimate > lastEstimate {
48-
use = usageOver
49-
}
50-
}
51-
}
52-
if thresholdUse == usageUnder {
53-
increasingCounter = 0
54-
increasingDuration = 0
55-
use = usageUnder
42+
use := usageNormal
43+
if thresholdUse == usageOver {
44+
if d.increasingDuration == 0 {
45+
d.increasingDuration = delta / 2
46+
} else {
47+
d.increasingDuration += delta
48+
}
49+
d.increasingCounter++
50+
if d.increasingDuration > d.overuseTime && d.increasingCounter > 1 {
51+
if estimate > d.lastEstimate {
52+
use = usageOver
5653
}
54+
}
55+
}
56+
if thresholdUse == usageUnder {
57+
d.increasingCounter = 0
58+
d.increasingDuration = 0
59+
use = usageUnder
60+
}
5761

58-
if thresholdUse == usageNormal {
59-
increasingDuration = 0
60-
increasingCounter = 0
61-
use = usageNormal
62-
}
63-
lastEstimate = estimate
62+
if thresholdUse == usageNormal {
63+
d.increasingDuration = 0
64+
d.increasingCounter = 0
65+
use = usageNormal
66+
}
67+
d.lastEstimate = estimate
6468

65-
out <- DelayStats{
66-
Measurement: next.Measurement,
67-
Estimate: estimate,
68-
Threshold: currentThreshold,
69-
lastReceiveDelta: delta,
70-
Usage: use,
71-
State: 0,
72-
TargetBitrate: 0,
73-
RTT: 0,
74-
}
75-
}
76-
close(out)
77-
}()
78-
return out
69+
d.dsWriter(DelayStats{
70+
Measurement: ds.Measurement,
71+
Estimate: estimate,
72+
Threshold: currentThreshold,
73+
lastReceiveDelta: delta,
74+
Usage: use,
75+
State: 0,
76+
TargetBitrate: 0,
77+
RTT: 0,
78+
})
7979
}

pkg/gcc/overuse_detector_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,17 @@ func TestOveruseDetectorWithoutDelay(t *testing.T) {
8686
for _, tc := range cases {
8787
tc := tc
8888
t.Run(tc.name, func(t *testing.T) {
89-
od := newOveruseDetector(tc.thresh, tc.delay)
90-
in := make(chan DelayStats)
91-
out := od.run(in)
89+
out := make(chan DelayStats)
90+
dsw := func(ds DelayStats) {
91+
out <- ds
92+
}
93+
od := newOveruseDetector(tc.thresh, tc.delay, dsw)
9294
go func() {
95+
defer close(out)
9396
for _, e := range tc.estimates {
94-
in <- e
97+
od.onDelayStats(e)
9598
time.Sleep(tc.delay)
9699
}
97-
close(in)
98100
}()
99101
received := []usage{}
100102
for s := range out {

0 commit comments

Comments
 (0)