Skip to content

Commit becddbf

Browse files
authored
Merge pull request #719 from Roasbeef/tcp-connection-issue
multi: attempt to fix issue of high number of TCP connections by explicitly closing connections and implementing an idle timer
2 parents 732a90c + b61d6fb commit becddbf

File tree

9 files changed

+105
-5
lines changed

9 files changed

+105
-5
lines changed

proof/courier.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/btcsuite/btcd/btcec/v2"
14+
"github.com/davecgh/go-spew/spew"
1415
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
1516
"github.com/lightninglabs/taproot-assets/asset"
1617
"github.com/lightninglabs/taproot-assets/fn"
@@ -66,6 +67,9 @@ type Courier interface {
6667
// SetSubscribers sets the set of subscribers that will be notified
6768
// of proof courier related events.
6869
SetSubscribers(map[uint64]*fn.EventReceiver[fn.Event])
70+
71+
// Close stops the courier instance.
72+
Close() error
6973
}
7074

7175
// CourierAddr is a fully validated courier address (including protocol specific
@@ -223,6 +227,7 @@ func (h *UniverseRpcCourierAddr) NewCourier(_ context.Context,
223227
backoffHandle: backoffHandle,
224228
transfer: cfg.TransferLog,
225229
subscribers: subscribers,
230+
rawConn: conn,
226231
}, nil
227232
}
228233

@@ -291,11 +296,16 @@ type ProofMailbox interface {
291296
// CleanUp attempts to tear down the mailbox as specified by the passed
292297
// sid.
293298
CleanUp(ctx context.Context, sid streamID) error
299+
300+
// Close closes the underlying connection to the hashmail server.
301+
Close() error
294302
}
295303

296304
// HashMailBox is an implementation of the ProofMailbox interface backed by the
297305
// hashmailrpc.HashMailClient.
298306
type HashMailBox struct {
307+
rawConn *grpc.ClientConn
308+
299309
client hashmailrpc.HashMailClient
300310
}
301311

@@ -341,7 +351,8 @@ func NewHashMailBox(courierAddr *url.URL) (*HashMailBox,
341351
client := hashmailrpc.NewHashMailClient(conn)
342352

343353
return &HashMailBox{
344-
client: client,
354+
client: client,
355+
rawConn: conn,
345356
}, nil
346357
}
347358

@@ -480,6 +491,11 @@ func (h *HashMailBox) CleanUp(ctx context.Context, sid streamID) error {
480491
return err
481492
}
482493

494+
// Close closes the underlying connection to the hashmail server.
495+
func (h *HashMailBox) Close() error {
496+
return h.rawConn.Close()
497+
}
498+
483499
// A compile-time assertion to ensure that the HashMailBox meets the
484500
// ProofMailbox interface.
485501
var _ ProofMailbox = (*HashMailBox)(nil)
@@ -853,6 +869,8 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
853869

854870
log.Infof("Received ACK from receiver! Cleaning up mailboxes...")
855871

872+
defer h.Close()
873+
856874
// Once we receive this ACK, we can clean up our mailbox and also the
857875
// receiver's mailbox.
858876
if err := h.mailbox.CleanUp(ctx, senderStreamID); err != nil {
@@ -928,6 +946,17 @@ func (h *HashMailCourier) publishSubscriberEvent(event fn.Event) {
928946
}
929947
}
930948

949+
// Close closes the underlying connection to the hashmail server.
950+
func (h *HashMailCourier) Close() error {
951+
if err := h.mailbox.Close(); err != nil {
952+
log.Warnf("unable to close mailbox session, "+
953+
"recipient=%v: %v", err, spew.Sdump(h.recipient))
954+
return err
955+
}
956+
957+
return nil
958+
}
959+
931960
// BackoffWaitEvent is an event that is sent to a subscriber each time we
932961
// wait via the Backoff procedure before retrying to deliver a proof to the
933962
// receiver.
@@ -1030,6 +1059,10 @@ type UniverseRpcCourier struct {
10301059
// the universe RPC server.
10311060
client unirpc.UniverseClient
10321061

1062+
// rawConn is the raw connection that the courier will use to interact
1063+
// with the remote gRPC service.
1064+
rawConn *grpc.ClientConn
1065+
10331066
// backoffHandle is a handle to the backoff procedure used in proof
10341067
// delivery.
10351068
backoffHandle *BackoffHandler
@@ -1297,6 +1330,11 @@ func (c *UniverseRpcCourier) publishSubscriberEvent(event fn.Event) {
12971330
}
12981331
}
12991332

