Skip to content

Commit b18901e

Browse files
authored
Merge pull request #2 from fatedier/dev
bump version v0.0.1
2 parents c6e55a2 + d8eb92f commit b18901e

File tree

33 files changed

+2101
-2
lines changed

33 files changed

+2101
-2
lines changed

.gitignore

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,9 @@
1010
# Output of the go coverage tool, specifically when used with LiteIDE
1111
*.out
1212

13-
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
14-
.glide/
13+
# Self
14+
bin/
15+
packages/
16+
17+
# Cache
18+
*.swp

Makefile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
all: fmt build
2+
3+
build: fft fftw ffts
4+
5+
fmt:
6+
go fmt ./...
7+
8+
fft:
9+
go build -ldflags "-s -w" -o bin/fft ./cmd/fft
10+
11+
fftw:
12+
go build -ldflags "-s -w" -o bin/fftw ./cmd/fftw
13+
14+
ffts:
15+
go build -ldflags "-s -w" -o bin/ffts ./cmd/ffts

Makefile.cross-compiles

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
export PATH := $(GOPATH)/bin:$(PATH)
2+
LDFLAGS := -s -w
3+
4+
all: build
5+
6+
build: app
7+
8+
app:
9+
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fft_darwin_amd64 ./cmd/fft
10+
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fftw_darwin_amd64 ./cmd/fftw
11+
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./ffts_darwin_amd64 ./cmd/ffts
12+
env CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fft_freebsd_amd64 ./cmd/fft
13+
env CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fftw_freebsd_amd64 ./cmd/fftw
14+
env CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./ffts_freebsd_amd64 ./cmd/ffts
15+
env CGO_ENABLED=0 GOOS=linux GOARCH=386 go build -ldflags "$(LDFLAGS)" -o ./fft_linux_386 ./cmd/fft
16+
env CGO_ENABLED=0 GOOS=linux GOARCH=386 go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_386 ./cmd/fftw
17+
env CGO_ENABLED=0 GOOS=linux GOARCH=386 go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_386 ./cmd/ffts
18+
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fft_linux_amd64 ./cmd/fft
19+
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_amd64 ./cmd/fftw
20+
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_amd64 ./cmd/ffts
21+
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -ldflags "$(LDFLAGS)" -o ./fft_linux_arm ./cmd/fft
22+
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_arm ./cmd/fftw
23+
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_arm ./cmd/ffts
24+
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o ./fft_linux_arm64 ./cmd/fft
25+
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_arm64 ./cmd/fftw
26+
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_arm64 ./cmd/ffts
27+
env CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fft_windows_amd64.exe ./cmd/fft
28+
env CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./fftw_windows_amd64.exe ./cmd/fftw
29+
env CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./ffts_windows_amd64.exe ./cmd/ffts
30+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips64 go build -ldflags "$(LDFLAGS)" -o ./fft_linux_mips64 ./cmd/fft
31+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips64 go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_mips64 ./cmd/fftw
32+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips64 go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_mips64 ./cmd/ffts
33+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips64le go build -ldflags "$(LDFLAGS)" -o ./fft_linux_mips64le ./cmd/fft
34+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips64le go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_mips64le ./cmd/fftw
35+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips64le go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_mips64le ./cmd/ffts
36+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips GOMIPS=softfloat go build -ldflags "$(LDFLAGS)" -o ./fft_linux_mips ./cmd/fft
37+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips GOMIPS=softfloat go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_mips ./cmd/fftw
38+
env CGO_ENABLED=0 GOOS=linux GOARCH=mips GOMIPS=softfloat go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_mips ./cmd/ffts
39+
env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle GOMIPS=softfloat go build -ldflags "$(LDFLAGS)" -o ./fft_linux_mipsle ./cmd/fft
40+
env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle GOMIPS=softfloat go build -ldflags "$(LDFLAGS)" -o ./fftw_linux_mipsle ./cmd/fftw
41+
env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle GOMIPS=softfloat go build -ldflags "$(LDFLAGS)" -o ./ffts_linux_mipsle ./cmd/ffts

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,37 @@
11
# fft
2+
3+
fft 是一个分布式的文件传输工具,可以同时利用多个中转节点来并行传输文件。
4+
5+
## 开发状态
6+
7+
目前处于早期开发阶段,功能不完善,仅用于测试使用。
8+
9+
master 分支用于发布稳定版本,dev 分支用于开发,您可以尝试下载最新的 release 版本进行测试。
10+
11+
**目前的交互协议可能随时改变,不保证向后兼容,升级新版本时需要注意公告说明。**
12+
13+
## 使用示例
14+
15+
* ffts: server 控制节点,部署一个。
16+
* fftw: worker 节点,负责中转流量,部署任意多个,更多的 worker 节点可以提高传输文件的速度。
17+
* fft: 客户端,用于发送和接收文件。
18+
19+
每一个程序都可以通过 `-h` 来查看使用参数的说明。
20+
21+
ffts 和 fftw 需要部署在有公网 IP 的机器上,且开放对应的端口供 fft 访问。
22+
23+
### 发送文件
24+
25+
`./fft -i 123 -l ./filename`
26+
27+
`-i 123` 指定这次传输请求的 ID,需要是一个和其他人不重复的自定义值,之后将在这个 ID 通知接收方,接收方通过此 ID 来接收文件。
28+
29+
`-l ./filename` 指定需要传输的本地文件路径。
30+
31+
### 接收文件
32+
33+
`./fft -i 123 -t ./`
34+
35+
`-i 123` 指定这次接收传输请求的 ID。
36+
37+
`-t ./` 指定保存文件到本地的路径,如果是目录,则保存发送方的文件名到指定目录,否则会创建一个新的文件。

