Skip to content

Commit 880a0d0

Browse files
Modify bandwidth-test to use command-line tool execution (#20)
1 parent b0a0653 commit 880a0d0

File tree

14 files changed

+393
-436
lines changed

14 files changed

+393
-436
lines changed

.github/workflows/go.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ jobs:
1818
with:
1919
go-version: '1.23'
2020

21+
- name: Check Format
22+
run: |
23+
unformatted=$(gofmt -l .)
24+
if [ -n "$unformatted" ]; then
25+
echo "The following files are not formatted with go fmt:"
26+
echo "$unformatted"
27+
exit 1
28+
fi
29+
2130
- name: Build
2231
run: go build ./...
2332

cmd/bandwidth-test/process.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"os"
8+
"os/exec"
9+
"path/filepath"
10+
"sync"
11+
)
12+
13+
type Process struct {
14+
cmd *exec.Cmd
15+
cancel context.CancelFunc
16+
errorOutput *bytes.Buffer
17+
stdOutput *bytes.Buffer
18+
mutex sync.Mutex
19+
stopped bool
20+
name string // Process name for logging
21+
showOutput bool // Whether to show output in real-time
22+
}
23+
24+
type OutputWriter struct {
25+
buffer *bytes.Buffer
26+
prefix string
27+
}
28+
29+
func (w *OutputWriter) Write(p []byte) (n int, err error) {
30+
n, err = w.buffer.Write(p)
31+
if err != nil {
32+
return n, err
33+
}
34+
35+
fmt.Print(w.prefix)
36+
fmt.Print(string(p))
37+
38+
return n, nil
39+
}
40+
41+
func NewProcess(name, path string, params []string, showOutput bool) *Process {
42+
ctx, cancel := context.WithCancel(context.Background())
43+
cmd := exec.CommandContext(ctx, path, params...)
44+
p := &Process{
45+
cmd: cmd,
46+
cancel: cancel,
47+
name: name,
48+
showOutput: showOutput,
49+
}
50+
p.errorOutput = bytes.NewBufferString("")
51+
p.stdOutput = bytes.NewBufferString("")
52+
53+
if showOutput {
54+
stdoutWriter := &OutputWriter{
55+
buffer: p.stdOutput,
56+
prefix: fmt.Sprintf("[%s] ", name),
57+
}
58+
stderrWriter := &OutputWriter{
59+
buffer: p.errorOutput,
60+
prefix: fmt.Sprintf("[%s ERROR] ", name),
61+
}
62+
cmd.Stdout = stdoutWriter
63+
cmd.Stderr = stderrWriter
64+
} else {
65+
cmd.Stdout = p.stdOutput
66+
cmd.Stderr = p.errorOutput
67+
}
68+
69+
return p
70+
}
71+
72+
func (p *Process) Start() error {
73+
return p.cmd.Start()
74+
}
75+
76+
func (p *Process) Stop() error {
77+
p.mutex.Lock()
78+
defer p.mutex.Unlock()
79+
80+
if p.stopped {
81+
return nil
82+
}
83+
84+
p.stopped = true
85+
p.cancel()
86+
return p.cmd.Wait()
87+
}
88+
89+
func (p *Process) ErrorOutput() string {
90+
p.mutex.Lock()
91+
defer p.mutex.Unlock()
92+
return p.errorOutput.String()
93+
}
94+
95+
func (p *Process) StdOutput() string {
96+
p.mutex.Lock()
97+
defer p.mutex.Unlock()
98+
return p.stdOutput.String()
99+
}
100+
101+
func GetExecutablePath(name string) string {
102+
currentDirPath := "./" + name
103+
if _, err := os.Stat(currentDirPath); err == nil {
104+
return currentDirPath
105+
}
106+
107+
if _, err := os.Stat(name); err == nil {
108+
return name
109+
}
110+
111+
cwd, err := os.Getwd()
112+
if err == nil {
113+
binPath := filepath.Join(cwd, "bin", name)
114+
if _, err := os.Stat(binPath); err == nil {
115+
return binPath
116+
}
117+
}
118+
119+
path, err := exec.LookPath(name)
120+
if err == nil {
121+
return path
122+
}
123+
124+
return name
125+
}

cmd/bandwidth-test/recv.go

Lines changed: 0 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -1,159 +1,5 @@
11
package main
22

