diff --git a/client/send.go b/client/send.go index efb629d..de95cf1 100644 --- a/client/send.go +++ b/client/send.go @@ -93,6 +93,7 @@ func (svc *Service) sendFile(id string, filePath string) error { fmt.Println("Enabling dynamic frame allocation") } s.EnableDynamicAllocation() + s.EnableAdaptiveFrameSizing() } for _, worker := range m.Workers { diff --git a/pkg/sender/frame.go b/pkg/sender/frame.go index b9ae86e..d8615b2 100644 --- a/pkg/sender/frame.go +++ b/pkg/sender/frame.go @@ -13,7 +13,8 @@ type SendFrame struct { sendTime time.Time retryTimes int hasAck bool - transferID int // ID of the transfer this frame is assigned to + transferID int // ID of the transfer this frame is assigned to + rtt time.Duration // Last measured RTT for this frame mu sync.Mutex } @@ -31,6 +32,24 @@ func (sf *SendFrame) UpdateSendTime() { sf.mu.Unlock() } +func (sf *SendFrame) GetSendTime() time.Time { + sf.mu.Lock() + defer sf.mu.Unlock() + return sf.sendTime +} + +func (sf *SendFrame) SetRTT(rtt time.Duration) { + sf.mu.Lock() + sf.rtt = rtt + sf.mu.Unlock() +} + +func (sf *SendFrame) GetRTT() time.Duration { + sf.mu.Lock() + defer sf.mu.Unlock() + return sf.rtt +} + func (sf *SendFrame) FrameID() uint32 { return sf.frame.FrameID } diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go new file mode 100644 index 0000000..97c5cff --- /dev/null +++ b/pkg/sender/rtt.go @@ -0,0 +1,195 @@ +package sender + +import ( + "math" + "sync" + "time" +) + +type NetworkEnvironmentType int + +const ( + NetworkEnvironmentUnknown NetworkEnvironmentType = iota + NetworkEnvironmentLocal + NetworkEnvironmentRemote +) + +var RTTThresholds = struct { + LocalThreshold time.Duration + RemoteThreshold time.Duration +}{ + LocalThreshold: 5 * time.Millisecond, + RemoteThreshold: 50 * time.Millisecond, +} + +type RTTStats struct { + minRTT time.Duration + smoothedRTT time.Duration + rttVar time.Duration + latestRTT time.Duration + samples int + networkEnvironment NetworkEnvironmentType + environmentDetected bool + lastEnvCheck time.Time + mu sync.Mutex +} + +func NewRTTStats() *RTTStats { + return &RTTStats{ + minRTT: time.Hour, // Initialize to a large value + smoothedRTT: 0, + rttVar: 0, + latestRTT: 0, + samples: 0, + networkEnvironment: NetworkEnvironmentUnknown, + environmentDetected: false, + lastEnvCheck: time.Now(), + } +} + +func (r *RTTStats) UpdateRTT(sendTime time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + + rtt := time.Since(sendTime) + r.latestRTT = rtt + + if rtt < r.minRTT { + r.minRTT = rtt + } + + if r.samples == 0 { + r.smoothedRTT = rtt + r.rttVar = rtt / 2 + } else { + rttDelta := time.Duration(math.Abs(float64(r.smoothedRTT - rtt))) + r.rttVar = r.rttVar*3/4 + rttDelta/4 + + r.smoothedRTT = r.smoothedRTT*7/8 + rtt/8 + } + + r.samples++ + + if !r.environmentDetected || r.samples%10 == 0 { + r.detectNetworkEnvironment() + } +} + +func (r *RTTStats) detectNetworkEnvironment() { + if r.samples < 5 { + return + } + + if r.smoothedRTT <= RTTThresholds.LocalThreshold { + r.networkEnvironment = NetworkEnvironmentLocal + } else if r.smoothedRTT >= RTTThresholds.RemoteThreshold { + r.networkEnvironment = NetworkEnvironmentRemote + } else { + if r.minRTT <= RTTThresholds.LocalThreshold*2 { + if r.rttVar < r.smoothedRTT/4 { + r.networkEnvironment = NetworkEnvironmentLocal + } else { + if r.latestRTT < r.smoothedRTT { + r.networkEnvironment = NetworkEnvironmentLocal + } else { + r.networkEnvironment = NetworkEnvironmentRemote + } + } + } else { + r.networkEnvironment = NetworkEnvironmentRemote + } + } + + r.environmentDetected = true + r.lastEnvCheck = time.Now() +} + +func (r *RTTStats) GetSmoothedRTT() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + + if r.samples == 0 { + return time.Millisecond * 100 // Default value if no samples + } + return r.smoothedRTT +} + +func (r *RTTStats) GetRTTVariation() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + + if r.samples == 0 { + return time.Millisecond * 50 // Default value if no samples + } + return r.rttVar +} + +func (r *RTTStats) GetMinRTT() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + + if r.minRTT == time.Hour { + return time.Millisecond * 50 // Default value if no valid minimum + } + return r.minRTT +} + +func (r *RTTStats) GetLatestRTT() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + return r.latestRTT +} + +func (r *RTTStats) GetNetworkEnvironment() NetworkEnvironmentType { + r.mu.Lock() + defer r.mu.Unlock() + return r.networkEnvironment +} + +func (r *RTTStats) IsLocalNetwork() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.networkEnvironment == NetworkEnvironmentLocal +} + +func (r *RTTStats) IsRemoteNetwork() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.networkEnvironment == NetworkEnvironmentRemote +} + +func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 { + r.mu.Lock() + defer r.mu.Unlock() + + // Base multiplier based on network environment + var baseMultiplier float64 + switch r.networkEnvironment { + case NetworkEnvironmentLocal: + baseMultiplier = 0.5 // More aggressive for local networks + case NetworkEnvironmentRemote: + baseMultiplier = 2.0 // More conservative for remote networks + default: + baseMultiplier = 1.0 // Default multiplier + } + + if r.samples < 10 { + return baseMultiplier // Not enough samples to make adjustments + } + + stabilityRatio := float64(r.rttVar) / float64(r.smoothedRTT) + + if stabilityRatio < 0.1 { + return baseMultiplier * 0.8 + } else if stabilityRatio > 0.3 { + return baseMultiplier * 1.5 + } + + if r.latestRTT < r.smoothedRTT { + return baseMultiplier * 0.9 + } else if r.latestRTT > time.Duration(float64(r.smoothedRTT)*1.2) { + return baseMultiplier * 1.3 + } + + return baseMultiplier +} diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index fcc338e..d90a8ea 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -22,7 +22,10 @@ type Sender struct { id uint32 // each frame size - frameSize int + frameSize int + minFrameSize int + maxFrameSize int + adaptiveFrameSizingEnabled bool // send src to remote Receiver src io.Reader @@ -45,6 +48,9 @@ type Sender struct { waitAcks map[uint32]*SendFrame bufferFrames []*SendFrame + rttStats *RTTStats + lastFrameSizeAdj time.Time + // 1 means all frames has been sent sendAll bool mu sync.Mutex @@ -62,23 +68,38 @@ func NewSender(id uint32, src io.Reader, frameSize int, maxBufferCount int) (*Se maxBufferCount = 100 } + minFrameSize := 1024 // 1KB minimum + maxFrameSize := 64 * 1024 // 64KB maximum + + if frameSize < minFrameSize { + frameSize = minFrameSize + } else if frameSize > maxFrameSize { + frameSize = maxFrameSize + } + + now := time.Now() s := &Sender{ - id: id, - frameSize: frameSize, - src: src, - frameCh: make(chan *SendFrame), - ackCh: make(chan *stream.Ack), - dynamicAllocationEnabled: false, - transfers: make(map[int]*Transfer), - totalThroughput: 0, - allocationRatios: make(map[int]float64), - maxBufferCount: maxBufferCount, - retryFrames: make([]*SendFrame, 0), - limiter: make(chan struct{}, maxBufferCount), - waitAcks: make(map[uint32]*SendFrame), - bufferFrames: make([]*SendFrame, 0), - sendShutdown: shutdown.New(), - ackShutdown: shutdown.New(), + id: id, + frameSize: frameSize, + minFrameSize: minFrameSize, + maxFrameSize: maxFrameSize, + adaptiveFrameSizingEnabled: true, + src: src, + frameCh: make(chan *SendFrame), + ackCh: make(chan *stream.Ack), + dynamicAllocationEnabled: false, + transfers: make(map[int]*Transfer), + totalThroughput: 0, + allocationRatios: make(map[int]float64), + maxBufferCount: maxBufferCount, + retryFrames: make([]*SendFrame, 0), + limiter: make(chan struct{}, maxBufferCount), + waitAcks: make(map[uint32]*SendFrame), + bufferFrames: make([]*SendFrame, 0), + rttStats: NewRTTStats(), + lastFrameSizeAdj: now, + sendShutdown: shutdown.New(), + ackShutdown: shutdown.New(), } for i := 0; i < maxBufferCount; i++ { s.limiter <- struct{}{} @@ -93,6 +114,37 @@ func (sender *Sender) EnableDynamicAllocation() { sender.dynamicAllocationEnabled = true } +func (sender *Sender) EnableAdaptiveFrameSizing() { + sender.mu.Lock() + defer sender.mu.Unlock() + + sender.adaptiveFrameSizingEnabled = true +} + +func (sender *Sender) SetFrameSizeBounds(minSize, maxSize int) error { + if !stream.IsValidFrameSize(minSize) || !stream.IsValidFrameSize(maxSize) { + return fmt.Errorf("invalid frame size bounds") + } + + if minSize > maxSize { + return fmt.Errorf("minimum frame size cannot be larger than maximum") + } + + sender.mu.Lock() + defer sender.mu.Unlock() + + sender.minFrameSize = minSize + sender.maxFrameSize = maxSize + + if sender.frameSize < minSize { + sender.frameSize = minSize + } else if sender.frameSize > maxSize { + sender.frameSize = maxSize + } + + return nil +} + func (sender *Sender) HandleStream(s *stream.FrameStream) { sender.mu.Lock() if sender.sendAll { @@ -194,6 +246,69 @@ func (sender *Sender) loopSend() { for { <-sender.limiter + if sender.adaptiveFrameSizingEnabled { + now := time.Now() + if now.Sub(sender.lastFrameSizeAdj) > time.Second { + sender.mu.Lock() + + smoothedRTT := sender.rttStats.GetSmoothedRTT() + rttVar := sender.rttStats.GetRTTVariation() + minRTT := sender.rttStats.GetMinRTT() + rttMultiplier := sender.rttStats.GetAdaptiveRTTMultiplier() + + currentSize := sender.frameSize + newSize := currentSize + + isLocalNetwork := sender.rttStats.IsLocalNetwork() + isRemoteNetwork := sender.rttStats.IsRemoteNetwork() + + if isLocalNetwork { + if rttVar < smoothedRTT/4 { + newSize = int(float64(currentSize) * 1.5) // Increase by 50% + } else if rttVar < smoothedRTT/2 { + newSize = int(float64(currentSize) * 1.25) // Increase by 25% + } else if rttVar > smoothedRTT/2 { + newSize = int(float64(currentSize) * 0.9) // Decrease by 10% + } + } else if isRemoteNetwork { + if rttVar < smoothedRTT/8 && smoothedRTT < time.Duration(float64(minRTT)*1.25) { + newSize = int(float64(currentSize) * 1.1) // Increase by 10% + } else if rttVar > smoothedRTT/4 || smoothedRTT > time.Duration(float64(minRTT)*1.5) { + newSize = int(float64(currentSize) * 0.75) // Decrease by 25% + } + } else { + if rttVar < smoothedRTT/4 && smoothedRTT < minRTT+minRTT/2 { + // Network is stable, increase frame size + newSize = int(float64(currentSize) * (1.0 + 0.25/rttMultiplier)) + + if newSize > sender.maxFrameSize { + newSize = sender.maxFrameSize + } + } else if rttVar > smoothedRTT/2 || smoothedRTT > time.Duration(float64(minRTT)*2) { + // Network is unstable or congested, decrease frame size + newSize = int(float64(currentSize) * (1.0 - 0.25*rttMultiplier)) + + if newSize < sender.minFrameSize { + newSize = sender.minFrameSize + } + } + } + + if newSize > sender.maxFrameSize { + newSize = sender.maxFrameSize + } else if newSize < sender.minFrameSize { + newSize = sender.minFrameSize + } + + if newSize != currentSize { + sender.frameSize = newSize + } + + sender.lastFrameSizeAdj = now + sender.mu.Unlock() + } + } + // retry first var retryFrame *SendFrame sender.mu.Lock() @@ -213,8 +328,12 @@ func (sender *Sender) loopSend() { continue } + sender.mu.Lock() + currentFrameSize := sender.frameSize + sender.mu.Unlock() + // no retry frames, get a new frame from src - buf := make([]byte, sender.frameSize) + buf := make([]byte, currentFrameSize) n, err := sender.src.Read(buf) if err == io.EOF { // send last frame and it's buffer is nil @@ -296,6 +415,12 @@ func (sender *Sender) ackHandler() { sender.mu.Lock() waitSendFrame, ok := sender.waitAcks[ack.FrameID] if ok { + if !waitSendFrame.GetSendTime().IsZero() { + rtt := time.Since(waitSendFrame.GetSendTime()) + waitSendFrame.SetRTT(rtt) + sender.rttStats.UpdateRTT(waitSendFrame.GetSendTime()) + } + waitSendFrame.SetAck() delete(sender.waitAcks, ack.FrameID) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 7d9f840..08ab780 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -23,6 +23,10 @@ type Transfer struct { lastMetricTime time.Time currentThroughput float64 // bytes per second + rttStats *RTTStats + congestionWindow int64 + lastCongestionAdj time.Time + s *stream.FrameStream limiter *limit.Limiter frameCh chan *SendFrame @@ -49,6 +53,9 @@ func NewTransfer(id int, maxBufferCount int, s *stream.FrameStream, startTime: now, lastMetricTime: now, currentThroughput: 0, + rttStats: NewRTTStats(), + congestionWindow: int64(1), + lastCongestionAdj: now, s: s, limiter: limit.NewLimiter(int64(1)), frameCh: frameCh, @@ -105,7 +112,14 @@ func (t *Transfer) frameSender() { if t.inSlowStart { n = 2 * n } else { - n++ + rttVar := t.rttStats.GetRTTVariation() + smoothedRTT := t.rttStats.GetSmoothedRTT() + + if rttVar < smoothedRTT/4 { + n += n / 4 + } else { + n++ + } } if n > t.maxBufferCount { @@ -130,10 +144,48 @@ func (t *Transfer) frameSender() { continue } + sf.UpdateSendTime() + t.mu.Lock() t.waitAcks[sf.FrameID()] = sf t.mu.Unlock() + if !t.inSlowStart && t.framesSent > 20 { + var framesToRetry []*SendFrame + + t.mu.Lock() + now := time.Now() + smoothedRTT := t.rttStats.GetSmoothedRTT() + rttTimeout := smoothedRTT * 3 // Timeout threshold + + retryCount := 0 + maxRetryPerCycle := 5 + + for frameID, waitFrame := range t.waitAcks { + if frameID != sf.FrameID() && !waitFrame.sendTime.IsZero() && waitFrame.retryTimes < 3 { + elapsed := now.Sub(waitFrame.sendTime) + if elapsed > rttTimeout { + framesToRetry = append(framesToRetry, waitFrame) + retryCount++ + if retryCount >= maxRetryPerCycle { + break + } + } + } + } + t.mu.Unlock() + + for _, waitFrame := range framesToRetry { + waitFrame.retryTimes++ + waitFrame.UpdateSendTime() + + err = t.s.WriteFrame(waitFrame.Frame()) + if err != nil { + return + } + } + } + err = t.s.WriteFrame(sf.Frame()) if err != nil { return @@ -141,6 +193,224 @@ func (t *Transfer) frameSender() { } } +func (t *Transfer) adjustLimitInSlowStart(currentLimit int64, smoothedRTT, minRTT time.Duration, + rttMultiplier float64, isLocalNetwork, isRemoteNetwork bool) (newLimit int64, exitSlowStart bool) { + + newLimit = currentLimit + exitSlowStart = false + + if isLocalNetwork { + newLimit = currentLimit * 3 + if smoothedRTT > time.Duration(float64(minRTT)*1.5) && t.framesSent > 10 { + exitSlowStart = true + newLimit = currentLimit * 2 + } + } else if isRemoteNetwork { + newLimit = currentLimit * 2 + if smoothedRTT > time.Duration(float64(minRTT)*3) && t.framesSent > 30 { + exitSlowStart = true + newLimit = currentLimit + } + } else { + newLimit = currentLimit * 2 + multiplier := time.Duration(int64(2 * rttMultiplier)) + if smoothedRTT > minRTT*multiplier && t.framesSent > 20 { + exitSlowStart = true + newLimit = currentLimit + } + } + + return newLimit, exitSlowStart +} + +func (t *Transfer) classifyWorkerType(isLocalNetwork, isRemoteNetwork bool) (isSlowWorker, isFastWorker bool) { + if t.framesSent <= 20 { + return false, false + } + + // Calculate expected throughput based on network type + var expectedThroughput float64 + if isLocalNetwork { + expectedThroughput = 500 * 1024 // 500KB/s as a reference point + } else if isRemoteNetwork { + expectedThroughput = 200 * 1024 // 200KB/s as a reference point + } else { + expectedThroughput = 300 * 1024 // 300KB/s as a reference point + } + + isSlowWorker = t.currentThroughput < expectedThroughput*0.5 + isFastWorker = t.currentThroughput > expectedThroughput*1.5 + + return isSlowWorker, isFastWorker +} + +func (t *Transfer) adjustLimitForLocalNetwork(currentLimit int64, rttVar, smoothedRTT, minRTT time.Duration, + isSlowWorker bool) int64 { + + newLimit := currentLimit + + if isSlowWorker { + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 2) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 3) + } else { + newLimit = currentLimit + (currentLimit / 5) + } + } else { // Normal or fast worker + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 4) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 6) + } else { + newLimit = currentLimit + (currentLimit / 10) + } + } + + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 4 / 5 + } else { + newLimit = currentLimit * 3 / 4 + } + if newLimit < 1 { + newLimit = 1 + } + } + + return newLimit +} + +func (t *Transfer) adjustLimitForRemoteNetwork(currentLimit int64, rttVar, smoothedRTT, minRTT time.Duration, + isSlowWorker bool) int64 { + + newLimit := currentLimit + + if isSlowWorker { + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 8) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 12) + } else { + newLimit = currentLimit + (currentLimit / 20) + } + } else { // Normal or fast worker + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 10) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 16) + } else { + newLimit = currentLimit + (currentLimit / 32) + } + } + + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 3 / 5 + } else { + newLimit = currentLimit / 2 + } + if newLimit < 1 { + newLimit = 1 + } + } + + return newLimit +} + +func (t *Transfer) adjustLimitForUnknownNetwork(currentLimit int64, rttVar, smoothedRTT, minRTT time.Duration, + rttMultiplier float64, isSlowWorker, isFastWorker bool) int64 { + + newLimit := currentLimit + + adjustedMultiplier := rttMultiplier + if isSlowWorker { + adjustedMultiplier *= 0.8 + } else if isFastWorker { + adjustedMultiplier *= 1.2 + } + + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + int64(float64(currentLimit)/(6*adjustedMultiplier)) + } else { + newLimit = currentLimit + int64(float64(currentLimit)/(12*adjustedMultiplier)) + } + + if smoothedRTT > time.Duration(float64(minRTT)*3) { + backoffFactor := 2.0 * adjustedMultiplier + if isSlowWorker { + backoffFactor *= 0.8 // Less aggressive backoff for slow workers + } + newLimit = int64(float64(currentLimit) / backoffFactor) + if newLimit < 1 { + newLimit = 1 + } + } + + return newLimit +} + +func (t *Transfer) updateCongestionControl(now time.Time) { + if now.Sub(t.lastCongestionAdj) <= 100*time.Millisecond { + return + } + + smoothedRTT := t.rttStats.GetSmoothedRTT() + rttVar := t.rttStats.GetRTTVariation() + minRTT := t.rttStats.GetMinRTT() + rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() + isLocalNetwork := t.rttStats.IsLocalNetwork() + isRemoteNetwork := t.rttStats.IsRemoteNetwork() + + currentLimit := t.limiter.LimitNum() + var newLimit int64 = currentLimit + + if t.inSlowStart { + newLimit, t.inSlowStart = t.adjustLimitInSlowStart( + currentLimit, smoothedRTT, minRTT, rttMultiplier, isLocalNetwork, isRemoteNetwork) + } else { + isSlowWorker, isFastWorker := t.classifyWorkerType(isLocalNetwork, isRemoteNetwork) + + if isLocalNetwork { + newLimit = t.adjustLimitForLocalNetwork(currentLimit, rttVar, smoothedRTT, minRTT, isSlowWorker) + } else if isRemoteNetwork { + newLimit = t.adjustLimitForRemoteNetwork(currentLimit, rttVar, smoothedRTT, minRTT, isSlowWorker) + } else { + newLimit = t.adjustLimitForUnknownNetwork( + currentLimit, rttVar, smoothedRTT, minRTT, rttMultiplier, isSlowWorker, isFastWorker) + } + } + + // Cap at maxBufferCount + if newLimit > int64(t.maxBufferCount) { + newLimit = int64(t.maxBufferCount) + t.inSlowStart = false + } + + t.congestionWindow = newLimit + t.limiter.SetLimit(newLimit) + t.lastCongestionAdj = now +} + +func (t *Transfer) updateThroughputMetrics(now time.Time) { + if t.framesSent%10 != 0 && now.Sub(t.lastMetricTime) <= time.Second { + return + } + + elapsedSeconds := now.Sub(t.lastMetricTime).Seconds() + if elapsedSeconds <= 0 { + return + } + + // Calculate bytes per second + t.currentThroughput = float64(t.bytesTransferred) / elapsedSeconds + + t.updateCongestionControl(now) + + t.lastMetricTime = now + t.bytesTransferred = 0 +} + func (t *Transfer) ackReceiver() { defer t.recvShutdown.Done() @@ -154,45 +424,22 @@ func (t *Transfer) ackReceiver() { t.mu.Lock() sf, ok := t.waitAcks[ack.FrameID] if ok { + if !sf.sendTime.IsZero() { + t.rttStats.UpdateRTT(sf.sendTime) + } + t.framesSent++ if sf.Frame().Buf != nil { t.bytesTransferred += uint64(len(sf.Frame().Buf)) } - // Calculate throughput every 10 frames or at least once per second now := time.Now() - if t.framesSent%10 == 0 || now.Sub(t.lastMetricTime) > time.Second { - elapsedSeconds := now.Sub(t.lastMetricTime).Seconds() - if elapsedSeconds > 0 { - // Calculate bytes per second - t.currentThroughput = float64(t.bytesTransferred) / elapsedSeconds - - currentLimit := t.limiter.LimitNum() - if t.currentThroughput > 0 { - newLimit := currentLimit - if t.inSlowStart { - newLimit = currentLimit * 2 - } else { - newLimit = currentLimit + (currentLimit / 10) - } - - // Cap at maxBufferCount - if newLimit > int64(t.maxBufferCount) { - newLimit = int64(t.maxBufferCount) - t.inSlowStart = false - } - - t.limiter.SetLimit(newLimit) - } - - t.lastMetricTime = now - t.bytesTransferred = 0 - } - } + t.updateThroughputMetrics(now) delete(t.waitAcks, ack.FrameID) } t.mu.Unlock() + if ok { t.limiter.Release() }