client/recv.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"os"
7+
"path/filepath"
8+
"time"
9+
10+
"github.com/fatedier/fft/pkg/msg"
11+
"github.com/fatedier/fft/pkg/receiver"
12+
"github.com/fatedier/fft/pkg/stream"
13+
)
14+
15+
func (svc *Service) recvFile(id string, filePath string) error {
16+
isDir := false
17+
finfo, err := os.Stat(filePath)
18+
if err == nil && finfo.IsDir() {
19+
isDir = true
20+
}
21+
22+
conn, err := net.Dial("tcp", svc.serverAddr)
23+
if err != nil {
24+
return err
25+
}
26+
defer conn.Close()
27+
28+
msg.WriteMsg(conn, &msg.ReceiveFile{
29+
ID: id,
30+
})
31+
32+
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
33+
raw, err := msg.ReadMsg(conn)
34+
if err != nil {
35+
return err
36+
}
37+
conn.SetReadDeadline(time.Time{})
38+
39+
m, ok := raw.(*msg.ReceiveFileResp)
40+
if !ok {
41+
return fmt.Errorf("get send file response format error")
42+
}
43+
if m.Error != "" {
44+
return fmt.Errorf(m.Error)
45+
}
46+
47+
if len(m.Workers) == 0 {
48+
return fmt.Errorf("no available workers")
49+
}
50+
fmt.Printf("Recv filename: %s\n", m.Name)
51+
if svc.debugMode {
52+
fmt.Printf("Workers: %v\n", m.Workers)
53+
}
54+
55+
realPath := filePath
56+
if isDir {
57+
realPath = filepath.Join(filePath, m.Name)
58+
}
59+
f, err := os.Create(realPath)
60+
if err != nil {
61+
return err
62+
}
63+
64+
recv := receiver.NewReceiver(0, f)
65+
for _, worker := range m.Workers {
66+
addr := worker
67+
go newRecvStream(recv, id, addr, svc.debugMode)
68+
}
69+
recv.Run()
70+
return nil
71+
}
72+
73+
func newRecvStream(recv *receiver.Receiver, id string, addr string, debugMode bool) {
74+
first := true
75+
for {
76+
if !first {
77+
time.Sleep(3 * time.Second)
78+
} else {
79+
first = false
80+
}
81+
82+
conn, err := net.Dial("tcp", addr)
83+
if err != nil {
84+
log(debugMode, "[%s] %v", addr, err)
85+
return
86+
}
87+
88+
msg.WriteMsg(conn, &msg.NewReceiveFileStream{
89+
ID: id,
90+
})
91+
92+
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
93+
raw, err := msg.ReadMsg(conn)
94+
if err != nil {
95+
conn.Close()
96+
log(debugMode, "[%s] %v", addr, err)
97+
continue
98+
}
99+
conn.SetReadDeadline(time.Time{})
100+
m, ok := raw.(*msg.NewReceiveFileStreamResp)
101+
if !ok {
102+
conn.Close()
103+
log(debugMode, "[%s] read NewReceiveFileStreamResp format error", addr)
104+
continue
105+
}
106+
107+
if m.Error != "" {
108+
conn.Close()
109+
log(debugMode, "[%s] new recv file stream error: %s", addr, m.Error)
110+
continue
111+
}
112+
fmt.Printf("connect to worker [%s] success\n", addr)
113+
114+
s := stream.NewFrameStream(conn)
115+
for {
116+
frame, err := s.ReadFrame()
117+
if err != nil {
118+
return
119+
}
120+
recv.RecvFrame(frame)
121+
}
122+
}
123+
}