3-
import (
4-
"crypto/tls"
5-
"fmt"
6-
"net"
7-
"os"
8-
"path/filepath"
9-
"sync"
10-
"time"
11-
12-
fio "github.com/fatedier/fft/pkg/io"
13-
"github.com/fatedier/fft/pkg/msg"
14-
"github.com/fatedier/fft/pkg/receiver"
15-
"github.com/fatedier/fft/pkg/stream"
16-
17-
"github.com/cheggaaa/pb"
18-
)
19-
203
func recvFile(serverAddr, id, filePath string, cacheCount int, callback func(n int)) error {
21-
isDir := false
22-
finfo, err := os.Stat(filePath)
23-
if err == nil && finfo.IsDir() {
24-
isDir = true
25-
}
26-
27-
conn, err := net.Dial("tcp", serverAddr)
28-
if err != nil {
29-
return err
30-
}
31-
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
32-
defer conn.Close()
33-
34-
fmt.Printf("Receiver sending registration to server...\n")
35-
err = msg.WriteMsg(conn, &msg.ReceiveFile{
36-
ID: id,
37-
CacheCount: int64(cacheCount),
38-
})
39-
if err != nil {
40-
return fmt.Errorf("failed to send receive file message: %v", err)
41-
}
42-
fmt.Printf("Receiver waiting for server response...\n")
43-
44-
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
45-
raw, err := msg.ReadMsg(conn)
46-
if err != nil {
47-
return fmt.Errorf("receiver failed to read response: %v", err)
48-
}
49-
conn.SetReadDeadline(time.Time{})
50-
fmt.Printf("Receiver got response from server\n")
51-
52-
m, ok := raw.(*msg.ReceiveFileResp)
53-
if !ok {
54-
return fmt.Errorf("get receive file response format error")
55-
}
56-
if m.Error != "" {
57-
return fmt.Errorf(m.Error)
58-
}
59-
60-
if len(m.Workers) == 0 {
61-
return fmt.Errorf("no available workers")
62-
}
63-
64-
fmt.Printf("Recv filename: %s Size: %s\n", m.Name, pb.Format(m.Fsize).To(pb.U_BYTES).String())
65-
fmt.Printf("Workers: %v\n", m.Workers)
66-
67-
realPath := filePath
68-
if isDir {
69-
realPath = filepath.Join(filePath, m.Name)
70-
}
71-
f, err := os.Create(realPath)
72-
if err != nil {
73-
return err
74-
}
75-
defer f.Close()
76-
77-
callbackWriter := fio.NewCallbackWriter(f, callback)
78-
recv := receiver.NewReceiver(0, callbackWriter)
79-
80-
var wait sync.WaitGroup
81-
for _, worker := range m.Workers {
82-
wait.Add(1)
83-
go func(addr string) {
84-
newRecvStream(recv, id, addr)
85-
wait.Done()
86-
}(worker)
87-
}
88-
89-
recvDoneCh := make(chan struct{})
90-
streamCloseCh := make(chan struct{})
91-
go func() {
92-
recv.Run()
93-
close(recvDoneCh)
94-
}()
95-
go func() {
96-
wait.Wait()
97-
close(streamCloseCh)
98-
}()
99-
100-
select {
101-
case <-recvDoneCh:
102-
case <-streamCloseCh:
103-
select {
104-
case <-recvDoneCh:
105-
case <-time.After(2 * time.Second):
106-
}
107-
}
108-
1094
return nil
1105
}
111-
112-
func newRecvStream(recv *receiver.Receiver, id string, addr string) {
113-
conn, err := net.Dial("tcp", addr)
114-
if err != nil {
115-
fmt.Printf("[%s] Error connecting to worker: %v\n", addr, err)
116-
return
117-
}
118-
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
119-
defer conn.Close()
120-
121-
msg.WriteMsg(conn, &msg.NewReceiveFileStream{
122-
ID: id,
123-
})
124-
125-
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
126-
raw, err := msg.ReadMsg(conn)
127-
if err != nil {
128-
fmt.Printf("[%s] Error reading response: %v\n", addr, err)
129-
return
130-
}
131-
conn.SetReadDeadline(time.Time{})
132-
133-
m, ok := raw.(*msg.NewReceiveFileStreamResp)
134-
if !ok {
135-
fmt.Printf("[%s] Invalid response format\n", addr)
136-
return
137-
}
138-
139-
if m.Error != "" {
140-
fmt.Printf("[%s] Worker error: %s\n", addr, m.Error)
141-
return
142-
}
143-
144-
s := stream.NewFrameStream(conn)
145-
for {
146-
frame, err := s.ReadFrame()
147-
if err != nil {
148-
return
149-
}
150-
recv.RecvFrame(frame)
151-
err = s.WriteAck(&stream.Ack{
152-
FileID: frame.FileID,
153-
FrameID: frame.FrameID,
154-
})
155-
if err != nil {
156-
return
157-
}
158-
}
159-
}

0 commit comments

Comments
 (0)