Skip to content

Commit 8bf11cb

Browse files
committed
Support the rate-limiting
1 parent 6857912 commit 8bf11cb

File tree

7 files changed

+88
-29
lines changed

7 files changed

+88
-29
lines changed

.client.json

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
{
2-
"server_ip" : "127.0.0.1",
3-
"server_port": 1234
4-
}
2+
"ServerIP" : "127.0.0.1",
3+
"ServerPort": 1234,
4+
"LimitRate" : [
5+
"5001:512"
6+
]
7+
}

.server.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
"ServerPort" : 1234,
44
"MappingPort" : [
55
"5000:22",
6-
"5001:3306"
6+
"5002:5001"
77
]
88
}

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
## 介绍
66

7-
通过在一台具有公网 IP 的小型服务器(阿里云轻量)部署 `spleen`, 可以随时随地访问你的家庭服务器/内网主机(闲置笔记本)的 `TCP` 服务, 例如 `SSH`, `HTTP/S` 等.
7+
通过在一台具有公网 IP 的小型服务器(阿里云轻量)部署 `spleen`, 可以随时随地访问你的家庭服务器/内网主机(闲置笔记本)的 `TCP` 服务, 例如 `SSH`, `HTTP/S` 等. `spleen`支持针对内网服务器指定服务进行流速限制, 避免打爆公网小水管.
88

99
例如 `SSH` 服务, 在顺利部署 `spleen` 的客户端和服务端后, 通过公网服务器(假定 IP 为`1.1.1.1`), 可以直接通过端口映射来连接你的家庭服务器/内网主机:
1010

@@ -64,6 +64,14 @@
6464
"server_port": 1234 # 公网服务器监听端口
6565
}
6666

67+
{
68+
"ServerIP" : "127.0.0.1", # 公网服务器 IP
69+
"ServerPort": 1234, # 公网服务器监听端口
70+
"LimitRate" : [
71+
"5001:512" # 指定内网服务的流速限制, 例如指定内网 5001 端口的流速不要超过 512 KB/s, 单位是 KB, 默认不限制, 0 值即为不限制.
72+
]
73+
}
74+
6775
# 启动
6876
> ./spleen-client -c .client.json # 默认预留 10 个活跃连接
6977
2022/01/12 18:55:19 Connect to the server 127.0.0.1:1234 successful.

client/main.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ import (
99
"github.com/leviathan1995/spleen/client/util"
1010
)
1111

12+
type Configuration struct {
13+
ServerIP string
14+
ServerPort int
15+
LimitRate []string
16+
}
17+
1218
func main() {
1319
var conf string
14-
var config map[string]interface{}
1520
flag.StringVar(&conf, "c", ".client.json", "The client configuration.")
1621
flag.Parse()
1722

@@ -20,12 +25,11 @@ func main() {
2025
log.Fatalf("Reading %s failed.", conf)
2126
}
2227

28+
var config Configuration
2329
if err := json.Unmarshal(bytes, &config); err != nil {
2430
log.Fatalf("Parsing %s failed.", conf)
2531
}
2632

27-
serverIP := config["server_ip"].(string)
28-
serverPort := int(config["server_port"].(float64))
29-
c := client.NewClient(serverIP, serverPort)
33+
c := client.NewClient(config.ServerIP, config.ServerPort, config.LimitRate)
3034
c.Run()
3135
}

client/util/client.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,53 @@ import (
55
"log"
66
"net"
77
"strconv"
8+
"strings"
89
"time"
910

1011
"github.com/leviathan1995/spleen/service"
1112
)
1213

1314
type client struct {
1415
*service.Service
15-
srvAddr *net.TCPAddr
16+
srvAddr *net.TCPAddr
17+
limitRate map[int64]int64
1618
}
1719

