Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
18aeb4e
Implement advanced bandwidth optimization with adaptive frame sizing …
devin-ai-integration[bot] May 28, 2025
a41d698
Fix code formatting issues
devin-ai-integration[bot] May 28, 2025
c6fcd20
Fix type conversion issue with time.Duration comparison
devin-ai-integration[bot] May 28, 2025
edc5de8
Fix deadlock in predictive retransmission by separating lock acquisit…
devin-ai-integration[bot] May 28, 2025
d8b1a1d
Fix Go formatting in transfer.go
devin-ai-integration[bot] May 28, 2025
c3eda9d
Implement dynamic RTT probing for network environment detection and o…
devin-ai-integration[bot] May 28, 2025
2b9fc8c
Fix type conversion issues between time.Duration and float64
devin-ai-integration[bot] May 28, 2025
e89ab8d
Enable adaptive frame sizing when dynamic allocation is enabled
devin-ai-integration[bot] May 28, 2025
2c1331e
Implement dynamic RTT probing with network environment detection and …
devin-ai-integration[bot] May 28, 2025
8f5097d
Fix build errors in dynamic RTT probing implementation
devin-ai-integration[bot] May 28, 2025
5d3eef3
Fix indentation in transfer.go for dynamic RTT probing
devin-ai-integration[bot] May 28, 2025
79ca2e4
Fix indentation in transfer.go for dynamic RTT probing
devin-ai-integration[bot] May 28, 2025
f19d493
Enhance adaptive frame sizing with improved RTT thresholds
devin-ai-integration[bot] May 28, 2025
bfa847a
Refactor transfer.go to reduce nesting with helper functions
devin-ai-integration[bot] May 28, 2025
7ac13ab
Fix formatting in rtt.go with go fmt
devin-ai-integration[bot] May 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (svc *Service) sendFile(id string, filePath string) error {
fmt.Println("Enabling dynamic frame allocation")
}
s.EnableDynamicAllocation()
s.EnableAdaptiveFrameSizing()
}

