Skip to content

Commit 473756c

Browse files
committed
feature: use tcp instead of udp (draft)
1 parent 793d717 commit 473756c

File tree

1 file changed

+162
-45
lines changed

1 file changed

+162
-45
lines changed

edge/cframe.go

Lines changed: 162 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package main
22

33
import (
4+
"encoding/binary"
45
"fmt"
6+
"io"
57
"net"
68
"strings"
9+
"sync"
10+
"time"
711

812
"github.com/ICKelin/cframe/codec"
913
"github.com/ICKelin/cframe/edge/vpc"
1014
log "github.com/ICKelin/cframe/pkg/logs"
15+
"github.com/xtaci/smux"
1116
)
1217

1318
type Server struct {
@@ -20,7 +25,8 @@ type Server struct {
2025
laddr string
2126

2227
// peers connection
23-
peerConns map[string]*peerConn
28+
peerConnMu sync.RWMutex
29+
peerConns map[string]*peerConn
2430

2531
// tun device wrap
2632
iface *Interface
@@ -33,6 +39,7 @@ type peerConn struct {
3339
// conn *net.UDPConn
3440
// conn *kcp.UDPSession
3541
// conn net.Conn
42+
conn *smux.Stream
3643
cidr string
3744
}
3845

@@ -56,47 +63,63 @@ func (s *Server) SetVPCInstance(vpcInstance vpc.IVPC) {
5663
}
5764

5865
func (s *Server) ListenAndServe() error {
59-
laddr, err := net.ResolveUDPAddr("udp", s.laddr)
66+
go s.readLocal()
67+
lis, err := net.Listen("tcp", s.laddr)
6068
if err != nil {
6169
return err
6270
}
63-
lconn, err := net.ListenUDP("udp", laddr)
64-
if err != nil {
65-
return err
66-
}
67-
defer lconn.Close()
71+
defer lis.Close()
6872

69-
go s.readLocal(lconn)
70-
s.readRemote(lconn)
71-
return nil
73+
for {
74+
rconn, err := lis.Accept()
75+
if err != nil {
76+
return err
77+
}
78+
go s.handleConn(rconn)
79+
}
7280
}
7381

74-
func (s *Server) readRemote(lconn *net.UDPConn) {
75-
rawbytes := make([]byte, 1024*64)
76-
key := s.key
77-
klen := len(key)
82+
func (s *Server) handleConn(conn net.Conn) {
83+
defer conn.Close()
84+
sess, err := smux.Server(conn, nil)
85+
if err != nil {
86+
log.Error("run smux server fail: %v", err)
87+
return
88+
}
89+
defer sess.Close()
90+
7891
for {
79-
nr, _, err := lconn.ReadFromUDP(rawbytes)
92+
stream, err := sess.AcceptStream()
8093
if err != nil {
81-
log.Error("read full fail: %v", err)
82-
continue
94+
log.Error("accept stream fail: %v", err)
95+
break
8396
}
97+
log.Info("accept stream from: %v", stream.RemoteAddr())
98+
go s.handleStream(stream)
99+
}
100+
}
84101

85-
buf := rawbytes[:nr]
102+
func (s *Server) handleStream(stream *smux.Stream) {
103+
defer stream.Close()
86104

87-
if nr < klen {
88-
log.Error("pkt to small")
89-
continue
105+
plen := make([]byte, 2)
106+
for {
107+
_, err := io.ReadFull(stream, plen)
108+
if err != nil {
109+
log.Error("read packet len fail: %v", err)
110+
break
90111
}
91112

92-
// decode key
93-
rkey := buf[:klen]
94-
if string(rkey) != key {
95-
log.Error("access forbidden!!")
96-
continue
113+
size := binary.BigEndian.Uint16(plen)
114+
115+
body := make([]byte, size)
116+
nr, err := io.ReadFull(stream, body)
117+
if err != nil {
118+
log.Error("read packet len fail: %v", err)
119+
break
97120
}
98121

99-
pkt := buf[klen:nr]
122+
pkt := body[:nr]
100123
p := Packet(pkt)
101124
if p.Invalid() {
102125
log.Error("invalid ipv4 packet")
@@ -112,7 +135,7 @@ func (s *Server) readRemote(lconn *net.UDPConn) {
112135
}
113136
}
114137

115-
func (s *Server) readLocal(sock *net.UDPConn) {
138+
func (s *Server) readLocal() {
116139
for {
117140
pkt, err := s.iface.Read()
118141
if err != nil {
@@ -137,24 +160,29 @@ func (s *Server) readLocal(sock *net.UDPConn) {
137160
continue
138161
}
139162

140-
raddr, err := net.ResolveUDPAddr("udp", peer)
163+
plen := make([]byte, 2)
164+
binary.BigEndian.PutUint16(plen, uint16(len(pkt)))
165+
buf := make([]byte, 0, len(pkt)+2)
166+
buf = append(buf, plen...)
167+
buf = append(buf, pkt...)
168+
169+
nw, err := peer.Write(buf)
141170
if err != nil {
142-
log.Error("parse %s fail: %v", peer, err)
171+
log.Error("write to peer %s fail %v", dst, err)
143172
continue
144173
}
145174

146-
// encode key
147-
buf := make([]byte, 0, len(pkt)+len(s.key))
148-
buf = append(buf, []byte(s.key)...)
149-
buf = append(buf, pkt...)
150-
_, e := sock.WriteToUDP(buf, raddr)
151-
if e != nil {
152-
log.Error("%v", e)
175+
if nw != len(buf) {
176+
log.Error("stream write not full")
177+
continue
153178
}
154179
}
155180
}
156181

157-
func (s *Server) route(dst string) (string, error) {
182+
func (s *Server) route(dst string) (*smux.Stream, error) {
183+
s.peerConnMu.RLock()
184+
defer s.peerConnMu.RUnlock()
185+
158186
for _, p := range s.peerConns {
159187
_, ipnet, err := net.ParseCIDR(p.cidr)
160188
if err != nil {
@@ -182,16 +210,40 @@ func (s *Server) route(dst string) (string, error) {
182210
continue
183211
}
184212

185-
return p.addr, nil
213+
return p.conn, nil
186214
}
187215
}
188216

189-
return "", fmt.Errorf("no route")
217+
return nil, fmt.Errorf("no route")
190218
}
191219

192220
func (s *Server) addRoute(peer *codec.Edge) error {
193221
log.Info("adding peer: %v", peer)
194222

223+
var sess *smux.Session
224+
var stream *smux.Stream
225+
for {
226+
lconn, err := net.Dial("tcp", peer.ListenAddr)
227+
if err != nil {
228+
log.Error("dial peer fail: %v", err)
229+
time.Sleep(time.Second * 3)
230+
continue
231+
}
232+
sess, err = smux.Client(lconn, nil)
233+
if err != nil {
234+
log.Error("smux client fail: %v", err)
235+
time.Sleep(time.Second * 3)
236+
continue
237+
}
238+
stream, err = sess.OpenStream()
239+
if err != nil {
240+
log.Error("smux open stream fail: %v", err)
241+
time.Sleep(time.Second * 3)
242+
continue
243+
}
244+
break
245+
}
246+
195247
ipmask := strings.Split(peer.Cidr, "/")
196248
cidrtype := "-net"
197249
if len(ipmask) == 1 || ipmask[1] == "32" {
@@ -228,16 +280,74 @@ func (s *Server) addRoute(peer *codec.Edge) error {
228280
peer.Cidr = fmt.Sprintf("%s/32", ipmask[0])
229281
}
230282

283+
s.peerConnMu.Lock()
284+
defer s.peerConnMu.Unlock()
285+
231286
s.peerConns[peer.Cidr] = &peerConn{
232287
addr: peer.ListenAddr,
233288
cidr: peer.Cidr,
289+
conn: stream,
234290
}
235291

292+
go s.deadlineCheck(peer, sess)
236293
log.Info("added peer %v OK", peer)
237294
log.Info("==========================\n")
238295
return nil
239296
}
240297

298+
func (s *Server) deadlineCheck(peer *codec.Edge, sess *smux.Session) {
299+
tick := time.NewTicker(time.Second * 5)
300+
for range tick.C {
301+
if !sess.IsClosed() {
302+
continue
303+
}
304+
305+
log.Info("receive dead channel for peer %s", peer.ListenAddr)
306+
for {
307+
s.peerConnMu.Lock()
308+
_, ok := s.peerConns[peer.Cidr]
309+
// peer edge has been removed
310+
if !ok {
311+
log.Warn("peer %s has not session", peer.Cidr)
312+
break
313+
}
314+
s.peerConnMu.Unlock()
315+
316+
// reconnect
317+
lconn, err := net.Dial("tcp", peer.ListenAddr)
318+
if err != nil {
319+
log.Error("dial peer fail: %v", err)
320+
time.Sleep(time.Second * 3)
321+
continue
322+
}
323+
324+
smuxSess, err := smux.Client(lconn, nil)
325+
if err != nil {
326+
log.Error("smux client fail: %v", err)
327+
time.Sleep(time.Second * 3)
328+
continue
329+
}
330+
331+
stream, err := smuxSess.OpenStream()
332+
if err != nil {
333+
log.Error("smux open stream fail: %v", err)
334+
time.Sleep(time.Second * 3)
335+
continue
336+
}
337+
338+
s.peerConnMu.Lock()
339+
s.peerConns[peer.Cidr] = &peerConn{
340+
addr: peer.ListenAddr,
341+
cidr: peer.Cidr,
342+
conn: stream,
343+
}
344+
s.peerConnMu.Unlock()
345+
sess = smuxSess
346+
break
347+
}
348+
}
349+
}
350+
241351
func (s *Server) delRoute(peer *codec.Edge) {
242352
log.Info("del peer: %v", peer)
243353
ipmask := strings.Split(peer.Cidr, "/")
@@ -254,34 +364,41 @@ func (s *Server) delRoute(peer *codec.Edge) {
254364
peer.Cidr = fmt.Sprintf("%s/32", ipmask[0])
255365
}
256366

367+
s.peerConnMu.Lock()
368+
defer s.peerConnMu.Unlock()
369+
peerConn, ok := s.peerConns[peer.Cidr]
370+
if ok {
371+
peerConn.conn.Close()
372+
}
373+
257374
delete(s.peerConns, peer.Cidr)
258375
log.Info("del peer %s OK", peer)
259376
log.Info("==========================\n")
260377
}
261378

262379
func (s *Server) AddPeers(peers []*codec.Edge) {
263380
for _, p := range peers {
264-
s.addRoute(p)
381+
go s.addRoute(p)
265382
}
266383
}
267384

268385
func (s *Server) AddPeer(peer *codec.Edge) {
269-
s.addRoute(peer)
386+
go s.addRoute(peer)
270387
}
271388

272389
func (s *Server) DelPeer(peer *codec.Edge) {
273-
s.delRoute(peer)
390+
go s.delRoute(peer)
274391
}
275392

276393
func (s *Server) AddRoute(msg *codec.AddRouteMsg) {
277-
s.addRoute(&codec.Edge{
394+
go s.addRoute(&codec.Edge{
278395
Cidr: msg.Cidr,
279396
ListenAddr: msg.Nexthop,
280397
})
281398
}
282399

283400
func (s *Server) DelRoute(msg *codec.DelRouteMsg) {
284-
s.delRoute(&codec.Edge{
401+
go s.delRoute(&codec.Edge{
285402
Cidr: msg.Cidr,
286403
ListenAddr: msg.Nexthop,
287404
})

0 commit comments

Comments
 (0)