Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ jobs:
with:
go-version: '1.23'

- name: Check Format
run: |
unformatted=$(gofmt -l .)
if [ -n "$unformatted" ]; then
echo "The following files are not formatted with go fmt:"
echo "$unformatted"
exit 1
fi

- name: Build
run: go build ./...

Expand Down
125 changes: 125 additions & 0 deletions cmd/bandwidth-test/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"sync"
)

type Process struct {
cmd *exec.Cmd
cancel context.CancelFunc
errorOutput *bytes.Buffer
stdOutput *bytes.Buffer
mutex sync.Mutex
stopped bool
name string // Process name for logging
showOutput bool // Whether to show output in real-time
}

type OutputWriter struct {
buffer *bytes.Buffer
prefix string
}

func (w *OutputWriter) Write(p []byte) (n int, err error) {
n, err = w.buffer.Write(p)
if err != nil {
return n, err
}

fmt.Print(w.prefix)
fmt.Print(string(p))

return n, nil
}

func NewProcess(name, path string, params []string, showOutput bool) *Process {
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(ctx, path, params...)
p := &Process{
cmd: cmd,
cancel: cancel,
name: name,
showOutput: showOutput,
}
p.errorOutput = bytes.NewBufferString("")
p.stdOutput = bytes.NewBufferString("")

if showOutput {
stdoutWriter := &OutputWriter{
buffer: p.stdOutput,
prefix: fmt.Sprintf("[%s] ", name),
}
stderrWriter := &OutputWriter{
buffer: p.errorOutput,
prefix: fmt.Sprintf("[%s ERROR] ", name),
}
cmd.Stdout = stdoutWriter
cmd.Stderr = stderrWriter
} else {
cmd.Stdout = p.stdOutput
cmd.Stderr = p.errorOutput
}

return p
}

func (p *Process) Start() error {
return p.cmd.Start()
}

func (p *Process) Stop() error {
p.mutex.Lock()
defer p.mutex.Unlock()

if p.stopped {
return nil
}

p.stopped = true
p.cancel()
return p.cmd.Wait()
}

func (p *Process) ErrorOutput() string {
p.mutex.Lock()
defer p.mutex.Unlock()
return p.errorOutput.String()
}

func (p *Process) StdOutput() string {
p.mutex.Lock()
defer p.mutex.Unlock()
return p.stdOutput.String()
}

func GetExecutablePath(name string) string {
currentDirPath := "./" + name
if _, err := os.Stat(currentDirPath); err == nil {
return currentDirPath
}

if _, err := os.Stat(name); err == nil {
return name
}

cwd, err := os.Getwd()
if err == nil {
binPath := filepath.Join(cwd, "bin", name)
if _, err := os.Stat(binPath); err == nil {
return binPath
}
}

path, err := exec.LookPath(name)
if err == nil {
return path
}

return name
}
154 changes: 0 additions & 154 deletions cmd/bandwidth-test/recv.go
Original file line number Diff line number Diff line change
@@ -1,159 +1,5 @@
package main

import (
"crypto/tls"
"fmt"
"net"
"os"
"path/filepath"
"sync"
"time"

fio "github.com/fatedier/fft/pkg/io"
"github.com/fatedier/fft/pkg/msg"
"github.com/fatedier/fft/pkg/receiver"
"github.com/fatedier/fft/pkg/stream"

"github.com/cheggaaa/pb"
)

func recvFile(serverAddr, id, filePath string, cacheCount int, callback func(n int)) error {
isDir := false
finfo, err := os.Stat(filePath)
if err == nil && finfo.IsDir() {
isDir = true
}

conn, err := net.Dial("tcp", serverAddr)
if err != nil {
return err
}
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
defer conn.Close()

fmt.Printf("Receiver sending registration to server...\n")
err = msg.WriteMsg(conn, &msg.ReceiveFile{
ID: id,
CacheCount: int64(cacheCount),
})
if err != nil {
return fmt.Errorf("failed to send receive file message: %v", err)
}
fmt.Printf("Receiver waiting for server response...\n")

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
raw, err := msg.ReadMsg(conn)
if err != nil {
return fmt.Errorf("receiver failed to read response: %v", err)
}
conn.SetReadDeadline(time.Time{})
fmt.Printf("Receiver got response from server\n")

m, ok := raw.(*msg.ReceiveFileResp)
if !ok {
return fmt.Errorf("get receive file response format error")
}
if m.Error != "" {
return fmt.Errorf(m.Error)
}

if len(m.Workers) == 0 {
return fmt.Errorf("no available workers")
}

fmt.Printf("Recv filename: %s Size: %s\n", m.Name, pb.Format(m.Fsize).To(pb.U_BYTES).String())
fmt.Printf("Workers: %v\n", m.Workers)

realPath := filePath
if isDir {
realPath = filepath.Join(filePath, m.Name)
}
f, err := os.Create(realPath)
if err != nil {
return err
}
defer f.Close()

callbackWriter := fio.NewCallbackWriter(f, callback)
recv := receiver.NewReceiver(0, callbackWriter)

var wait sync.WaitGroup
for _, worker := range m.Workers {
wait.Add(1)
go func(addr string) {
newRecvStream(recv, id, addr)
wait.Done()
}(worker)
}

recvDoneCh := make(chan struct{})
streamCloseCh := make(chan struct{})
go func() {
recv.Run()
close(recvDoneCh)
}()
go func() {
wait.Wait()
close(streamCloseCh)
}()

select {
case <-recvDoneCh:
case <-streamCloseCh:
select {
case <-recvDoneCh:
case <-time.After(2 * time.Second):
}
}

return nil
}

func newRecvStream(recv *receiver.Receiver, id string, addr string) {
conn, err := net.Dial("tcp", addr)
if err != nil {
fmt.Printf("[%s] Error connecting to worker: %v\n", addr, err)
return
}
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
defer conn.Close()

msg.WriteMsg(conn, &msg.NewReceiveFileStream{
ID: id,
})

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
raw, err := msg.ReadMsg(conn)
if err != nil {
fmt.Printf("[%s] Error reading response: %v\n", addr, err)
return
}
conn.SetReadDeadline(time.Time{})

m, ok := raw.(*msg.NewReceiveFileStreamResp)
if !ok {
fmt.Printf("[%s] Invalid response format\n", addr)
return
}

if m.Error != "" {
fmt.Printf("[%s] Worker error: %s\n", addr, m.Error)
return
}

s := stream.NewFrameStream(conn)
for {
frame, err := s.ReadFrame()
if err != nil {
return
}
recv.RecvFrame(frame)
err = s.WriteAck(&stream.Ack{
FileID: frame.FileID,
FrameID: frame.FrameID,
})
if err != nil {
return
}
}
}
Loading