Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions client/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (svc *Service) recvFile(id string, filePath string) error {
}

recv := receiver.NewReceiver(0, fio.NewCallbackWriter(f, callback))

if svc.enableUnorderedProcessing {
if svc.debugMode {
fmt.Printf("Enabling unordered frame processing with buffer size: %d\n", svc.bufferSize)
}
recv.EnableUnorderedProcessing(svc.bufferSize)
}

for _, worker := range m.Workers {
wait.Add(1)
go func(addr string) {
Expand Down
7 changes: 7 additions & 0 deletions client/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ func (svc *Service) sendFile(id string, filePath string) error {
return err
}

if svc.dynamicAllocation {
if svc.debugMode {
fmt.Println("Enabling dynamic frame allocation")
}
s.EnableDynamicAllocation()
}

for _, worker := range m.Workers {
wait.Add(1)
go func(addr string) {
Expand Down
39 changes: 24 additions & 15 deletions client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
)

type Options struct {
ServerAddr string
ID string
SendFile string
FrameSize int
CacheCount int
RecvFile string
DebugMode bool
ServerAddr string
ID string
SendFile string
FrameSize int
CacheCount int
RecvFile string
DebugMode bool
EnableUnorderedProcessing bool
DynamicAllocation bool
BufferSize int
}

func (op *Options) Check() error {
Expand All @@ -32,10 +35,13 @@ func (op *Options) Check() error {
}

type Service struct {
debugMode bool
serverAddr string
frameSize int
cacheCount int
debugMode bool
serverAddr string
frameSize int
cacheCount int
enableUnorderedProcessing bool
dynamicAllocation bool
bufferSize int

runHandler func() error
}
Expand All @@ -46,10 +52,13 @@ func NewService(options Options) (*Service, error) {
}

svc := &Service{
debugMode: options.DebugMode,
serverAddr: options.ServerAddr,
frameSize: options.FrameSize,
cacheCount: options.CacheCount,
debugMode: options.DebugMode,
serverAddr: options.ServerAddr,
frameSize: options.FrameSize,
cacheCount: options.CacheCount,
enableUnorderedProcessing: options.EnableUnorderedProcessing,
dynamicAllocation: options.DynamicAllocation,
bufferSize: options.BufferSize,
}

if options.SendFile != "" {
Expand Down
32 changes: 26 additions & 6 deletions cmd/bandwidth-test/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
)

var (
showVersion bool
fileSize int64
duration int
tempDir string
workers string
verbose bool // Whether to show detailed output from external processes
showVersion bool
fileSize int64
duration int
tempDir string
workers string
verbose bool // Whether to show detailed output from external processes
enableUnorderedProcessing bool // Enable out-of-order frame processing
dynamicAllocation bool // Enable dynamic frame allocation
bufferSize int // Maximum buffer size for out-of-order frames
)

func init() {
Expand All @@ -32,6 +35,10 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&tempDir, "temp-dir", "t", os.TempDir(), "directory to store temporary files")
rootCmd.PersistentFlags().StringVarP(&workers, "workers", "w", "100KB,500KB", "worker bandwidth configuration, comma-separated list of bandwidth limits (e.g., '200KB' for one worker, '200KB,200KB,300KB' for three workers)")
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "", false, "show detailed output from external processes")

rootCmd.PersistentFlags().BoolVar(&enableUnorderedProcessing, "enable-unordered-processing", false, "enable out-of-order frame processing to improve efficiency with unbalanced workers")
rootCmd.PersistentFlags().BoolVar(&dynamicAllocation, "dynamic-allocation", false, "enable dynamic frame allocation based on worker performance")
rootCmd.PersistentFlags().IntVar(&bufferSize, "buffer-size", 1000, "maximum buffer size for out-of-order frames (only used with --enable-unordered-processing)")
}

var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -229,6 +236,11 @@ func startSender(serverAddr, transferID, testFilePath string) (*Process, chan er
senderArgs = append(senderArgs, "--debug")
}

if dynamicAllocation {
senderArgs = append(senderArgs, "--dynamic-allocation")
fmt.Println("Dynamic frame allocation enabled")
}

senderProcess := NewProcess("Sender", fftPath, senderArgs, verbose)
fmt.Println("Starting sender...")

Expand Down Expand Up @@ -276,6 +288,14 @@ func startReceiver(serverAddr, transferID, recvDir string, fileSizeBytes int64)
receiverArgs = append(receiverArgs, "--debug")
}

if enableUnorderedProcessing {
receiverArgs = append(receiverArgs, "--enable-unordered-processing")
if bufferSize > 0 {
receiverArgs = append(receiverArgs, "--buffer-size", fmt.Sprintf("%d", bufferSize))
}
fmt.Printf("Unordered frame processing enabled with buffer size: %d\n", bufferSize)
}

receiverProcess := NewProcess("Receiver", fftPath, receiverArgs, verbose)
fmt.Println("Starting receiver...")

Expand Down
4 changes: 4 additions & 0 deletions cmd/fft/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func init() {
rootCmd.PersistentFlags().IntVarP(&options.CacheCount, "cache-count", "c", 512, "how many frames be cached, it will be set to the min value between sender and receiver")
rootCmd.PersistentFlags().StringVarP(&options.RecvFile, "recv-file", "t", "", "specify local file path to store received file")
rootCmd.PersistentFlags().BoolVarP(&options.DebugMode, "debug", "g", false, "print more debug info")

rootCmd.PersistentFlags().BoolVar(&options.EnableUnorderedProcessing, "enable-unordered-processing", false, "enable out-of-order frame processing to improve efficiency with unbalanced workers")
rootCmd.PersistentFlags().BoolVar(&options.DynamicAllocation, "dynamic-allocation", false, "enable dynamic frame allocation based on worker performance")
rootCmd.PersistentFlags().IntVar(&options.BufferSize, "buffer-size", 1000, "maximum buffer size for out-of-order frames (only used with --enable-unordered-processing)")
}

var rootCmd = &cobra.Command{
Expand Down
69 changes: 59 additions & 10 deletions pkg/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,40 @@ type Receiver struct {
framesIDMap map[uint32]struct{}
notifyCh chan struct{}

unorderedEnabled bool
orderedBuffer map[uint32]*stream.Frame // Buffer for out-of-order frames
maxBufferSize int // Maximum number of frames to buffer

mu sync.RWMutex
}

func NewReceiver(fileID uint32, dst io.Writer) *Receiver {
return &Receiver{
fileID: fileID,
nextFrameID: 0,
dst: dst,
frames: make([]*stream.Frame, 0),
framesIDMap: make(map[uint32]struct{}),
notifyCh: make(chan struct{}, 1),
fileID: fileID,
nextFrameID: 0,
dst: dst,
frames: make([]*stream.Frame, 0),
framesIDMap: make(map[uint32]struct{}),
notifyCh: make(chan struct{}, 1),
unorderedEnabled: false,
orderedBuffer: make(map[uint32]*stream.Frame),
maxBufferSize: 1000, // Default buffer size
}
}

func (r *Receiver) EnableUnorderedProcessing(maxBufferSize int) {
r.mu.Lock()
defer r.mu.Unlock()

r.unorderedEnabled = true
if maxBufferSize > 0 {
r.maxBufferSize = maxBufferSize
}
}

func (r *Receiver) RecvFrame(frame *stream.Frame) {
r.mu.Lock()

if frame.FrameID < r.nextFrameID {
r.mu.Unlock()
return
Expand All @@ -43,11 +61,20 @@ func (r *Receiver) RecvFrame(frame *stream.Frame) {
return
}

r.frames = append(r.frames, frame)
r.framesIDMap[frame.FrameID] = struct{}{}
sort.Slice(r.frames, func(i, j int) bool {
return r.frames[i].FrameID < r.frames[j].FrameID
})

if r.unorderedEnabled {
if frame.FrameID == r.nextFrameID {
r.frames = append(r.frames, frame)
} else {
r.orderedBuffer[frame.FrameID] = frame
}
} else {
r.frames = append(r.frames, frame)
sort.Slice(r.frames, func(i, j int) bool {
return r.frames[i].FrameID < r.frames[j].FrameID
})
}
r.mu.Unlock()

select {
Expand All @@ -67,6 +94,7 @@ func (r *Receiver) Run() {
ii := 0
finished := false
r.mu.Lock()

for i, frame := range r.frames {
if r.nextFrameID == frame.FrameID {
ii = i + 1
Expand All @@ -85,6 +113,27 @@ func (r *Receiver) Run() {
}
}
r.frames = r.frames[ii:]

if r.unorderedEnabled {
continueProcessing := true
for continueProcessing {
if frame, ok := r.orderedBuffer[r.nextFrameID]; ok {
delete(r.orderedBuffer, r.nextFrameID)
delete(r.framesIDMap, r.nextFrameID)

// Check if it's the last frame
if len(frame.Buf) == 0 {
finished = true
break
}

buffer.Write(frame.Buf)
r.nextFrameID++
} else {
continueProcessing = false
}
}
}
r.mu.Unlock()

buf := buffer.Bytes()
Expand Down
12 changes: 11 additions & 1 deletion pkg/sender/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ type SendFrame struct {
sendTime time.Time
retryTimes int
hasAck bool
transferID int // ID of the transfer this frame is assigned to

mu sync.Mutex
}

func NewSendFrame(frame *stream.Frame) *SendFrame {
return &SendFrame{
frame: frame,
frame: frame,
transferID: -1, // -1 means not assigned to any specific transfer
}
}

Expand All @@ -44,3 +46,11 @@ func (sf *SendFrame) HasAck() bool {
func (sf *SendFrame) SetAck() {
sf.hasAck = true
}

func (sf *SendFrame) SetTransferID(id int) {
sf.transferID = id
}

func (sf *SendFrame) GetTransferID() int {
return sf.transferID
}
Loading