1333+
// Close closes the courier's connection to the remote gRPC service.
1334+
func (c *UniverseRpcCourier) Close() error {
1335+
return c.rawConn.Close()
1336+
}
1337+
13001338
// A compile-time assertion to ensure the UniverseRpcCourier meets the
13011339
// proof.Courier interface.
13021340
var _ Courier = (*UniverseRpcCourier)(nil)

server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
"sync"
99
"sync/atomic"
10+
"time"
1011

1112
proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
1213
"github.com/lightninglabs/lndclient"
@@ -21,6 +22,7 @@ import (
2122
"github.com/lightningnetwork/lnd/lnrpc"
2223
"github.com/lightningnetwork/lnd/macaroons"
2324
"google.golang.org/grpc"
25+
"google.golang.org/grpc/keepalive"
2426
"gopkg.in/macaroon-bakery.v2/bakery"
2527
)
2628

@@ -269,6 +271,12 @@ func (s *Server) RunUntilShutdown(mainErrChan <-chan error) error {
269271
serverOpts = append(serverOpts, rpcServerOpts...)
270272
serverOpts = append(serverOpts, ServerMaxMsgReceiveSize)
271273

274+
keepAliveParams := keepalive.ServerParameters{
275+
MaxConnectionIdle: time.Minute * 2,
276+
}
277+
278+
serverOpts = append(serverOpts, grpc.KeepaliveParams(keepAliveParams))
279+
272280
grpcServer := grpc.NewServer(serverOpts...)
273281
defer grpcServer.Stop()
274282

tapfreighter/chain_porter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,8 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
671671
"service handle: %w", err)
672672
}
673673

674+
defer courier.Close()
675+
674676
// Update courier events subscribers before attempting to
675677
// deliver proof.
676678
p.subscriberMtx.Lock()

universe/auto_syncer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ func (f *FederationEnvoy) Start() error {
175175
return nil
176176
}
177177

