Skip to content

Commit 7d5dd17

Browse files
authored
Fix proxy close logic (#82)
`Accept()` can return `proxy.test: error: accept tcp [::]:9042: use of closed network connection` when closing. This case needs to be handled specially when the proxy is closed properly.
1 parent 96eb3a2 commit 7d5dd17

File tree

4 files changed

+26
-9
lines changed

4 files changed

+26
-9
lines changed

proxy/proxy.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ var (
4444
encodedZeroValue, _ = proxycore.EncodeType(datatype.Int, primitive.ProtocolVersion4, 0)
4545
)
4646

47+
var ErrProxyClosed = errors.New("proxy closed")
48+
4749
const preparedIdSize = 16
4850

4951
type Config struct {
@@ -76,6 +78,8 @@ type Proxy struct {
7678
clientIdGen uint64
7779
lb proxycore.LoadBalancer
7880
systemLocalValues map[string]message.Column
81+
closed chan struct{}
82+
closingMu sync.Mutex
7983
}
8084

8185
func (p *Proxy) OnEvent(event proxycore.Event) {
@@ -114,6 +118,7 @@ func NewProxy(ctx context.Context, config Config) *Proxy {
114118
ctx: ctx,
115119
config: config,
116120
logger: proxycore.GetOrCreateNopLogger(config.Logger),
121+
closed: make(chan struct{}),
117122
}
118123
}
119124

@@ -195,13 +200,25 @@ func (p *Proxy) Serve() error {
195200
for {
196201
conn, err := p.listener.AcceptTCP()
197202
if err != nil {
198-
return err
203+
select {
204+
case <-p.closed:
205+
return ErrProxyClosed
206+
default:
207+
return err
208+
}
199209
}
200210
p.handle(conn)
201211
}
202212
}
203213

204-
func (p *Proxy) Shutdown() error {
214+
func (p *Proxy) Close() error {
215+
p.closingMu.Lock()
216+
defer p.closingMu.Unlock()
217+
select {
218+
case <-p.closed:
219+
default:
220+
close(p.closed)
221+
}
205222
return p.listener.Close()
206223
}
207224

proxy/proxy_retries_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ func testProxyRetry(t *testing.T, query message.Message, response message.Error)
382382
})
383383
defer func() {
384384
cluster.Shutdown()
385-
_ = proxy.Shutdown()
385+
_ = proxy.Close()
386386
}()
387387

388388
cl := connectTestClient(t, ctx)

proxy/proxy_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestProxy_ListenAndServe(t *testing.T) {
7474
})
7575
defer func() {
7676
cluster.Shutdown()
77-
_ = proxy.Shutdown()
77+
_ = proxy.Close()
7878
}()
7979

8080
cl := connectTestClient(t, ctx)
@@ -138,7 +138,7 @@ func TestProxy_Unprepared(t *testing.T) {
138138
})
139139
defer func() {
140140
cluster.Shutdown()
141-
_ = proxy.Shutdown()
141+
_ = proxy.Close()
142142
}()
143143

144144
cl := connectTestClient(t, ctx)
@@ -174,7 +174,7 @@ func TestProxy_UseKeyspace(t *testing.T) {
174174
cluster, proxy := setupProxyTest(t, ctx, 1, nil)
175175
defer func() {
176176
cluster.Shutdown()
177-
_ = proxy.Shutdown()
177+
_ = proxy.Close()
178178
}()
179179

180180
cl := connectTestClient(t, ctx)
@@ -195,7 +195,7 @@ func TestProxy_NegotiateProtocolV5(t *testing.T) {
195195
cluster, proxy := setupProxyTest(t, ctx, 1, nil)
196196
defer func() {
197197
cluster.Shutdown()
198-
_ = proxy.Shutdown()
198+
_ = proxy.Close()
199199
}()
200200

201201
cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(testProxyContactPoint), proxycore.ClientConnConfig{})

proxy/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,14 +269,14 @@ func listenAndServe(p *Proxy, ctx context.Context, logger *zap.Logger) (err erro
269269
case <-ctx.Done():
270270
logger.Debug("proxy interrupted/killed")
271271
_ = server.Close()
272-
_ = p.Shutdown()
272+
_ = p.Close()
273273
}
274274
}()
275275

276276
go func() {
277277
defer wg.Done()
278278
err = p.Serve()
279-
if err != nil {
279+
if err != nil && err != ErrProxyClosed {
280280
ch <- err
281281
}
282282
}()

0 commit comments

Comments
 (0)