for _, worker := range m.Workers {
Expand Down
21 changes: 20 additions & 1 deletion pkg/sender/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type SendFrame struct {
sendTime time.Time
retryTimes int
hasAck bool
transferID int // ID of the transfer this frame is assigned to
transferID int // ID of the transfer this frame is assigned to
rtt time.Duration // Last measured RTT for this frame

mu sync.Mutex
}
Expand All @@ -31,6 +32,24 @@ func (sf *SendFrame) UpdateSendTime() {
sf.mu.Unlock()
}

func (sf *SendFrame) GetSendTime() time.Time {
sf.mu.Lock()
defer sf.mu.Unlock()
return sf.sendTime
}

func (sf *SendFrame) SetRTT(rtt time.Duration) {
sf.mu.Lock()
sf.rtt = rtt
sf.mu.Unlock()
}

func (sf *SendFrame) GetRTT() time.Duration {
sf.mu.Lock()
defer sf.mu.Unlock()
return sf.rtt
}

func (sf *SendFrame) FrameID() uint32 {
return sf.frame.FrameID
}
Expand Down
195 changes: 195 additions & 0 deletions pkg/sender/rtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package sender

import (
"math"
"sync"
"time"
)

type NetworkEnvironmentType int

const (
NetworkEnvironmentUnknown NetworkEnvironmentType = iota
NetworkEnvironmentLocal
NetworkEnvironmentRemote
)

var RTTThresholds = struct {
LocalThreshold time.Duration
RemoteThreshold time.Duration
}{
LocalThreshold: 5 * time.Millisecond,
RemoteThreshold: 50 * time.Millisecond,
}

type RTTStats struct {
minRTT time.Duration
smoothedRTT time.Duration
rttVar time.Duration
latestRTT time.Duration
samples int
networkEnvironment NetworkEnvironmentType
environmentDetected bool
lastEnvCheck time.Time
mu sync.Mutex
}

func NewRTTStats() *RTTStats {
return &RTTStats{
minRTT: time.Hour, // Initialize to a large value
smoothedRTT: 0,
rttVar: 0,
latestRTT: 0,
samples: 0,
networkEnvironment: NetworkEnvironmentUnknown,
environmentDetected: false,
lastEnvCheck: time.Now(),
}
}

func (r *RTTStats) UpdateRTT(sendTime time.Time) {
r.mu.Lock()
defer r.mu.Unlock()

rtt := time.Since(sendTime)
r.latestRTT = rtt

if rtt < r.minRTT {
r.minRTT = rtt
}

if r.samples == 0 {
r.smoothedRTT = rtt
r.rttVar = rtt / 2
} else {
rttDelta := time.Duration(math.Abs(float64(r.smoothedRTT - rtt)))
r.rttVar = r.rttVar*3/4 + rttDelta/4

r.smoothedRTT = r.smoothedRTT*7/8 + rtt/8
}

r.samples++

if !r.environmentDetected || r.samples%10 == 0 {
r.detectNetworkEnvironment()
}
}

func (r *RTTStats) detectNetworkEnvironment() {
if r.samples < 5 {
return
}

if r.smoothedRTT <= RTTThresholds.LocalThreshold {
r.networkEnvironment = NetworkEnvironmentLocal
} else if r.smoothedRTT >= RTTThresholds.RemoteThreshold {
r.networkEnvironment = NetworkEnvironmentRemote
} else {
if r.minRTT <= RTTThresholds.LocalThreshold*2 {
if r.rttVar < r.smoothedRTT/4 {
r.networkEnvironment = NetworkEnvironmentLocal
} else {
if r.latestRTT < r.smoothedRTT {
r.networkEnvironment = NetworkEnvironmentLocal
} else {
r.networkEnvironment = NetworkEnvironmentRemote
}
}
} else {
r.networkEnvironment = NetworkEnvironmentRemote
}
}

r.environmentDetected = true
r.lastEnvCheck = time.Now()
}

func (r *RTTStats) GetSmoothedRTT() time.Duration {
r.mu.Lock()
defer r.mu.Unlock()

if r.samples == 0 {
return time.Millisecond * 100 // Default value if no samples
}
return r.smoothedRTT
}

func (r *RTTStats) GetRTTVariation() time.Duration {
r.mu.Lock()
defer r.mu.Unlock()

if r.samples == 0 {
return time.Millisecond * 50 // Default value if no samples
}
return r.rttVar
}

func (r *RTTStats) GetMinRTT() time.Duration {
r.mu.Lock()
defer r.mu.Unlock()

if r.minRTT == time.Hour {
return time.Millisecond * 50 // Default value if no valid minimum
}
return r.minRTT
}

func (r *RTTStats) GetLatestRTT() time.Duration {
r.mu.Lock()
defer r.mu.Unlock()
return r.latestRTT
}

func (r *RTTStats) GetNetworkEnvironment() NetworkEnvironmentType {
r.mu.Lock()
defer r.mu.Unlock()
return r.networkEnvironment
}

func (r *RTTStats) IsLocalNetwork() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.networkEnvironment == NetworkEnvironmentLocal
}

func (r *RTTStats) IsRemoteNetwork() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.networkEnvironment == NetworkEnvironmentRemote
}

func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 {
r.mu.Lock()
defer r.mu.Unlock()

// Base multiplier based on network environment
var baseMultiplier float64
switch r.networkEnvironment {
case NetworkEnvironmentLocal:
baseMultiplier = 0.5 // More aggressive for local networks
case NetworkEnvironmentRemote:
baseMultiplier = 2.0 // More conservative for remote networks
default:
baseMultiplier = 1.0 // Default multiplier
}

if r.samples < 10 {
return baseMultiplier // Not enough samples to make adjustments
}

stabilityRatio := float64(r.rttVar) / float64(r.smoothedRTT)

if stabilityRatio < 0.1 {
return baseMultiplier * 0.8
} else if stabilityRatio > 0.3 {
return baseMultiplier * 1.5
}

if r.latestRTT < r.smoothedRTT {
return baseMultiplier * 0.9
} else if r.latestRTT > time.Duration(float64(r.smoothedRTT)*1.2) {
return baseMultiplier * 1.3
}

return baseMultiplier
}
Loading