Skip to content
4 changes: 4 additions & 0 deletions .github/.ci.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
# SPDX-License-Identifier: MIT

GO_MOD_VERSION_EXPECTED=1.25
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/pion/bwe-test

go 1.21
go 1.25

toolchain go1.25.1

Expand Down
7 changes: 4 additions & 3 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/pion/interceptor"
Expand Down Expand Up @@ -182,7 +183,7 @@ type trackInfo struct {
}

type trackStats struct {
rtpPacketsReceived int
rtpPacketsReceived atomic.Int64
framesAssembled int
keyframesReceived int
startTime time.Time
Expand Down Expand Up @@ -298,7 +299,7 @@ func (r *Receiver) startStatsGoroutine(ctx context.Context, bytesReceivedChan ch
rate := bits / delta.Seconds()
mBitPerSecond := rate / float64(vnet.MBit)
r.log.Infof("throughput: %.2f Mb/s | RTP packets: %d | Frames: %d | Keyframes: %d",
mBitPerSecond, stats.rtpPacketsReceived, stats.framesAssembled, stats.keyframesReceived)
mBitPerSecond, stats.rtpPacketsReceived.Load(), stats.framesAssembled, stats.keyframesReceived)
bytesReceived = 0
last = now
case newBytesReceived := <-bytesReceivedChan:
Expand Down Expand Up @@ -352,7 +353,7 @@ func (r *Receiver) processPackets(ctx context.Context, trackRemote *webrtc.Track
}

bytesReceivedChan <- packet.MarshalSize()
stats.rtpPacketsReceived++
stats.rtpPacketsReceived.Add(1)

r.processVP8Packet(packet, trackInfo, frameAssembler, stats)
}
Expand Down
5 changes: 4 additions & 1 deletion sender/simulcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func NewSimulcastFilesSource() *SimulcastFilesSource {

// SetTargetBitrate sets the target bitrate for the simulcast source.
func (s *SimulcastFilesSource) SetTargetBitrate(rate int) {
s.updateTargetBitrate <- rate
select {
case s.updateTargetBitrate <- rate:
case <-s.done:
}
}

// SetWriter sets the sample writer function.
Expand Down
11 changes: 9 additions & 2 deletions syncodec/statistical_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ func (c *StatisticalCodec) GetTargetBitrate() int {
// greater than c.rMax, bitrate will be set to c.rMax. If r is lower than
// c.rMin, bitrate will be set to c.rMin.
func (c *StatisticalCodec) SetTargetBitrate(r int) {
c.targetBitrateLock.Lock()
defer c.targetBitrateLock.Unlock()

if r < c.targetBitrateBps {
c.targetBitrateBps = maximum(r, c.rMin)

Expand All @@ -244,10 +247,14 @@ func (c *StatisticalCodec) nextFrame() Frame {
}
}

bytesPerFrame := c.targetBitrateBps / (8.0 * c.fps)
c.targetBitrateLock.Lock()
bps := c.targetBitrateBps
c.targetBitrateLock.Unlock()

bytesPerFrame := bps / (8.0 * c.fps)

if c.remainingBurstFrames > 0 {
size := (c.targetBitrateBps * c.burstFrameCount) / (c.burstFrameSize + (c.burstFrameCount - 1))
size := (bps * c.burstFrameCount) / (c.burstFrameSize + (c.burstFrameCount - 1))

return Frame{
Content: make([]byte, size),
Expand Down
58 changes: 37 additions & 21 deletions vnet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"log"
"os"
"strings"
"sync"
"time"

"github.com/pion/logging"
Expand Down Expand Up @@ -151,6 +152,11 @@ func (r *Runner) runVariableAvailableCapacitySingleFlow() error {
if err != nil {
return fmt.Errorf("new manager: %w", err)
}
defer func() {
if closeErr := nm.Close(); closeErr != nil {
r.logger.Errorf("failed to close network manager: %v", closeErr)
}
}()

dataDir := fmt.Sprintf("data/%v", r.name)
err = os.MkdirAll(dataDir, 0o750)
Expand All @@ -162,21 +168,18 @@ func (r *Runner) runVariableAvailableCapacitySingleFlow() error {
if err != nil {
return fmt.Errorf("setup simple flow: %w", err)
}
defer func(flow Flow) {
err = flow.Close()
if err != nil {
r.logger.Errorf("flow close: %v", err)
}
}(flow)

var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
wg.Go(func() {
err = flow.sender.sender.Start(ctx)
if err != nil {
r.logger.Errorf("sender start: %v", err)
}
}()
})

path := pathCharacteristics{
referenceCapacity: 1 * vnet.MBit,
Expand Down Expand Up @@ -205,38 +208,46 @@ func (r *Runner) runVariableAvailableCapacitySingleFlow() error {
}
r.runNetworkSimulation(path, nm)

return nil
return flow.Close()
}

func (r *Runner) runVariableAvailableCapacityMultipleFlows() error {
nm, err := NewManager()
if err != nil {
return fmt.Errorf("new manager: %w", err)
}
defer func() {
if closeErr := nm.Close(); closeErr != nil {
r.logger.Errorf("failed to close network manager: %v", closeErr)
}
}()

dataDir := fmt.Sprintf("data/%v", r.name)
err = os.MkdirAll(dataDir, 0o750)
if err != nil {
return fmt.Errorf("mkdir data: %w", err)
}

var wg sync.WaitGroup
defer wg.Wait()

var flows []Flow
for i := 0; i < 2; i++ {
flow, err := NewSimpleFlow(r.loggerFactory, nm, i, r.senderMode, dataDir)
defer func(flow Flow) {
err = flow.Close()
if err != nil {
r.logger.Errorf("flow close: %v", err)
}
}(flow)
var flow Flow
flow, err = NewSimpleFlow(r.loggerFactory, nm, i, r.senderMode, dataDir)
if err != nil {
return err
}
flows = append(flows, flow)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
err = flow.sender.sender.Start(ctx)
if err != nil {
r.logger.Errorf("sender start: %v", err)
wg.Go(func() {
startErr := flow.sender.sender.Start(ctx)
if startErr != nil {
r.logger.Errorf("sender start: %v", startErr)
}
}()
})
}

path := pathCharacteristics{
Expand Down Expand Up @@ -271,6 +282,11 @@ func (r *Runner) runVariableAvailableCapacityMultipleFlows() error {
},
}
r.runNetworkSimulation(path, nm)
for _, f := range flows {
if err = f.Close(); err != nil {
panic(err)
}
}

return nil
}
Expand Down
20 changes: 19 additions & 1 deletion vnet/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ func (r *RouterWithConfig) getIPMapping() (private, public string, err error) {
public = mapping[0]
private = mapping[1]

return
return private, public, err
}

// NetworkManager manages the virtual network topology for bandwidth estimation tests.
type NetworkManager struct {
wan *vnet.Router
leftRouter *RouterWithConfig
leftTBF *vnet.TokenBucketFilter
rightRouter *RouterWithConfig
Expand Down Expand Up @@ -94,6 +95,7 @@ func NewManager() (*NetworkManager, error) {
}

manager := &NetworkManager{
wan: wan,
leftRouter: leftRouter,
leftTBF: leftTBF,
rightRouter: rightRouter,
Expand All @@ -107,6 +109,22 @@ func NewManager() (*NetworkManager, error) {
return manager, nil
}

func (m *NetworkManager) Close() error {
var errs []error

if err := m.leftTBF.Close(); err != nil {
errs = append(errs, err)
}
if err := m.rightTBF.Close(); err != nil {
errs = append(errs, err)
}
if err := m.wan.Stop(); err != nil {
errs = append(errs, err)
}

return errors.Join(errs...)
}

// GetLeftNet creates and returns a new Net on the left side of the network topology.
func (m *NetworkManager) GetLeftNet() (*vnet.Net, string, error) {
privateIP, publicIP, err := m.leftRouter.getIPMapping()
Expand Down
55 changes: 55 additions & 0 deletions vnet/vnet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add

//go:build !js
// +build !js

here? It would fix the test wasm failed check~

//go:build !js
// +build !js

package main

import (
"testing"
"testing/synctest"

"github.com/pion/logging"
"github.com/stretchr/testify/assert"
)

func TestVnet(t *testing.T) {
lf := logging.NewDefaultLoggerFactory()
logger := lf.NewLogger("bwe_vnet_synctest")

testCases := []struct {
name string
senderMode senderMode
flowMode flowMode
}{
{
name: "TestVnetRunnerABR/VariableAvailableCapacitySingleFlow",
senderMode: abrSenderMode,
flowMode: singleFlowMode,
},
{
name: "TestVnetRunnerABR/VariableAvailableCapacityMultipleFlows",
senderMode: abrSenderMode,
flowMode: multipleFlowsMode,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
t.Helper()
runner := Runner{
loggerFactory: lf,
logger: logger,
name: tc.name,
senderMode: tc.senderMode,
flowMode: tc.flowMode,
}
err := runner.Run()
assert.NoError(t, err)
synctest.Wait()
})
})
}
}
Loading