18-
func NewClient(serverIP string, serverPort int) *client {
20+
func NewClient(serverIP string, serverPort int, limitRate []string) *client {
1921
srvAddr, _ := net.ResolveTCPAddr("tcp", serverIP+":"+strconv.Itoa(serverPort))
22+
limits := make(map[int64]int64)
23+
for _, rates := range limitRate {
24+
rate := strings.Split(rates, ":")
25+
port, _ := strconv.Atoi(rate[0])
26+
speed, _ := strconv.Atoi(rate[1])
27+
limits[int64(port)] = int64(speed)
28+
}
29+
2030
return &client{
2131
&service.Service{},
2232
srvAddr,
33+
limits,
2334
}
2435
}
2536

2637
func (c *client) Run() {
38+
for port, rate := range c.limitRate {
39+
log.Printf("The limiting rate configurations Port: %d, Speed: %d KB/s", port, rate)
40+
}
41+
2742
for {
2843
if len(connectionPool) < 10 {
29-
srvConn, err := c.DialSrv()
30-
if err != nil {
31-
log.Printf("Connect to the server %s:%d failed: %s. \n", c.srvAddr.IP.String(), c.srvAddr.Port, err)
32-
continue
44+
for i := len(connectionPool); i < 10; i++ {
45+
srvConn, err := c.DialSrv()
46+
if err != nil {
47+
log.Printf("Connect to the server %s:%d failed: %s. \n", c.srvAddr.IP.String(), c.srvAddr.Port, err)
48+
continue
49+
}
50+
log.Printf("Connect to the server %s:%d successful.\n", c.srvAddr.IP.String(), c.srvAddr.Port)
51+
srvConn.SetKeepAlive(true)
52+
connectionPool <- srvConn
53+
go c.handleConn(srvConn)
3354
}
34-
log.Printf("Connect to the server %s:%d successful.\n", c.srvAddr.IP.String(), c.srvAddr.Port)
35-
srvConn.SetKeepAlive(true)
36-
connectionPool <- srvConn
37-
go c.handleConn(srvConn)
3855
} else {
3956
time.Sleep(1 * time.Second)
4057
}
@@ -75,15 +92,22 @@ func (c *client) handleConn(srvConn *net.TCPConn) {
7592
} else {
7693
log.Printf("Connect to the destination address localhost:%d successful.", dstAddr.Port)
7794
}
78-
7995
defer dstConn.Close()
96+
97+
dstConn.SetKeepAlive(true)
8098
_ = dstConn.SetLinger(0)
8199

100+
var limitRate int64
101+
102+
if rate, found := c.limitRate[port]; found {
103+
limitRate = rate * 1024 /* bytes */
104+
}
105+
82106
go func() {
83-
errTransfer := c.TransferToTCP(dstConn, srvConn)
107+
errTransfer := c.TransferToTCP(dstConn, srvConn, limitRate)
84108
if errTransfer != nil {
85109
return
86110
}
87111
}()
88-
err = c.TransferToTCP(srvConn, dstConn)
112+
err = c.TransferToTCP(srvConn, dstConn, 0)
89113
}

server/util/server.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,15 @@ func (s *server) handleConn(cliConn *net.TCPConn, transferPort uint64) {
9898
log.Print("Make a successful connection between the user and the intranet server.")
9999
/* Transfer network packets. */
100100
go func() {
101-
errTransfer := s.TransferToTCP(cliConn, intranetConn)
101+
errTransfer := s.TransferToTCP(cliConn, intranetConn, 0)
102102
if errTransfer != nil {
103103
intranetConn.Close()
104104
return
105105
}
106106
}()
107-
err = s.TransferToTCP(intranetConn, cliConn)
107+
err = s.TransferToTCP(intranetConn, cliConn, 0)
108+
return
108109
}
109110
}
111+
log.Println("Currently, Do not have any active connection from the intranet server.")
110112
}

service/service.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package service
22

33
import (
44
"net"
5+
"time"
56
)
67

78
const BUFFERSIZE = 1024 * 16
@@ -24,18 +25,35 @@ func (s *Service) TCPWrite(conn *net.TCPConn, buf []byte) error {
2425
return nil
2526
}
2627

27-
func (s *Service) TransferToTCP(cliConn net.Conn, dstConn *net.TCPConn) error {
28+
func (s *Service) TransferToTCP(cliConn net.Conn, dstConn *net.TCPConn, limitRate int64) error {
29+
var totalRead, lastTime int64
2830
buf := make([]byte, BUFFERSIZE)
31+
2932
for {
30-
nRead, err := cliConn.Read(buf)
31-
if err != nil {
32-
return err
33+
nRead, errRead := cliConn.Read(buf)
34+
if errRead != nil {
35+
return errRead
3336
}
3437
if nRead > 0 {
3538
errWrite := s.TCPWrite(dstConn, buf[0:nRead])
36-
if err != nil {
39+
if errWrite != nil {
3740
return errWrite
3841
}
42+
if limitRate != 0 {
43+
if totalRead > limitRate && ((time.Now().UnixNano()/1e6)-lastTime) >= 1000 {
44+
/* Reset the timeout. */
45+
totalRead = 0
46+
lastTime = time.Now().UnixNano() / 1e6 /* The millisecond */
47+
} else if totalRead > limitRate && ((time.Now().UnixNano()/1e6)-lastTime) < 1000 {
48+
/* Try to limit the rate of network. */
49+
timeout := 1000 - ((time.Now().UnixNano() / 1e6) - lastTime)
50+
time.Sleep(time.Duration(timeout) * time.Millisecond)
51+
totalRead = 0
52+
lastTime = time.Now().UnixNano() / 1e6 /* The millisecond */
53+
} else {
54+
totalRead += int64(nRead)
55+
}
56+
}
3957
}
4058
}
4159
}

0 commit comments

Comments
 (0)