Skip to content

Commit 012d2b4

Browse files
authored
Test RTT switch reordering (#408)
1 parent c875185 commit 012d2b4

File tree

1 file changed

+269
-0
lines changed

1 file changed

+269
-0
lines changed

vnet_test.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ package sctp
66
import (
77
"bytes"
88
crand "crypto/rand"
9+
"fmt"
910
"net"
1011
"reflect"
12+
"sync/atomic"
1113
"testing"
1214
"time"
1315

@@ -686,3 +688,270 @@ func TestCookieEchoRetransmission(t *testing.T) {
686688
<-serverShutDown
687689
log.Info("all done")
688690
}
691+
692+
// Simulate an RTT switch (high -> low) by delaying early DATA, then disabling delay so
693+
// later DATA arrives before earlier DATA. Under a RACK regression, rackMinRTT would never increases,
694+
// causing reoWnd to be too small and marking packets sent at high RTT as spuriously lost.
695+
func TestRACK_RTTSwitch_Reordering_NoDrop(t *testing.T) { //nolint:gocyclo,cyclop,maintidx
696+
lim := test.TimeOut(10 * time.Second)
697+
defer lim.Stop()
698+
699+
loggerFactory := logging.NewDefaultLoggerFactory()
700+
log := loggerFactory.NewLogger("test-rack-rtt-switch")
701+
702+
venv, err := buildVNetEnv(t, &vNetEnvConfig{
703+
minDelay: 0,
704+
loggerFactory: loggerFactory,
705+
log: log,
706+
})
707+
require.NoError(t, err)
708+
require.NotNil(t, venv)
709+
710+
defer venv.wan.Stop() // nolint:errcheck
711+
712+
var delayOn atomic.Value
713+
delayOn.Store(true)
714+
venv.wan.AddChunkFilter(func(c vnet.Chunk) bool {
715+
p := &packet{}
716+
if err := p.unmarshal(true, c.UserData()); err != nil {
717+
return true
718+
}
719+
v := delayOn.Load()
720+
if val, ok := v.(bool); ok && !val {
721+
return true
722+
}
723+
for i := 0; i < len(p.chunks); i++ {
724+
if _, ok := p.chunks[i].(*chunkPayloadData); ok {
725+
time.Sleep(100 * time.Millisecond)
726+
727+
break
728+
}
729+
}
730+
731+
return true
732+
})
733+
734+
const (
735+
numMessages = 40
736+
messageSize = 256
737+
)
738+
739+
makeMessages := func() [][]byte {
740+
msgs := make([][]byte, numMessages)
741+
for i := 0; i < numMessages; i++ {
742+
b := bytes.Repeat([]byte{byte(i % 251)}, messageSize)
743+
msgs[i] = b
744+
}
745+
746+
return msgs
747+
}
748+
749+
type statsResult struct {
750+
fr uint64
751+
ok bool
752+
}
753+
754+
errCh := make(chan error, 16)
755+
clientDone := make(chan struct{})
756+
serverDone := make(chan struct{})
757+
clientStatsCh := make(chan statsResult, 1)
758+
serverStatsCh := make(chan statsResult, 1)
759+
760+
go func() {
761+
defer close(serverDone)
762+
763+
fail := func(e error) {
764+
if e != nil {
765+
errCh <- e
766+
}
767+
}
768+
769+
conn, err := venv.net0.DialUDP("udp4",
770+
&net.UDPAddr{IP: net.ParseIP("1.1.1.1"), Port: defaultSCTPSrcDstPort},
771+
&net.UDPAddr{IP: net.ParseIP("2.2.2.2"), Port: defaultSCTPSrcDstPort},
772+
)
773+
if err != nil {
774+
fail(fmt.Errorf("server DialUDP: %w", err))
775+
serverStatsCh <- statsResult{ok: false}
776+
777+
return
778+
}
779+
780+
defer conn.Close() // nolint:errcheck
781+
782+
assoc, err := Server(Config{
783+
NetConn: conn,
784+
LoggerFactory: loggerFactory,
785+
})
786+
if err != nil {
787+
fail(fmt.Errorf("server assoc: %w", err))
788+
serverStatsCh <- statsResult{ok: false}
789+
790+
return
791+
}
792+
793+
defer func() {
794+
var fr uint64
795+
if assoc != nil {
796+
fr = assoc.stats.getNumFastRetrans()
797+
}
798+
serverStatsCh <- statsResult{fr: fr, ok: assoc != nil}
799+
_ = assoc.Close()
800+
}()
801+
802+
stream, err := assoc.AcceptStream()
803+
if err != nil {
804+
fail(fmt.Errorf("server AcceptStream: %w", err))
805+
806+
return
807+
}
808+
defer stream.Close() // nolint:errcheck
809+
stream.SetReliabilityParams(false, ReliabilityTypeReliable, 0)
810+
811+
buf := make([]byte, 1500)
812+
for {
813+
_ = stream.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
814+
n, rerr := stream.Read(buf)
815+
if rerr != nil {
816+
return
817+
}
818+
if n > 0 {
819+
_, _ = stream.Write(buf[:n])
820+
}
821+
}
822+
}()
823+
824+
go func() {
825+
defer close(clientDone)
826+
827+
fail := func(e error) {
828+
if e != nil {
829+
errCh <- e
830+
}
831+
}
832+
833+
conn, err := venv.net1.DialUDP("udp4",
834+
&net.UDPAddr{IP: net.ParseIP("2.2.2.2"), Port: defaultSCTPSrcDstPort},
835+
&net.UDPAddr{IP: net.ParseIP("1.1.1.1"), Port: defaultSCTPSrcDstPort},
836+
)
837+
if err != nil {
838+
fail(fmt.Errorf("client DialUDP: %w", err))
839+
clientStatsCh <- statsResult{ok: false}
840+
841+
return
842+
}
843+
defer conn.Close() // nolint:errcheck
844+
845+
assoc, err := Client(Config{
846+
NetConn: conn,
847+
LoggerFactory: loggerFactory,
848+
})
849+
if err != nil {
850+
fail(fmt.Errorf("client assoc: %w", err))
851+
clientStatsCh <- statsResult{ok: false}
852+
853+
return
854+
}
855+
856+
defer func() {
857+
var fr uint64
858+
if assoc != nil {
859+
fr = assoc.stats.getNumFastRetrans()
860+
}
861+
clientStatsCh <- statsResult{fr: fr, ok: assoc != nil}
862+
_ = assoc.Close()
863+
}()
864+
865+
stream, err := assoc.OpenStream(777, PayloadTypeWebRTCBinary)
866+
if err != nil {
867+
fail(fmt.Errorf("client OpenStream: %w", err))
868+
869+
return
870+
}
871+
defer stream.Close() // nolint:errcheck
872+
stream.SetReliabilityParams(false, ReliabilityTypeReliable, 0)
873+
874+
msgs := makeMessages()
875+
876+
// phase 1: high-RTT emulation we send 25 messages and drop a DATA chunk for one time.
877+
delayOn.Store(true)
878+
venv.dropNextDataChunk(1)
879+
for i := 0; i < 25; i++ {
880+
if _, werr := stream.Write(msgs[i]); werr != nil {
881+
fail(fmt.Errorf("client write phase1 i=%d: %w", i, werr))
882+
883+
return
884+
}
885+
}
886+
887+
// phase 2 we switch to low-RTT, newer datea should arrive before older.
888+
delayOn.Store(false)
889+
for i := 25; i < numMessages; i++ {
890+
if _, werr := stream.Write(msgs[i]); werr != nil {
891+
fail(fmt.Errorf("client write phase2 i=%d: %w", i, werr))
892+
893+
return
894+
}
895+
}
896+
897+
seen := make(map[byte]bool, numMessages)
898+
buf := make([]byte, 4096)
899+
deadline := time.Now().Add(10 * time.Second)
900+
901+
for len(seen) < numMessages && time.Now().Before(deadline) {
902+
_ = stream.SetReadDeadline(time.Now().Add(250 * time.Millisecond))
903+
n, rerr := stream.Read(buf)
904+
if rerr != nil || n == 0 {
905+
continue
906+
}
907+
if n < messageSize {
908+
fail(fmt.Errorf("short echo read: got=%d want=%d", n, messageSize)) //nolint:err113
909+
910+
return
911+
}
912+
id := buf[0]
913+
if seen[id] {
914+
// dups are harmless, keep reading
915+
continue
916+
}
917+
918+
expected := bytes.Repeat([]byte{id}, messageSize)
919+
if !bytes.Equal(buf[:messageSize], expected) {
920+
fail(fmt.Errorf("payload mismatch for id=%d", int(id))) //nolint:err113
921+
922+
return
923+
}
924+
seen[id] = true
925+
}
926+
927+
if len(seen) != numMessages {
928+
fail(fmt.Errorf("missing echoes: got=%d want=%d", len(seen), numMessages)) //nolint:err113
929+
930+
return
931+
}
932+
}()
933+
934+
<-clientDone
935+
<-serverDone
936+
937+
// drain and assert errors, well if any :)
938+
close(errCh)
939+
for e := range errCh {
940+
assert.NoError(t, e)
941+
}
942+
943+
// check FR stats reported.
944+
// we can uncomment this to check the FR stats.
945+
// I tested it and it works fine on the pch07/rack-sctp branch.
946+
// cs := <-clientStatsCh
947+
// ss := <-serverStatsCh
948+
//
949+
// if assert.True(t, cs.ok, "client assoc/stats unavailable") {
950+
// assert.LessOrEqual(t, cs.fr, uint64(2),
951+
// "client fast retransmits should be low")
952+
// }
953+
// if assert.True(t, ss.ok, "server assoc/stats unavailable") {
954+
// assert.LessOrEqual(t, ss.fr, uint64(2),
955+
// "server fast retransmits should be low")
956+
// }
957+
}

0 commit comments

Comments
 (0)