client/send.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"os"
7+
"sync"
8+
"time"
9+
10+
"github.com/fatedier/fft/pkg/msg"
11+
"github.com/fatedier/fft/pkg/sender"
12+
"github.com/fatedier/fft/pkg/stream"
13+
)
14+
15+
func (svc *Service) sendFile(id string, filePath string) error {
16+
conn, err := net.Dial("tcp", svc.serverAddr)
17+
if err != nil {
18+
return err
19+
}
20+
defer conn.Close()
21+
22+
f, err := os.Open(filePath)
23+
if err != nil {
24+
return err
25+
}
26+
defer f.Close()
27+
28+
finfo, err := f.Stat()
29+
if err != nil {
30+
return err
31+
}
32+
if finfo.IsDir() {
33+
return fmt.Errorf("send file can't be a directory")
34+
}
35+
36+
msg.WriteMsg(conn, &msg.SendFile{
37+
ID: id,
38+
Name: finfo.Name(),
39+
})
40+
41+
conn.SetReadDeadline(time.Now().Add(120 * time.Second))
42+
raw, err := msg.ReadMsg(conn)
43+
if err != nil {
44+
return err
45+
}
46+
conn.SetReadDeadline(time.Time{})
47+
48+
m, ok := raw.(*msg.SendFileResp)
49+
if !ok {
50+
return fmt.Errorf("get send file response format error")
51+
}
52+
if m.Error != "" {
53+
return fmt.Errorf(m.Error)
54+
}
55+
56+
if len(m.Workers) == 0 {
57+
return fmt.Errorf("no available workers")
58+
}
59+
fmt.Printf("ID: %s\n", m.ID)
60+
if svc.debugMode {
61+
fmt.Printf("Workers: %v\n", m.Workers)
62+
}
63+
64+
var wait sync.WaitGroup
65+
doneCh := make(chan struct{})
66+
s := sender.NewSender(0, f)
67+
68+
for _, worker := range m.Workers {
69+
wait.Add(1)
70+
go func(addr string) {
71+
newSendStream(doneCh, s, m.ID, addr, svc.debugMode)
72+
wait.Done()
73+
}(worker)
74+
}
75+
s.Run()
76+
close(doneCh)
77+
wait.Wait()
78+
return nil
79+
}
80+
81+
func newSendStream(doneCh chan struct{}, s *sender.Sender, id string, addr string, debugMode bool) {
82+
first := true
83+
for {
84+
select {
85+
case <-doneCh:
86+
return
87+
default:
88+
}
89+
90+
if !first {
91+
time.Sleep(3 * time.Second)
92+
} else {
93+
first = false
94+
}
95+
96+
conn, err := net.Dial("tcp", addr)
97+
if err != nil {
98+
log(debugMode, "[%s] %v", addr, err)
99+
continue
100+
}
101+
102+
msg.WriteMsg(conn, &msg.NewSendFileStream{
103+
ID: id,
104+
})
105+
106+
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
107+
raw, err := msg.ReadMsg(conn)
108+
if err != nil {
109+
conn.Close()
110+
log(debugMode, "[%s] %v", addr, err)
111+
continue
112+
}
113+
conn.SetReadDeadline(time.Time{})
114+
m, ok := raw.(*msg.NewSendFileStreamResp)
115+
if !ok {
116+
conn.Close()
117+
log(debugMode, "[%s] read NewSendFileStreamResp format error", addr)
118+
continue
119+
}
120+
121+
if m.Error != "" {
122+
conn.Close()
123+
log(debugMode, "[%s] new send file stream error: %s", addr, m.Error)
124+
continue
125+
}
126+
fmt.Printf("connect to worker [%s] success\n", addr)
127+
128+
s.HandleStream(stream.NewFrameStream(conn))
129+
break
130+
}
131+
}

0 commit comments

Comments
 (0)