Skip to content

Commit 6f69673

Browse files
authored
fix: reconnect bug (#19)
1 parent abdc1b1 commit 6f69673

File tree

3 files changed

+126
-105
lines changed

3 files changed

+126
-105
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,6 @@ temp/
7070

7171
# Debug
7272
__debug*
73+
74+
# Local factorio dir
75+
.factorio/

docker-compose.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
version: '3.8'
2+
3+
services:
4+
factorio:
5+
image: factoriotools/factorio
6+
restart: unless-stopped
7+
ports:
8+
- "34197:34197/udp" # Game port
9+
- "27015:27015/tcp" # RCON port
10+
volumes:
11+
- ./.factorio:/factorio

internal/rcon/rcon.go

Lines changed: 112 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ import (
77
"errors"
88
"io"
99
"net"
10+
"strings"
1011
"sync"
11-
"time"
12+
"sync/atomic"
1213

1314
"github.com/cenkalti/backoff/v4"
1415
"github.com/gorcon/rcon"
1516
"github.com/nekomeowww/factorio-rcon-api/v2/internal/configs"
1617
"github.com/nekomeowww/fo"
1718
"github.com/nekomeowww/xo/logger"
18-
"github.com/samber/lo"
1919
"go.uber.org/fx"
2020
"go.uber.org/zap"
2121
)
@@ -38,7 +38,7 @@ type NewRCONParams struct {
3838
Logger *logger.Logger
3939
}
4040

41-
//counterfeiter:generate -o fake/rcon.go --fake-name FakeRCON . RCON
41+
//counterfeiter:generate -o fake/rcon.go --fake-name FakeRCON . RCON//counterfeiter:generate -o fake/rcon.go --fake-name FakeRCON . RCON
4242
type RCON interface {
4343
Close() error
4444
Execute(ctx context.Context, command string) (string, error)
@@ -47,166 +47,173 @@ type RCON interface {
4747
IsReady() bool
4848
}
4949

50+
var _ RCON = (*RCONConn)(nil)
51+
5052
type RCONConn struct {
5153
*rcon.Conn
5254

5355
host string
5456
port string
5557
password string
5658

57-
ready bool
58-
readinessMutex sync.RWMutex
59-
mutex sync.RWMutex
60-
logger *logger.Logger
61-
ctx context.Context
62-
cancel context.CancelFunc
59+
ready atomic.Bool
60+
reconnectChan chan struct{}
61+
readyChan chan struct{}
62+
63+
mutex sync.RWMutex
64+
logger *logger.Logger
65+
ctx context.Context
66+
cancel context.CancelFunc
6367
}
6468

6569
func NewRCON() func(NewRCONParams) (RCON, error) {
6670
return func(params NewRCONParams) (RCON, error) {
6771
connWrapper := &RCONConn{
68-
Conn: nil,
69-
mutex: sync.RWMutex{},
70-
logger: params.Logger,
71-
host: params.Config.Factorio.RCONHost,
72-
port: params.Config.Factorio.RCONPort,
73-
password: params.Config.Factorio.RCONPassword,
72+
Conn: nil,
73+
mutex: sync.RWMutex{},
74+
logger: params.Logger,
75+
host: params.Config.Factorio.RCONHost,
76+
port: params.Config.Factorio.RCONPort,
77+
password: params.Config.Factorio.RCONPassword,
78+
reconnectChan: make(chan struct{}, 1),
79+
readyChan: make(chan struct{}, 1),
7480
}
7581

7682
ctx, cancel := context.WithCancel(context.Background())
7783
connWrapper.ctx = ctx
7884
connWrapper.cancel = cancel
7985

80-
go connWrapper.Connect(ctx)
81-
82-
params.Lifecycle.Append(fx.Hook{
83-
OnStop: func(context.Context) error {
84-
connWrapper.cancel()
86+
// Start the connection manager
87+
go connWrapper.connectionManager()
8588

86-
connWrapper.mutex.RLock()
87-
defer connWrapper.mutex.RUnlock()
89+
// Trigger initial connection
90+
select {
91+
case connWrapper.reconnectChan <- struct{}{}:
92+
default:
93+
}
8894

89-
if connWrapper.Conn == nil {
95+
params.Lifecycle.Append(fx.Hook{
96+
OnStop: func(ctx context.Context) error {
97+
return fo.Invoke0(ctx, func() error {
98+
connWrapper.cancel()
99+
close(connWrapper.reconnectChan)
100+
close(connWrapper.readyChan)
101+
102+
connWrapper.mutex.Lock()
103+
defer connWrapper.mutex.Unlock()
104+
105+
if connWrapper.Conn != nil {
106+
return connWrapper.Conn.Close()
107+
}
90108
return nil
91-
}
92-
93-
_ = connWrapper.Conn.Close()
94-
95-
return nil
109+
})
96110
},
97111
})
98112

99113
return connWrapper, nil
100114
}
101115
}
102116

103-
func (r *RCONConn) connect() (*rcon.Conn, error) {
104-
conn, err := rcon.Dial(net.JoinHostPort(r.host, r.port), r.password)
105-
if err != nil {
106-
r.logger.Error("failed to connect to RCON, will attempt to reconnect", zap.Error(err))
107-
108-
return nil, err
109-
}
117+
func (r *RCONConn) connectionManager() {
118+
backoffStrategy := backoff.NewExponentialBackOff()
110119

111-
return conn, nil
112-
}
120+
for {
121+
select {
122+
case <-r.ctx.Done():
123+
return
124+
case <-r.reconnectChan:
125+
r.ready.Store(false)
113126

114-
func (r *RCONConn) Connect(ctx context.Context) {
115-
r.setUnready()
127+
err := fo.Invoke0(r.ctx, func() error {
128+
return backoff.Retry(func() error {
129+
return r.establishConnection(r.ctx)
130+
}, backoffStrategy)
131+
})
116132

117-
err := fo.Invoke0(ctx, func() error {
118-
return backoff.Retry(func() error {
119-
conn, err := r.connect()
120133
if err != nil {
121-
return err
134+
r.logger.Error("failed to establish RCON connection after retries", zap.Error(err))
135+
continue
122136
}
123137

124-
r.mutex.Lock()
125-
defer r.mutex.Unlock()
126-
127-
r.Conn = conn
128-
129-
err = r.ping(ctx)
130-
if err != nil {
131-
r.logger.Error("failed to ping RCON, will attempt to reconnect", zap.Error(err))
138+
r.ready.Store(true)
132139

133-
return err
140+
select {
141+
case r.readyChan <- struct{}{}:
142+
default:
134143
}
135-
136-
return nil
137-
}, backoff.NewExponentialBackOff())
138-
})
139-
if err != nil {
140-
r.logger.Error("failed to connect to RCON", zap.Error(err))
141-
return
144+
}
142145
}
143-
144-
r.setReady()
145146
}
146147

147-
func (r *RCONConn) Execute(ctx context.Context, command string) (string, error) {
148-
return fo.Invoke(ctx, func() (string, error) {
149-
_, _, err := lo.AttemptWithDelay(40, 250*time.Millisecond, func(_ int, _ time.Duration) error {
150-
if !r.IsReady() {
151-
return ErrTimeout
152-
}
148+
func (r *RCONConn) establishConnection(ctx context.Context) error {
149+
return fo.Invoke0(ctx, func() error {
150+
r.mutex.Lock()
151+
defer r.mutex.Unlock()
153152

154-
return nil
155-
})
156-
if err != nil {
157-
return "", err
153+
if r.Conn != nil {
154+
_ = r.Conn.Close()
158155
}
159156

160-
resp, err := r.Conn.Execute(command)
157+
conn, err := rcon.Dial(net.JoinHostPort(r.host, r.port), r.password)
161158
if err != nil {
162-
if errors.Is(err, io.EOF) {
163-
r.logger.Warn("RCON connection is closed, attempting to reconnect")
164-
165-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
166-
defer cancel()
159+
r.logger.Error("failed to connect to RCON", zap.Error(err))
160+
return err
161+
}
167162

168-
r.Connect(ctx)
169-
return r.Execute(ctx, command)
170-
}
163+
r.Conn = conn
171164

172-
return "", err
165+
// Test the connection
166+
_, err = r.Conn.Execute("/help")
167+
if err != nil {
168+
r.logger.Error("failed to ping RCON", zap.Error(err))
169+
return err
173170
}
174171

175-
return resp, nil
172+
r.logger.Info("RCON connection established successfully")
173+
174+
return nil
176175
})
177176
}
178177

179-
func (r *RCONConn) setUnready() {
180-
r.readinessMutex.Lock()
181-
defer r.readinessMutex.Unlock()
178+
func (r *RCONConn) Execute(ctx context.Context, command string) (string, error) {
179+
return fo.Invoke(ctx, func() (string, error) {
180+
if !r.IsReady() {
181+
select {
182+
case <-ctx.Done():
183+
return "", ctx.Err()
184+
case <-r.readyChan:
185+
}
186+
}
182187

183-
r.ready = false
184-
}
188+
r.mutex.RLock()
189+
conn := r.Conn
190+
r.mutex.RUnlock()
191+
if conn == nil {
192+
return r.Execute(ctx, command)
193+
}
185194

186-
func (r *RCONConn) setReady() {
187-
r.readinessMutex.Lock()
188-
defer r.readinessMutex.Unlock()
195+
resp, err := conn.Execute(command)
196+
if err != nil {
197+
if !strings.Contains(err.Error(), "use of closed network connection") &&
198+
!strings.Contains(err.Error(), "connection reset by peer") &&
199+
!errors.Is(err, io.EOF) {
200+
return "", err
201+
}
189202

190-
r.ready = true
191-
}
203+
r.logger.Warn("RCON connection lost, reconnecting...")
192204

193-
func (r *RCONConn) ping(ctx context.Context) error {
194-
return fo.Invoke0(ctx, func() error {
195-
_, err := r.Conn.Execute("/help")
196-
if err != nil {
197-
return err
205+
select {
206+
case r.reconnectChan <- struct{}{}:
207+
default:
208+
}
209+
210+
return r.Execute(ctx, command)
198211
}
199212

200-
return nil
213+
return resp, nil
201214
})
202215
}
203216

204217
func (r *RCONConn) IsReady() bool {
205-
r.readinessMutex.RLock()
206-
r.mutex.RLock()
207-
208-
defer r.mutex.RUnlock()
209-
defer r.readinessMutex.RUnlock()
210-
211-
return r.ready && r.Conn != nil
218+
return r.ready.Load()
212219
}

0 commit comments

Comments
 (0)