Skip to content

Commit 2074809

Browse files
author
Musixal
committed
[fix] issues related to tcp-keepalive and heartbeat
1 parent 716b04c commit 2074809

File tree

7 files changed

+27
-22
lines changed

7 files changed

+27
-22
lines changed

internal/client/client.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ func (c *Client) Start() {
5050
tcpConfig := &transport.TcpConfig{
5151
RemoteAddr: c.config.RemoteAddr,
5252
Nodelay: c.config.Nodelay,
53-
KeepAlive: time.Duration(c.config.Keepalive),
54-
RetryInterval: time.Duration(c.config.RetryInterval),
53+
KeepAlive: time.Duration(c.config.Keepalive) * time.Second,
54+
RetryInterval: time.Duration(c.config.RetryInterval) * time.Second,
5555
Token: c.config.Token,
5656
Forwarder: c.forwarderReader(c.config.Forwarder),
5757
}
@@ -62,8 +62,8 @@ func (c *Client) Start() {
6262
tcpMuxConfig := &transport.TcpMuxConfig{
6363
RemoteAddr: c.config.RemoteAddr,
6464
Nodelay: c.config.Nodelay,
65-
KeepAlive: time.Duration(c.config.Keepalive),
66-
RetryInterval: time.Duration(c.config.RetryInterval),
65+
KeepAlive: time.Duration(c.config.Keepalive) * time.Second,
66+
RetryInterval: time.Duration(c.config.RetryInterval) * time.Second,
6767
Token: c.config.Token,
6868
MuxSession: c.config.MuxSession,
6969

@@ -76,8 +76,8 @@ func (c *Client) Start() {
7676
WsConfig := &transport.WsConfig{
7777
RemoteAddr: c.config.RemoteAddr,
7878
Nodelay: c.config.Nodelay,
79-
KeepAlive: time.Duration(c.config.Keepalive),
80-
RetryInterval: time.Duration(c.config.RetryInterval),
79+
KeepAlive: time.Duration(c.config.Keepalive) * time.Second,
80+
RetryInterval: time.Duration(c.config.RetryInterval) * time.Second,
8181
Token: c.config.Token,
8282
Forwarder: c.forwarderReader(c.config.Forwarder),
8383
}

internal/client/transport/tcp.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (c *TcpTransport) ChannelDialer() {
8787
tunnelTCPConn, err := c.tcpDialer(c.config.RemoteAddr, c.config.Nodelay)
8888
if err != nil {
8989
c.logger.Error(err)
90-
time.Sleep(c.config.RetryInterval * time.Second)
90+
time.Sleep(c.config.RetryInterval)
9191
continue
9292
}
9393

@@ -111,7 +111,7 @@ func (c *TcpTransport) ChannelDialer() {
111111
c.logger.Error("unable to establish new control channel")
112112
}
113113
tunnelTCPConn.Close() // Close connection on error or timeout
114-
time.Sleep(c.config.RetryInterval* time.Second)
114+
time.Sleep(c.config.RetryInterval)
115115
continue
116116
}
117117

@@ -125,7 +125,7 @@ func (c *TcpTransport) ChannelDialer() {
125125
} else {
126126
c.logger.Error("invalid token received, retrying...")
127127
tunnelTCPConn.Close() // Close connection if the token is invalid
128-
time.Sleep(c.config.RetryInterval * time.Second)
128+
time.Sleep(c.config.RetryInterval)
129129
continue
130130
}
131131
}

internal/client/transport/tcpmux.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func NewMuxClient(parentCtx context.Context, config *TcpMuxConfig, logger *logru
4444
cancel: cancel,
4545
logger: logger,
4646
smuxSession: make([]*smux.Session, config.MuxSession),
47+
timeout: 5 * time.Second, // Default timeout
4748
}
4849

4950
return client
@@ -87,7 +88,7 @@ func (c *TcpMuxTransport) MuxDialer() {
8788
tunnelTCPConn, err := c.tcpDialer(c.config.RemoteAddr, c.config.Nodelay)
8889
if err != nil {
8990
c.logger.Error("failed to dial tunnel server: ", err)
90-
time.Sleep(c.config.RetryInterval * time.Second)
91+
time.Sleep(c.config.RetryInterval)
9192
continue
9293
}
9394

internal/client/transport/ws.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (c *WsTransport) ChannelDialer() {
9191
tunnelWSConn, err := c.wsDialer(c.config.RemoteAddr, "/channel")
9292
if err != nil {
9393
c.logger.Error(err)
94-
time.Sleep(c.config.RetryInterval * time.Second)
94+
time.Sleep(c.config.RetryInterval)
9595
continue
9696
}
9797
c.controlChannel = tunnelWSConn

internal/server/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (s *Server) Start() {
4343
tcpConfig := &transport.TcpConfig{
4444
BindAddr: s.config.BindAddr,
4545
Nodelay: s.config.Nodelay,
46-
KeepAlive: time.Duration(s.config.Keepalive),
46+
KeepAlive: time.Duration(s.config.Keepalive) * time.Second,
4747
ConnectionPool: s.config.ConnectionPool,
4848
Token: s.config.Token,
4949
ChannelSize: s.config.ChannelSize,
@@ -57,7 +57,7 @@ func (s *Server) Start() {
5757
tcpMuxConfig := &transport.TcpMuxConfig{
5858
BindAddr: s.config.BindAddr,
5959
Nodelay: s.config.Nodelay,
60-
KeepAlive: time.Duration(s.config.Keepalive),
60+
KeepAlive: time.Duration(s.config.Keepalive) * time.Second,
6161
Token: s.config.Token,
6262
MuxSession: s.config.MuxSession,
6363
ChannelSize: s.config.ChannelSize,
@@ -71,7 +71,7 @@ func (s *Server) Start() {
7171
wsConfig := &transport.WsConfig{
7272
BindAddr: s.config.BindAddr,
7373
Nodelay: s.config.Nodelay,
74-
KeepAlive: time.Duration(s.config.Keepalive),
74+
KeepAlive: time.Duration(s.config.Keepalive) * time.Second,
7575
ConnectionPool: s.config.ConnectionPool,
7676
Token: s.config.Token,
7777
ChannelSize: s.config.ChannelSize,

internal/server/transport/tcp.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,12 +227,16 @@ func (s *TcpTransport) channelListener() {
227227
}
228228

229229
func (s *TcpTransport) heartbeat() {
230+
ticker := time.NewTicker(s.heartbeatDuration)
231+
defer ticker.Stop()
232+
230233
for {
231234
select {
232235
case <-s.ctx.Done():
233236
return
234-
default:
237+
case <-ticker.C:
235238
if s.controlChannel == nil {
239+
go s.Restart()
236240
return
237241
}
238242
err := utils.SendBinaryString(s.controlChannel, s.heartbeatSig)
@@ -241,12 +245,11 @@ func (s *TcpTransport) heartbeat() {
241245
go s.Restart()
242246
return
243247
}
244-
s.logger.Debug("heartbeat sended successfully")
245-
time.Sleep(s.heartbeatDuration * time.Second)
248+
s.logger.Debug("heartbeat sent successfully")
246249
}
247250
}
248-
249251
}
252+
250253
func (s *TcpTransport) poolChecker() {
251254
for {
252255
select {

internal/server/transport/ws.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,24 +116,25 @@ func (s *WsTransport) portConfigReader() {
116116
}
117117

118118
func (s *WsTransport) heartbeat() {
119+
ticker := time.NewTicker(s.heartbeatDuration)
120+
defer ticker.Stop()
121+
119122
for {
120123
select {
121124
case <-s.ctx.Done():
122125
return
123-
default:
126+
case <-ticker.C:
124127
if s.controlChannel == nil {
128+
go s.Restart()
125129
return
126130
}
127-
s.mu.Lock()
128131
err := s.controlChannel.WriteMessage(websocket.TextMessage, []byte(s.heartbeatSig))
129-
s.mu.Unlock()
130132
if err != nil {
131133
s.logger.Error("unable to send heartbeat. restarting server...")
132134
go s.Restart()
133135
return
134136
}
135137
s.logger.Debug("heartbeat sent successfully")
136-
time.Sleep(s.heartbeatDuration * time.Second)
137138
}
138139
}
139140
}

0 commit comments

Comments
 (0)