178+
// Close frees up any ephemeral resources allocated by the envoy.
179+
func (f *FederationEnvoy) Close() error {
180+
return nil
181+
}
182+
178183
// Stop stops all active goroutines.
179184
func (f *FederationEnvoy) Stop() error {
180185
f.stopOnce.Do(func() {
@@ -243,6 +248,8 @@ func (f *FederationEnvoy) pushProofToServer(ctx context.Context,
243248
"to remote server(%v): %w", addr.HostStr(), err)
244249
}
245250

251+
defer remoteUniverseServer.Close()
252+
246253
_, err = remoteUniverseServer.UpsertProofLeaf(
247254
ctx, uniID, key, leaf,
248255
)

universe/base.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ func NewArchive(cfg ArchiveConfig) *Archive {
7272
return a
7373
}
7474

75+
// Close closes the archive, stopping all goroutines and freeing all resources.
76+
func (a *Archive) Close() error {
77+
return nil
78+
}
79+
7580
// fetchUniverse returns the base universe instance for the passed identifier.
7681
// The universe will be loaded in on demand if it has not been seen before.
7782
func (a *Archive) fetchUniverse(id Identifier) BaseBackend {

universe/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,9 @@ type Registrar interface {
446446
// UpsertProofLeaf upserts a proof leaf within the target universe tree.
447447
UpsertProofLeaf(ctx context.Context, id Identifier, key LeafKey,
448448
leaf *Leaf) (*Proof, error)
449+
450+
// Close is used to shutdown the active registrar instance.
451+
Close() error
449452
}
450453

451454
// Item contains the data fields necessary to insert/update a proof leaf
@@ -650,6 +653,9 @@ type DiffEngine interface {
650653
// of diff
651654
FetchProofLeaf(ctx context.Context, id Identifier,
652655
key LeafKey) ([]*Proof, error)
656+
657+
// Close is used to shutdown the active diff engine instance.
658+
Close() error
653659
}
654660

655661
// Commitment is an on chain universe commitment. This includes the merkle

universe/syncer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr,
461461
return nil, fmt.Errorf("unable to create remote diff "+
462462
"engine: %w", err)
463463
}
464+
defer diffEngine.Close()
464465

465466
// With the engine created, we can now sync the local Universe with the
466467
// remote instance.

universe_rpc_diff.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
// RpcUniverseDiff is an implementation of the universe.DiffEngine interface
1616
// that uses an RPC connection to target Universe.
1717
type RpcUniverseDiff struct {
18-
conn unirpc.UniverseClient
18+
conn *universeClientConn
1919
}
2020

2121
// NewRpcUniverseDiff creates a new RpcUniverseDiff instance that dials out to
@@ -210,6 +210,17 @@ func (r *RpcUniverseDiff) FetchProofLeaf(ctx context.Context,
210210
return []*universe.Proof{uniProof}, nil
211211
}
212212

213+
// Close closes the underlying RPC connection to the remote universe server.
214+
func (r *RpcUniverseDiff) Close() error {
215+
if err := r.conn.Close(); err != nil {
216+
tapdLog.Warnf("unable to close universe RPC "+
217+
"connection: %v", err)
218+
return err
219+
}
220+
221+
return nil
222+
}
223+
213224
// A compile time interface to ensure that RpcUniverseDiff implements the
214225
// universe.DiffEngine interface.
215226
var _ universe.DiffEngine = (*RpcUniverseDiff)(nil)

universe_rpc_registrar.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
// RpcUniverseRegistrar is an implementation of the universe.Registrar interface
1919
// that uses an RPC connection to target Universe.
2020
type RpcUniverseRegistrar struct {
21-
conn unirpc.UniverseClient
21+
conn *universeClientConn
2222
}
2323

2424
// NewRpcUniverseRegistrar creates a new RpcUniverseRegistrar instance that
@@ -115,6 +115,17 @@ func (r *RpcUniverseRegistrar) UpsertProofLeaf(ctx context.Context,
115115
return unmarshalIssuanceProof(uniKey, proofResp)
116116
}
117117

118+
// Close closes the underlying RPC connection to the remote Universe server.
119+
func (r *RpcUniverseRegistrar) Close() error {
120+
if err := r.conn.Close(); err != nil {
121+
tapdLog.Warnf("unable to close universe RPC "+
122+
"connection: %v", err)
123+
return err
124+
}
125+
126+
return nil
127+
}
128+
118129
// A compile time interface to ensure that RpcUniverseRegistrar implements the
119130
// universe.Registrar interface.
120131
var _ universe.Registrar = (*RpcUniverseRegistrar)(nil)
@@ -151,10 +162,18 @@ func CheckFederationServer(localRuntimeID int64, connectTimeout time.Duration,
151162
return nil
152163
}
153164

165+
// universeClientConn is a wrapper around a gRPC client connection that also
166+
// includes the raw connection. This allows us to properly manage the lifecycle
167+
// of the connection.
168+
type universeClientConn struct {
169+
*grpc.ClientConn
170+
unirpc.UniverseClient
171+
}
172+
154173
// ConnectUniverse connects to a remote Universe server using the provided
155174
// server address.
156175
func ConnectUniverse(
157-
serverAddr universe.ServerAddr) (unirpc.UniverseClient, error) {
176+
serverAddr universe.ServerAddr) (*universeClientConn, error) {
158177

159178
// TODO(roasbeef): all info is authenticated, but also want to allow
160179
// brontide connect as well, can avoid TLS certs
@@ -179,5 +198,8 @@ func ConnectUniverse(
179198
"server: %v", err)
180199
}
181200

182-
return unirpc.NewUniverseClient(rawConn), nil
201+
return &universeClientConn{
202+
ClientConn: rawConn,
203+
UniverseClient: unirpc.NewUniverseClient(rawConn),
204+
}, nil
183205
}

0 commit comments

Comments
 (0)