Skip to content

Commit 8cff364

Browse files
committed
universe: fix TCP connection leak by explicitly closing gRPC conn for syncers
In this commit, we attempt to fix a TCP connection leak by explicitly closing the gRPC connections we create once we're done with the relevant gRPC client. Otherwise, we'll end up making a new connection for each new asset to be pushed, which can add up. In the future, we should also look into the server-side keep alive options.
1 parent 6a18d30 commit 8cff364

File tree

5 files changed

+47
-4
lines changed

5 files changed

+47
-4
lines changed

universe/auto_syncer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ func (f *FederationEnvoy) pushProofToServer(ctx context.Context,
243243
"to remote server(%v): %w", addr.HostStr(), err)
244244
}
245245

246+
defer remoteUniverseServer.Close()
247+
246248
_, err = remoteUniverseServer.UpsertProofLeaf(
247249
ctx, uniID, key, leaf,
248250
)

universe/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,9 @@ type Registrar interface {
401401
// UpsertProofLeaf upserts a proof leaf within the target universe tree.
402402
UpsertProofLeaf(ctx context.Context, id Identifier, key LeafKey,
403403
leaf *Leaf) (*Proof, error)
404+
405+
// Close is used to shutdown the active registrar instance.
406+
Close() error
404407
}
405408

406409
// Item contains the data fields necessary to insert/update a proof leaf
@@ -605,6 +608,9 @@ type DiffEngine interface {
605608
// of diff
606609
FetchProofLeaf(ctx context.Context, id Identifier,
607610
key LeafKey) ([]*Proof, error)
611+
612+
// Close is used to shutdown the active diff engine instance.
613+
Close() error
608614
}
609615

610616
// Commitment is an on chain universe commitment. This includes the merkle

universe/syncer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,8 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr,
462462
"engine: %w", err)
463463
}
464464

465+
defer diffEngine.Close()
466+
465467
// With the engine created, we can now sync the local Universe with the
466468
// remote instance.
467469
return s.executeSync(ctx, diffEngine, syncType, syncConfigs, idsToSync)

universe_rpc_diff.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
// RpcUniverseDiff is an implementation of the universe.DiffEngine interface
1616
// that uses an RPC connection to target Universe.
1717
type RpcUniverseDiff struct {
18-
conn unirpc.UniverseClient
18+
conn *universeClientConn
1919
}
2020

2121
// NewRpcUniverseDiff creates a new RpcUniverseDiff instance that dials out to
@@ -210,6 +210,17 @@ func (r *RpcUniverseDiff) FetchProofLeaf(ctx context.Context,
210210
return []*universe.Proof{uniProof}, nil
211211
}
212212

213+
// Close closes the underlying RPC connection to the remote universe server.
214+
func (r *RpcUniverseDiff) Close() error {
215+
if err := r.conn.Close(); err != nil {
216+
tapdLog.Warnf("unable to close universe RPC "+
217+
"connection: %v", err)
218+
return err
219+
}
220+
221+
return nil
222+
}
223+
213224
// A compile time interface to ensure that RpcUniverseDiff implements the
214225
// universe.DiffEngine interface.
215226
var _ universe.DiffEngine = (*RpcUniverseDiff)(nil)

universe_rpc_registrar.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
// RpcUniverseRegistrar is an implementation of the universe.Registrar interface
1919
// that uses an RPC connection to target Universe.
2020
type RpcUniverseRegistrar struct {
21-
conn unirpc.UniverseClient
21+
conn *universeClientConn
2222
}
2323

2424
// NewRpcUniverseRegistrar creates a new RpcUniverseRegistrar instance that
@@ -115,6 +115,17 @@ func (r *RpcUniverseRegistrar) UpsertProofLeaf(ctx context.Context,
115115
return unmarshalIssuanceProof(uniKey, proofResp)
116116
}
117117

118+
// Close closes the underlying RPC connection to the remote Universe server.
119+
func (r *RpcUniverseRegistrar) Close() error {
120+
if err := r.conn.Close(); err != nil {
121+
tapdLog.Warnf("unable to close universe RPC "+
122+
"connection: %v", err)
123+
return err
124+
}
125+
126+
return nil
127+
}
128+
118129
// A compile time interface to ensure that RpcUniverseRegistrar implements the
119130
// universe.Registrar interface.
120131
var _ universe.Registrar = (*RpcUniverseRegistrar)(nil)
@@ -151,10 +162,18 @@ func CheckFederationServer(localRuntimeID int64, connectTimeout time.Duration,
151162
return nil
152163
}
153164

165+
// universeClientConn is a wrapper around a gRPC client connection that also
166+
// includes the raw connection. This allows us to properly manage the lifecycle
167+
// of the connection.
168+
type universeClientConn struct {
169+
*grpc.ClientConn
170+
unirpc.UniverseClient
171+
}
172+
154173
// ConnectUniverse connects to a remote Universe server using the provided
155174
// server address.
156175
func ConnectUniverse(
157-
serverAddr universe.ServerAddr) (unirpc.UniverseClient, error) {
176+
serverAddr universe.ServerAddr) (*universeClientConn, error) {
158177

159178
// TODO(roasbeef): all info is authenticated, but also want to allow
160179
// brontide connect as well, can avoid TLS certs
@@ -179,5 +198,8 @@ func ConnectUniverse(
179198
"server: %v", err)
180199
}
181200

182-
return unirpc.NewUniverseClient(rawConn), nil
201+
return &universeClientConn{
202+
ClientConn: rawConn,
203+
UniverseClient: unirpc.NewUniverseClient(rawConn),
204+
}, nil
183205
}

0 commit comments

Comments
 (0)