diff --git a/client/recv.go b/client/recv.go index 655c961..dc3540a 100644 --- a/client/recv.go +++ b/client/recv.go @@ -85,6 +85,14 @@ func (svc *Service) recvFile(id string, filePath string) error { } recv := receiver.NewReceiver(0, fio.NewCallbackWriter(f, callback)) + + if svc.enableUnorderedProcessing { + if svc.debugMode { + fmt.Printf("Enabling unordered frame processing with buffer size: %d\n", svc.bufferSize) + } + recv.EnableUnorderedProcessing(svc.bufferSize) + } + for _, worker := range m.Workers { wait.Add(1) go func(addr string) { diff --git a/client/send.go b/client/send.go index 33a1e5c..efb629d 100644 --- a/client/send.go +++ b/client/send.go @@ -88,6 +88,13 @@ func (svc *Service) sendFile(id string, filePath string) error { return err } + if svc.dynamicAllocation { + if svc.debugMode { + fmt.Println("Enabling dynamic frame allocation") + } + s.EnableDynamicAllocation() + } + for _, worker := range m.Workers { wait.Add(1) go func(addr string) { diff --git a/client/service.go b/client/service.go index 968015f..f27bc3a 100644 --- a/client/service.go +++ b/client/service.go @@ -5,13 +5,16 @@ import ( ) type Options struct { - ServerAddr string - ID string - SendFile string - FrameSize int - CacheCount int - RecvFile string - DebugMode bool + ServerAddr string + ID string + SendFile string + FrameSize int + CacheCount int + RecvFile string + DebugMode bool + EnableUnorderedProcessing bool + DynamicAllocation bool + BufferSize int } func (op *Options) Check() error { @@ -32,10 +35,13 @@ func (op *Options) Check() error { } type Service struct { - debugMode bool - serverAddr string - frameSize int - cacheCount int + debugMode bool + serverAddr string + frameSize int + cacheCount int + enableUnorderedProcessing bool + dynamicAllocation bool + bufferSize int runHandler func() error } @@ -46,10 +52,13 @@ func NewService(options Options) (*Service, error) { } svc := &Service{ - debugMode: options.DebugMode, - serverAddr: options.ServerAddr, - frameSize: options.FrameSize, - cacheCount: options.CacheCount, + debugMode: options.DebugMode, + serverAddr: options.ServerAddr, + frameSize: options.FrameSize, + cacheCount: options.CacheCount, + enableUnorderedProcessing: options.EnableUnorderedProcessing, + dynamicAllocation: options.DynamicAllocation, + bufferSize: options.BufferSize, } if options.SendFile != "" { diff --git a/cmd/bandwidth-test/root.go b/cmd/bandwidth-test/root.go index 3e161c8..743dea2 100644 --- a/cmd/bandwidth-test/root.go +++ b/cmd/bandwidth-test/root.go @@ -17,12 +17,15 @@ import ( ) var ( - showVersion bool - fileSize int64 - duration int - tempDir string - workers string - verbose bool // Whether to show detailed output from external processes + showVersion bool + fileSize int64 + duration int + tempDir string + workers string + verbose bool // Whether to show detailed output from external processes + enableUnorderedProcessing bool // Enable out-of-order frame processing + dynamicAllocation bool // Enable dynamic frame allocation + bufferSize int // Maximum buffer size for out-of-order frames ) func init() { @@ -32,6 +35,10 @@ func init() { rootCmd.PersistentFlags().StringVarP(&tempDir, "temp-dir", "t", os.TempDir(), "directory to store temporary files") rootCmd.PersistentFlags().StringVarP(&workers, "workers", "w", "100KB,500KB", "worker bandwidth configuration, comma-separated list of bandwidth limits (e.g., '200KB' for one worker, '200KB,200KB,300KB' for three workers)") rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "", false, "show detailed output from external processes") + + rootCmd.PersistentFlags().BoolVar(&enableUnorderedProcessing, "enable-unordered-processing", false, "enable out-of-order frame processing to improve efficiency with unbalanced workers") + rootCmd.PersistentFlags().BoolVar(&dynamicAllocation, "dynamic-allocation", false, "enable dynamic frame allocation based on worker performance") + rootCmd.PersistentFlags().IntVar(&bufferSize, "buffer-size", 1000, "maximum buffer size for out-of-order frames (only used with --enable-unordered-processing)") } var rootCmd = &cobra.Command{ @@ -229,6 +236,11 @@ func startSender(serverAddr, transferID, testFilePath string) (*Process, chan er senderArgs = append(senderArgs, "--debug") } + if dynamicAllocation { + senderArgs = append(senderArgs, "--dynamic-allocation") + fmt.Println("Dynamic frame allocation enabled") + } + senderProcess := NewProcess("Sender", fftPath, senderArgs, verbose) fmt.Println("Starting sender...") @@ -276,6 +288,14 @@ func startReceiver(serverAddr, transferID, recvDir string, fileSizeBytes int64) receiverArgs = append(receiverArgs, "--debug") } + if enableUnorderedProcessing { + receiverArgs = append(receiverArgs, "--enable-unordered-processing") + if bufferSize > 0 { + receiverArgs = append(receiverArgs, "--buffer-size", fmt.Sprintf("%d", bufferSize)) + } + fmt.Printf("Unordered frame processing enabled with buffer size: %d\n", bufferSize) + } + receiverProcess := NewProcess("Receiver", fftPath, receiverArgs, verbose) fmt.Println("Starting receiver...") diff --git a/cmd/fft/root.go b/cmd/fft/root.go index 0a0fdc4..f36dc3d 100644 --- a/cmd/fft/root.go +++ b/cmd/fft/root.go @@ -24,6 +24,10 @@ func init() { rootCmd.PersistentFlags().IntVarP(&options.CacheCount, "cache-count", "c", 512, "how many frames be cached, it will be set to the min value between sender and receiver") rootCmd.PersistentFlags().StringVarP(&options.RecvFile, "recv-file", "t", "", "specify local file path to store received file") rootCmd.PersistentFlags().BoolVarP(&options.DebugMode, "debug", "g", false, "print more debug info") + + rootCmd.PersistentFlags().BoolVar(&options.EnableUnorderedProcessing, "enable-unordered-processing", false, "enable out-of-order frame processing to improve efficiency with unbalanced workers") + rootCmd.PersistentFlags().BoolVar(&options.DynamicAllocation, "dynamic-allocation", false, "enable dynamic frame allocation based on worker performance") + rootCmd.PersistentFlags().IntVar(&options.BufferSize, "buffer-size", 1000, "maximum buffer size for out-of-order frames (only used with --enable-unordered-processing)") } var rootCmd = &cobra.Command{ diff --git a/pkg/receiver/receiver.go b/pkg/receiver/receiver.go index aeafd32..26b76e3 100644 --- a/pkg/receiver/receiver.go +++ b/pkg/receiver/receiver.go @@ -17,22 +17,40 @@ type Receiver struct { framesIDMap map[uint32]struct{} notifyCh chan struct{} + unorderedEnabled bool + orderedBuffer map[uint32]*stream.Frame // Buffer for out-of-order frames + maxBufferSize int // Maximum number of frames to buffer + mu sync.RWMutex } func NewReceiver(fileID uint32, dst io.Writer) *Receiver { return &Receiver{ - fileID: fileID, - nextFrameID: 0, - dst: dst, - frames: make([]*stream.Frame, 0), - framesIDMap: make(map[uint32]struct{}), - notifyCh: make(chan struct{}, 1), + fileID: fileID, + nextFrameID: 0, + dst: dst, + frames: make([]*stream.Frame, 0), + framesIDMap: make(map[uint32]struct{}), + notifyCh: make(chan struct{}, 1), + unorderedEnabled: false, + orderedBuffer: make(map[uint32]*stream.Frame), + maxBufferSize: 1000, // Default buffer size + } +} + +func (r *Receiver) EnableUnorderedProcessing(maxBufferSize int) { + r.mu.Lock() + defer r.mu.Unlock() + + r.unorderedEnabled = true + if maxBufferSize > 0 { + r.maxBufferSize = maxBufferSize } } func (r *Receiver) RecvFrame(frame *stream.Frame) { r.mu.Lock() + if frame.FrameID < r.nextFrameID { r.mu.Unlock() return @@ -43,11 +61,20 @@ func (r *Receiver) RecvFrame(frame *stream.Frame) { return } - r.frames = append(r.frames, frame) r.framesIDMap[frame.FrameID] = struct{}{} - sort.Slice(r.frames, func(i, j int) bool { - return r.frames[i].FrameID < r.frames[j].FrameID - }) + + if r.unorderedEnabled { + if frame.FrameID == r.nextFrameID { + r.frames = append(r.frames, frame) + } else { + r.orderedBuffer[frame.FrameID] = frame + } + } else { + r.frames = append(r.frames, frame) + sort.Slice(r.frames, func(i, j int) bool { + return r.frames[i].FrameID < r.frames[j].FrameID + }) + } r.mu.Unlock() select { @@ -67,6 +94,7 @@ func (r *Receiver) Run() { ii := 0 finished := false r.mu.Lock() + for i, frame := range r.frames { if r.nextFrameID == frame.FrameID { ii = i + 1 @@ -85,6 +113,27 @@ func (r *Receiver) Run() { } } r.frames = r.frames[ii:] + + if r.unorderedEnabled { + continueProcessing := true + for continueProcessing { + if frame, ok := r.orderedBuffer[r.nextFrameID]; ok { + delete(r.orderedBuffer, r.nextFrameID) + delete(r.framesIDMap, r.nextFrameID) + + // Check if it's the last frame + if len(frame.Buf) == 0 { + finished = true + break + } + + buffer.Write(frame.Buf) + r.nextFrameID++ + } else { + continueProcessing = false + } + } + } r.mu.Unlock() buf := buffer.Bytes() diff --git a/pkg/sender/frame.go b/pkg/sender/frame.go index 2eb82fc..b9ae86e 100644 --- a/pkg/sender/frame.go +++ b/pkg/sender/frame.go @@ -13,13 +13,15 @@ type SendFrame struct { sendTime time.Time retryTimes int hasAck bool + transferID int // ID of the transfer this frame is assigned to mu sync.Mutex } func NewSendFrame(frame *stream.Frame) *SendFrame { return &SendFrame{ - frame: frame, + frame: frame, + transferID: -1, // -1 means not assigned to any specific transfer } } @@ -44,3 +46,11 @@ func (sf *SendFrame) HasAck() bool { func (sf *SendFrame) SetAck() { sf.hasAck = true } + +func (sf *SendFrame) SetTransferID(id int) { + sf.transferID = id +} + +func (sf *SendFrame) GetTransferID() int { + return sf.transferID +} diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index 994b444..fcc338e 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -32,6 +32,12 @@ type Sender struct { // get each ack message from ackCh ackCh chan *stream.Ack + dynamicAllocationEnabled bool + transfers map[int]*Transfer + totalThroughput float64 + allocationRatios map[int]float64 + transfersMu sync.RWMutex + retryFrames []*SendFrame maxBufferCount int @@ -57,18 +63,22 @@ func NewSender(id uint32, src io.Reader, frameSize int, maxBufferCount int) (*Se } s := &Sender{ - id: id, - frameSize: frameSize, - src: src, - frameCh: make(chan *SendFrame), - ackCh: make(chan *stream.Ack), - maxBufferCount: maxBufferCount, - retryFrames: make([]*SendFrame, 0), - limiter: make(chan struct{}, maxBufferCount), - waitAcks: make(map[uint32]*SendFrame), - bufferFrames: make([]*SendFrame, 0), - sendShutdown: shutdown.New(), - ackShutdown: shutdown.New(), + id: id, + frameSize: frameSize, + src: src, + frameCh: make(chan *SendFrame), + ackCh: make(chan *stream.Ack), + dynamicAllocationEnabled: false, + transfers: make(map[int]*Transfer), + totalThroughput: 0, + allocationRatios: make(map[int]float64), + maxBufferCount: maxBufferCount, + retryFrames: make([]*SendFrame, 0), + limiter: make(chan struct{}, maxBufferCount), + waitAcks: make(map[uint32]*SendFrame), + bufferFrames: make([]*SendFrame, 0), + sendShutdown: shutdown.New(), + ackShutdown: shutdown.New(), } for i := 0; i < maxBufferCount; i++ { s.limiter <- struct{}{} @@ -76,6 +86,13 @@ func NewSender(id uint32, src io.Reader, frameSize int, maxBufferCount int) (*Se return s, nil } +func (sender *Sender) EnableDynamicAllocation() { + sender.transfersMu.Lock() + defer sender.transfersMu.Unlock() + + sender.dynamicAllocationEnabled = true +} + func (sender *Sender) HandleStream(s *stream.FrameStream) { sender.mu.Lock() if sender.sendAll { @@ -92,8 +109,29 @@ func (sender *Sender) HandleStream(s *stream.FrameStream) { } tr := NewTransfer(int(id), trBufferCount, s, sender.frameCh, sender.ackCh) + if sender.dynamicAllocationEnabled { + sender.transfersMu.Lock() + sender.transfers[int(id)] = tr + totalTransfers := len(sender.transfers) + if totalTransfers > 0 { + equalRatio := 1.0 / float64(totalTransfers) + for transferID := range sender.transfers { + sender.allocationRatios[transferID] = equalRatio + } + } + sender.transfersMu.Unlock() + } + // block until transfer exit noAckFrames := tr.Run() + + if sender.dynamicAllocationEnabled { + sender.transfersMu.Lock() + delete(sender.transfers, int(id)) + delete(sender.allocationRatios, int(id)) + sender.transfersMu.Unlock() + } + if len(noAckFrames) > 0 { sender.mu.Lock() sender.retryFrames = append(sender.retryFrames, noAckFrames...) @@ -112,6 +150,43 @@ func (sender *Sender) Run() { sender.ackShutdown.WaitDone() } +func (sender *Sender) updateAllocationRatios() { + sender.transfersMu.RLock() + defer sender.transfersMu.RUnlock() + + if len(sender.transfers) <= 1 { + for id := range sender.transfers { + sender.allocationRatios[id] = 1.0 + } + return + } + + // Calculate total throughput across all transfers + totalThroughput := 0.0 + for _, transfer := range sender.transfers { + if transfer.currentThroughput > 0 { + totalThroughput += transfer.currentThroughput + } else { + totalThroughput += 1.0 + } + } + + if totalThroughput > 0 { + for id, transfer := range sender.transfers { + throughput := transfer.currentThroughput + if throughput <= 0 { + throughput = 1.0 // Default value if no data yet + } + sender.allocationRatios[id] = throughput / totalThroughput + } + } else { + equalRatio := 1.0 / float64(len(sender.transfers)) + for id := range sender.transfers { + sender.allocationRatios[id] = equalRatio + } + } +} + func (sender *Sender) loopSend() { defer sender.sendShutdown.Done() @@ -161,12 +236,46 @@ func (sender *Sender) loopSend() { } buf = buf[:n] - // send frames to transfers f := stream.NewFrame(0, count, buf) sf := NewSendFrame(f) sender.mu.Lock() sender.waitAcks[sf.FrameID()] = sf sender.bufferFrames = append(sender.bufferFrames, sf) + + if sender.dynamicAllocationEnabled { + if count%10 == 0 { + sender.updateAllocationRatios() + } + + if len(sender.transfers) > 1 { + var maxThroughput, minThroughput float64 + maxThroughput = 0 + minThroughput = float64(^uint(0) >> 1) // Max int value + + for _, transfer := range sender.transfers { + if transfer.currentThroughput > maxThroughput { + maxThroughput = transfer.currentThroughput + } + if transfer.currentThroughput > 0 && transfer.currentThroughput < minThroughput { + minThroughput = transfer.currentThroughput + } + } + + if minThroughput == float64(^uint(0)>>1) || maxThroughput == 0 || + maxThroughput/minThroughput < 1.5 { + sf.SetTransferID(-1) + } else { + var fastestWorkerID int + for id, transfer := range sender.transfers { + if transfer.currentThroughput == maxThroughput { + fastestWorkerID = id + break + } + } + sf.SetTransferID(fastestWorkerID) + } + } + } sender.mu.Unlock() sender.frameCh <- sf diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 7adbcde..7d9f840 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -17,6 +17,12 @@ type Transfer struct { inSlowStart bool waitAcks map[uint32]*SendFrame + framesSent uint64 + bytesTransferred uint64 + startTime time.Time + lastMetricTime time.Time + currentThroughput float64 // bytes per second + s *stream.FrameStream limiter *limit.Limiter frameCh chan *SendFrame @@ -32,18 +38,26 @@ func NewTransfer(id int, maxBufferCount int, s *stream.FrameStream, if maxBufferCount <= 0 { maxBufferCount = 10 } + now := time.Now() t := &Transfer{ - id: id, - maxBufferCount: maxBufferCount, - inSlowStart: true, - waitAcks: make(map[uint32]*SendFrame), - s: s, - limiter: limit.NewLimiter(int64(1)), - frameCh: frameCh, - ackCh: ackCh, - sendShutdown: shutdown.New(), - recvShutdown: shutdown.New(), + id: id, + maxBufferCount: maxBufferCount, + inSlowStart: true, + waitAcks: make(map[uint32]*SendFrame), + framesSent: 0, + bytesTransferred: 0, + startTime: now, + lastMetricTime: now, + currentThroughput: 0, + s: s, + limiter: limit.NewLimiter(int64(1)), + frameCh: frameCh, + ackCh: ackCh, + sendShutdown: shutdown.New(), + recvShutdown: shutdown.New(), } + + t.limiter.SetLimit(int64(1)) return t } @@ -107,6 +121,15 @@ func (t *Transfer) frameSender() { return } + if sf.GetTransferID() != -1 && sf.GetTransferID() != t.id { + select { + case t.frameCh <- sf: + default: + sf.SetTransferID(t.id) + } + continue + } + t.mu.Lock() t.waitAcks[sf.FrameID()] = sf t.mu.Unlock() @@ -129,8 +152,44 @@ func (t *Transfer) ackReceiver() { } t.mu.Lock() - _, ok := t.waitAcks[ack.FrameID] + sf, ok := t.waitAcks[ack.FrameID] if ok { + t.framesSent++ + if sf.Frame().Buf != nil { + t.bytesTransferred += uint64(len(sf.Frame().Buf)) + } + + // Calculate throughput every 10 frames or at least once per second + now := time.Now() + if t.framesSent%10 == 0 || now.Sub(t.lastMetricTime) > time.Second { + elapsedSeconds := now.Sub(t.lastMetricTime).Seconds() + if elapsedSeconds > 0 { + // Calculate bytes per second + t.currentThroughput = float64(t.bytesTransferred) / elapsedSeconds + + currentLimit := t.limiter.LimitNum() + if t.currentThroughput > 0 { + newLimit := currentLimit + if t.inSlowStart { + newLimit = currentLimit * 2 + } else { + newLimit = currentLimit + (currentLimit / 10) + } + + // Cap at maxBufferCount + if newLimit > int64(t.maxBufferCount) { + newLimit = int64(t.maxBufferCount) + t.inSlowStart = false + } + + t.limiter.SetLimit(newLimit) + } + + t.lastMetricTime = now + t.bytesTransferred = 0 + } + } + delete(t.waitAcks, ack.FrameID) } t.mu.Unlock()