Skip to content

Commit e18d0d1

Browse files
adityaa30mengelbart
authored andcommitted
Optimise SendSideBWE.WriteRTCP
- instead of passing each `cc.Acknowledgement` 1 by 1 in respective channel; pass them all at once to decrease the internal overhead of go channels
1 parent 66cca33 commit e18d0d1

File tree

6 files changed

+95
-89
lines changed

6 files changed

+95
-89
lines changed

pkg/gcc/arrival_group_accumulator.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,36 @@ func newArrivalGroupAccumulator() *arrivalGroupAccumulator {
2020
}
2121
}
2222

23-
func (a *arrivalGroupAccumulator) run(in <-chan cc.Acknowledgment, agWriter func(arrivalGroup)) {
23+
func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter func(arrivalGroup)) {
2424
init := false
2525
group := arrivalGroup{}
26-
for next := range in {
27-
if !init {
28-
group.add(next)
29-
init = true
30-
continue
31-
}
32-
if next.Arrival.Before(group.arrival) {
33-
// ignore out of order arrivals
34-
continue
35-
}
36-
if next.Departure.After(group.departure) {
37-
if interDepartureTimePkt(group, next) <= a.interDepartureThreshold {
26+
for acks := range in {
27+
for _, next := range acks {
28+
if !init {
3829
group.add(next)
30+
init = true
3931
continue
4032
}
41-
42-
if interArrivalTimePkt(group, next) <= a.interArrivalThreshold &&
43-
interGroupDelayVariationPkt(group, next) < a.interGroupDelayVariationTreshold {
44-
group.add(next)
33+
if next.Arrival.Before(group.arrival) {
34+
// ignore out of order arrivals
4535
continue
4636
}
37+
if next.Departure.After(group.departure) {
38+
if interDepartureTimePkt(group, next) <= a.interDepartureThreshold {
39+
group.add(next)
40+
continue
41+
}
4742

48-
agWriter(group)
49-
group = arrivalGroup{}
50-
group.add(next)
43+
if interArrivalTimePkt(group, next) <= a.interArrivalThreshold &&
44+
interGroupDelayVariationPkt(group, next) < a.interGroupDelayVariationTreshold {
45+
group.add(next)
46+
continue
47+
}
48+
49+
agWriter(group)
50+
group = arrivalGroup{}
51+
group.add(next)
52+
}
5153
}
5254
}
5355
}

pkg/gcc/arrival_group_accumulator_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func TestArrivalGroupAccumulator(t *testing.T) {
159159
tc := tc
160160
t.Run(tc.name, func(t *testing.T) {
161161
aga := newArrivalGroupAccumulator()
162-
in := make(chan cc.Acknowledgment)
162+
in := make(chan []cc.Acknowledgment)
163163
out := make(chan arrivalGroup)
164164
go func() {
165165
defer close(out)
@@ -168,9 +168,7 @@ func TestArrivalGroupAccumulator(t *testing.T) {
168168
})
169169
}()
170170
go func() {
171-
for _, as := range tc.log {
172-
in <- as
173-
}
171+
in <- tc.log
174172
close(in)
175173
}()
176174
received := []arrivalGroup{}

pkg/gcc/delay_based_bwe.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ type DelayStats struct {
2424
type now func() time.Time
2525

2626
type delayController struct {
27-
ackPipe chan<- cc.Acknowledgment
28-
ackRatePipe chan<- cc.Acknowledgment
27+
ackPipe chan<- []cc.Acknowledgment
28+
ackRatePipe chan<- []cc.Acknowledgment
2929
ackRTTPipe chan<- []cc.Acknowledgment
3030

3131
*arrivalGroupAccumulator
@@ -43,8 +43,8 @@ type delayControllerConfig struct {
4343
}
4444

4545
func newDelayController(c delayControllerConfig) *delayController {
46-
ackPipe := make(chan cc.Acknowledgment)
47-
ackRatePipe := make(chan cc.Acknowledgment)
46+
ackPipe := make(chan []cc.Acknowledgment)
47+
ackRatePipe := make(chan []cc.Acknowledgment)
4848
ackRTTPipe := make(chan []cc.Acknowledgment)
4949

5050
delayController := &delayController{
@@ -90,10 +90,8 @@ func (d *delayController) onUpdate(f func(DelayStats)) {
9090
}
9191

9292
func (d *delayController) updateDelayEstimate(acks []cc.Acknowledgment) {
93-
for _, ack := range acks {
94-
d.ackPipe <- ack
95-
d.ackRatePipe <- ack
96-
}
93+
d.ackPipe <- acks
94+
d.ackRatePipe <- acks
9795
d.ackRTTPipe <- acks
9896
}
9997

pkg/gcc/rate_calculator.go

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,46 @@ func newRateCalculator(window time.Duration) *rateCalculator {
1616
}
1717
}
1818

19-
func (c *rateCalculator) run(in <-chan cc.Acknowledgment, onRateUpdate func(int)) {
19+
func (c *rateCalculator) run(in <-chan []cc.Acknowledgment, onRateUpdate func(int)) {
2020
var history []cc.Acknowledgment
2121
init := false
2222
sum := 0
23-
for next := range in {
24-
if next.Arrival.IsZero() {
25-
// Ignore packet if it didn't arrive
26-
continue
27-
}
28-
history = append(history, next)
29-
sum += next.Size
23+
for acks := range in {
24+
for _, next := range acks {
25+
if next.Arrival.IsZero() {
26+
// Ignore packet if it didn't arrive
27+
continue
28+
}
29+
history = append(history, next)
30+
sum += next.Size
3031

31-
if !init {
32-
init = true
33-
// Don't know any timeframe here, only arrival of last packet,
34-
// which is by definition in the window that ends with the last
35-
// arrival time
36-
onRateUpdate(next.Size * 8)
37-
continue
38-
}
32+
if !init {
33+
init = true
34+
// Don't know any timeframe here, only arrival of last packet,
35+
// which is by definition in the window that ends with the last
36+
// arrival time
37+
onRateUpdate(next.Size * 8)
38+
continue
39+
}
3940

40-
del := 0
41-
for _, ack := range history {
42-
deadline := next.Arrival.Add(-c.window)
43-
if !ack.Arrival.Before(deadline) {
44-
break
41+
del := 0
42+
for _, ack := range history {
43+
deadline := next.Arrival.Add(-c.window)
44+
if !ack.Arrival.Before(deadline) {
45+
break
46+
}
47+
del++
48+
sum -= ack.Size
4549
}
46-
del++
47-
sum -= ack.Size
48-
}
49-
history = history[del:]
50-
if len(history) == 0 {
51-
onRateUpdate(0)
52-
continue
50+
history = history[del:]
51+
if len(history) == 0 {
52+
onRateUpdate(0)
53+
continue
54+
}
55+
dt := next.Arrival.Sub(history[0].Arrival)
56+
bits := 8 * sum
57+
rate := int(float64(bits) / dt.Seconds())
58+
onRateUpdate(rate)
5359
}
54-
dt := next.Arrival.Sub(history[0].Arrival)
55-
bits := 8 * sum
56-
rate := int(float64(bits) / dt.Seconds())
57-
onRateUpdate(rate)
5860
}
5961
}

pkg/gcc/rate_calculator_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestRateCalculator(t *testing.T) {
8181
tc := tc
8282
t.Run(tc.name, func(t *testing.T) {
8383
rc := newRateCalculator(500 * time.Millisecond)
84-
in := make(chan cc.Acknowledgment)
84+
in := make(chan []cc.Acknowledgment)
8585
out := make(chan int)
8686
onRateUpdate := func(rate int) {
8787
out <- rate
@@ -91,9 +91,7 @@ func TestRateCalculator(t *testing.T) {
9191
rc.run(in, onRateUpdate)
9292
}()
9393
go func() {
94-
for _, ack := range tc.acks {
95-
in <- ack
96-
}
94+
in <- tc.acks
9795
close(in)
9896
}()
9997

pkg/gcc/send_side_bwe_test.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package gcc
22

33
import (
4+
"fmt"
45
"math/rand"
56
"testing"
67

@@ -106,32 +107,39 @@ func TestSendSideBWE_ErrorOnWriteRTCPAtClosedState(t *testing.T) {
106107
}
107108

108109
func BenchmarkSendSideBWE_WriteRTCP(b *testing.B) {
109-
bwe, err := NewSendSideBWE(SendSideBWEPacer(NewNoOpPacer()))
110-
require.NoError(b, err)
111-
require.NotNil(b, bwe)
110+
numSequencesPerTwccReport := []int{10, 100, 500, 1000}
112111

113-
r := twcc.NewRecorder(5000)
114-
seq := uint16(0)
115-
arrivalTime := int64(0)
112+
for _, count := range numSequencesPerTwccReport {
113+
b.Run(fmt.Sprintf("num_sequences=%d", count), func(b *testing.B) {
114+
bwe, err := NewSendSideBWE(SendSideBWEPacer(NewNoOpPacer()))
115+
require.NoError(b, err)
116+
require.NotNil(b, bwe)
116117

117-
for i := 0; i < b.N; i++ {
118-
seqs := rand.Intn(1000) + 500
119-
for j := 0; j < seqs; j++ {
120-
seq++
118+
r := twcc.NewRecorder(5000)
119+
seq := uint16(0)
120+
arrivalTime := int64(0)
121121

122-
if rand.Intn(5) == 0 {
123-
// skip this packet
124-
}
122+
for i := 0; i < b.N; i++ {
123+
// nolint:gosec
124+
seqs := rand.Intn(count/2) + count // [count, count * 1.5)
125+
for j := 0; j < seqs; j++ {
126+
seq++
125127

126-
arrivalTime += int64(rtcp.TypeTCCDeltaScaleFactor * (rand.Intn(128) + 1))
127-
r.Record(5000, seq, arrivalTime)
128-
}
128+
if rand.Intn(5) == 0 { //nolint:gosec,staticcheck
129+
// skip this packet
130+
}
129131

130-
rtcpPackets := r.BuildFeedbackPacket()
131-
require.Equal(b, 1, len(rtcpPackets))
132+
arrivalTime += int64(rtcp.TypeTCCDeltaScaleFactor * (rand.Intn(128) + 1)) //nolint:gosec
133+
r.Record(5000, seq, arrivalTime)
134+
}
132135

133-
require.NoError(b, bwe.WriteRTCP(rtcpPackets, nil))
134-
}
136+
rtcpPackets := r.BuildFeedbackPacket()
137+
require.Equal(b, 1, len(rtcpPackets))
135138

136-
require.NoError(b, bwe.Close())
139+
require.NoError(b, bwe.WriteRTCP(rtcpPackets, nil))
140+
}
141+
142+
require.NoError(b, bwe.Close())
143+
})
144+
}
137145
}

0 commit comments

Comments
 (0)