Skip to content

Commit 5f10d7f

Browse files
committed
multi: make HashMail proof courier connection attempts lazy
This commit updates the HashMail proof courier handler to make connection attempts optionally lazy. Connection attempts are now integrated into the backoff procedure where feasible, improving the robustness of the connection handling process.
1 parent 038000c commit 5f10d7f

File tree

5 files changed

+208
-32
lines changed

5 files changed

+208
-32
lines changed

proof/courier.go

Lines changed: 200 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
1919
"google.golang.org/grpc"
2020
"google.golang.org/grpc/codes"
21+
grpcconn "google.golang.org/grpc/connectivity"
2122
"google.golang.org/grpc/credentials"
2223
"google.golang.org/grpc/status"
2324
)
@@ -91,12 +92,80 @@ type CourierCfg struct {
9192
LocalArchive Archiver
9293
}
9394

95+
// CourierConnStatus is an enum that represents the different states a courier
96+
// connection can be in.
97+
type CourierConnStatus int
98+
99+
const (
100+
// CourierConnStatusUnknown indicates that the connection status is
101+
// unknown.
102+
CourierConnStatusUnknown CourierConnStatus = iota
103+
104+
// CourierConnStatusIdle indicates that the connection is idle.
105+
CourierConnStatusIdle
106+
107+
// CourierConnStatusConnecting indicates that the connection is being
108+
// established.
109+
CourierConnStatusConnecting
110+
111+
// CourierConnStatusReady indicates that the connection is ready for
112+
// work.
113+
CourierConnStatusReady
114+
115+
// CourierConnStatusTransientFailure indicates that the connection has
116+
// seen a failure but expects to recover.
117+
CourierConnStatusTransientFailure
118+
119+
// CourierConnStatusShutdown indicates that the connection has started
120+
// shutting down.
121+
CourierConnStatusShutdown
122+
123+
// CourierConnStatusDisconnected indicates that the connection is
124+
// disconnected.
125+
CourierConnStatusDisconnected
126+
)
127+
128+
// NewCourierConnStatusFromRpcStatus creates a new courier connection status
129+
// from the given gRPC connection status.
130+
func NewCourierConnStatusFromRpcStatus(
131+
rpcStatus grpcconn.State) (CourierConnStatus, error) {
132+
133+
switch rpcStatus {
134+
case grpcconn.Idle:
135+
return CourierConnStatusIdle, nil
136+
case grpcconn.Connecting:
137+
return CourierConnStatusConnecting, nil
138+
case grpcconn.Ready:
139+
return CourierConnStatusReady, nil
140+
case grpcconn.TransientFailure:
141+
return CourierConnStatusTransientFailure, nil
142+
case grpcconn.Shutdown:
143+
return CourierConnStatusShutdown, nil
144+
default:
145+
return CourierConnStatusUnknown, fmt.Errorf("unknown courier "+
146+
"connection status: %d", rpcStatus)
147+
}
148+
}
149+
150+
// IsPending returns true if the courier connection status is pending,
151+
// indicating it is either ready or will soon be ready for use.
152+
func (c CourierConnStatus) IsPending() bool {
153+
return c == CourierConnStatusIdle ||
154+
c == CourierConnStatusConnecting ||
155+
c == CourierConnStatusReady
156+
}
157+
94158
// CourierDispatch is an interface that abstracts away the different proof
95159
// courier services that are supported.
96160
type CourierDispatch interface {
97161
// NewCourier instantiates a new courier service handle given a service
98162
// URL address.
99-
NewCourier(addr *url.URL) (Courier, error)
163+
//
164+
// The `lazyConnect` flag determines whether the courier should
165+
// establish a connection to the service immediately or delay it
166+
// until the connection is actually needed.
167+
NewCourier(ctx context.Context, addr *url.URL,
168+
lazyConnect bool) (Courier, error)
100169
}
101170

102171
// URLDispatch is a proof courier dispatch that uses the courier address URL
@@ -114,30 +183,19 @@ func NewCourierDispatch(cfg *CourierCfg) *URLDispatch {
114183

115184
// NewCourier instantiates a new courier service handle given a service URL
116185
// address.
117-
func (u *URLDispatch) NewCourier(addr *url.URL) (Courier, error) {
186+
func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL,
187+
lazyConnect bool) (Courier, error) {
188+
118189
subscribers := make(map[uint64]*fn.EventReceiver[fn.Event])
119190

120191
// Create new courier addr based on URL scheme.
121192
switch addr.Scheme {
122193
case HashmailCourierType:
123-
cfg := u.cfg.HashMailCfg
124-
backoffHandler := NewBackoffHandler(
125-
cfg.BackoffCfg, u.cfg.TransferLog,
194+
return NewHashMailCourier(
195+
ctx, u.cfg.HashMailCfg, u.cfg.TransferLog, addr,
196+
lazyConnect,
126197
)
127198

128-
hashMailBox, err := NewHashMailBox(addr)
129-
if err != nil {
130-
return nil, fmt.Errorf("unable to make mailbox: %w",
131-
err)
132-
}
133-
134-
return &HashMailCourier{
135-
cfg: u.cfg,
136-
backoffHandle: backoffHandler,
137-
mailbox: hashMailBox,
138-
subscribers: subscribers,
139-
}, nil
140-
141199
case UniverseRpcCourierType:
142200
cfg := u.cfg.UniverseRpcCfg
143201
backoffHandler := NewBackoffHandler(
@@ -237,6 +295,10 @@ type ProofMailbox interface {
237295

238296
// Close closes the underlying connection to the hashmail server.
239297
Close() error
298+
299+
// ConnectionStatus returns the current connection status of the
300+
// mailbox service connection.
301+
ConnectionStatus() (CourierConnStatus, error)
240302
}
241303

242304
// HashMailBox is an implementation of the ProofMailbox interface backed by the
@@ -265,7 +327,7 @@ func serverDialOpts() ([]grpc.DialOption, error) {
265327
//
266328
// NOTE: The TLS certificate path argument (tlsCertPath) is optional. If unset,
267329
// then the system's TLS trust store is used.
268-
func NewHashMailBox(courierAddr *url.URL) (*HashMailBox,
330+
func NewHashMailBox(ctx context.Context, courierAddr *url.URL) (*HashMailBox,
269331
error) {
270332

271333
if courierAddr.Scheme != HashmailCourierType {
@@ -281,7 +343,7 @@ func NewHashMailBox(courierAddr *url.URL) (*HashMailBox,
281343
serverAddr := fmt.Sprintf(
282344
"%s:%s", courierAddr.Hostname(), courierAddr.Port(),
283345
)
284-
conn, err := grpc.Dial(serverAddr, dialOpts...)
346+
conn, err := grpc.DialContext(ctx, serverAddr, dialOpts...)
285347
if err != nil {
286348
return nil, err
287349
}
@@ -434,6 +496,17 @@ func (h *HashMailBox) Close() error {
434496
return h.rawConn.Close()
435497
}
436498

499+
// ConnectionStatus returns the current connection status of the mailbox service
500+
// connection.
501+
func (h *HashMailBox) ConnectionStatus() (CourierConnStatus, error) {
502+
if h == nil || h.rawConn == nil {
503+
return CourierConnStatusDisconnected, nil
504+
}
505+
506+
grpcStatus := h.rawConn.GetState()
507+
return NewCourierConnStatusFromRpcStatus(grpcStatus)
508+
}
509+
437510
// A compile-time assertion to ensure that the HashMailBox meets the
438511
// ProofMailbox interface.
439512
var _ ProofMailbox = (*HashMailBox)(nil)
@@ -717,8 +790,15 @@ type HashMailCourierCfg struct {
717790
// HashMailCourier is a hashmail proof courier service handle. It implements the
718791
// Courier interface.
719792
type HashMailCourier struct {
720-
// cfg is the general courier configuration.
721-
cfg *CourierCfg
793+
// cfg is the hashmail courier configuration.
794+
cfg *HashMailCourierCfg
795+
796+
// addr is the address of the hashmail server.
797+
addr *url.URL
798+
799+
// transferLog is a log for recording proof delivery and retrieval
800+
// attempts.
801+
transferLog TransferLog
722802

723803
// backoffHandle is a handle to the backoff procedure used in proof
724804
// delivery.
@@ -737,6 +817,74 @@ type HashMailCourier struct {
737817
subscriberMtx sync.Mutex
738818
}
739819

820+
// NewHashMailCourier creates a new hashmail proof courier service handle.
821+
func NewHashMailCourier(ctx context.Context, cfg *HashMailCourierCfg,
822+
transferLog TransferLog, courierAddr *url.URL,
823+
lazyConnect bool) (*HashMailCourier, error) {
824+
825+
courier := HashMailCourier{
826+
cfg: cfg,
827+
addr: courierAddr,
828+
transferLog: transferLog,
829+
backoffHandle: NewBackoffHandler(cfg.BackoffCfg, transferLog),
830+
subscribers: make(map[uint64]*fn.EventReceiver[fn.Event]),
831+
}
832+
833+
// If we're not lazy connecting, then we'll attempt to connect to the
834+
// mailbox service immediately.
835+
if !lazyConnect {
836+
err := courier.ensureConnect(ctx)
837+
if err != nil {
838+
return nil, fmt.Errorf("unable to connect to mailbox "+
839+
"service during hashmail courier "+
840+
"instantiation: %w", err)
841+
}
842+
}
843+
844+
return &courier, nil
845+
}
846+
847+
// ensureConnect ensures that the courier is connected to the hashmail server.
848+
// This method does nothing if a mailbox service connection is already
849+
// established.
850+
func (h *HashMailCourier) ensureConnect(ctx context.Context) error {
851+
// If we're already connected, we'll return early.
852+
if h.mailbox != nil {
853+
// If the mailbox is already instantiated, we'll check the
854+
// to determine if the connection is ready.
855+
connStatus, err := h.mailbox.ConnectionStatus()
856+
if err != nil {
857+
return fmt.Errorf("unable to determine connection "+
858+
"status: %w", err)
859+
}
860+
861+
// Return early and don't attempt to establish a new connection
862+
// if the connection status is idle, ready, or connecting.
863+
if connStatus.IsPending() {
864+
return nil
865+
}
866+
867+
// At this point, even though the mailbox is instantiated, a
868+
// connection is not ready. We'll close the mailbox and attempt
869+
// to establish a new connection.
870+
err = h.mailbox.Close()
871+
if err != nil {
872+
return fmt.Errorf("unable to close existing mailbox "+
873+
"service connection: %w", err)
874+
}
875+
}
876+
877+
// Instantiate a new connection to the mailbox service.
878+
mailbox, err := NewHashMailBox(ctx, h.addr)
879+
if err != nil {
880+
return fmt.Errorf("unable to connect to hashmail server: %w",
881+
err)
882+
}
883+
884+
h.mailbox = mailbox
885+
return nil
886+
}
887+
740888
// DeliverProof attempts to delivery a proof to the receiver, using the
741889
// information in the Addr type.
742890
//
@@ -754,7 +902,17 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
754902
// Interact with the hashmail service using a backoff procedure to
755903
// ensure that we don't overwhelm the service with delivery attempts.
756904
deliveryExec := func() error {
757-
err := h.initMailboxes(
905+
// Connect to the hashmail service if a connection hasn't been
906+
// established yet.
907+
err := h.ensureConnect(ctx)
908+
if err != nil {
909+
return fmt.Errorf("unable to connect to hashmail "+
910+
"mailbox service during delivery attempt: %w",
911+
err)
912+
}
913+
914+
// Initialize the mailboxes for the sender and receiver.
915+
err = h.initMailboxes(
758916
ctx, senderStreamID, receiverStreamID,
759917
)
760918
if err != nil {
@@ -781,10 +939,10 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
781939
// Wait to receive the ACK from the remote party over
782940
// their stream.
783941
log.Infof("Waiting (%v) for receiver ACK via sid=%x",
784-
h.cfg.HashMailCfg.ReceiverAckTimeout, receiverStreamID)
942+
h.cfg.ReceiverAckTimeout, receiverStreamID)
785943

786944
ctxTimeout, cancel := context.WithTimeout(
787-
ctx, h.cfg.HashMailCfg.ReceiverAckTimeout,
945+
ctx, h.cfg.ReceiverAckTimeout,
788946
)
789947
defer cancel()
790948
err = h.mailbox.RecvAck(ctxTimeout, receiverStreamID)
@@ -824,13 +982,21 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
824982
func (h *HashMailCourier) initMailboxes(ctx context.Context,
825983
senderStreamID streamID, receiverStreamID streamID) error {
826984

985+
// Connect to the hashmail service if a connection hasn't been
986+
// established yet.
987+
err := h.ensureConnect(ctx)
988+
if err != nil {
989+
return fmt.Errorf("unable to connect to hashmail mailbox "+
990+
"service during mailbox init: %w", err)
991+
}
992+
827993
// To deliver the proof to the receiver, we'll use our hashmail box to
828994
// create a new session that we'll use to send the proof over.
829995
// We'll send on this stream, while the receiver receives on it.
830996
//
831997
// TODO(roasbeef): should do this as early in the process as possible.
832998
log.Infof("Creating sender mailbox w/ sid=%x", senderStreamID)
833-
if err := h.mailbox.Init(ctx, senderStreamID); err != nil {
999+
if err = h.mailbox.Init(ctx, senderStreamID); err != nil {
8341000
return fmt.Errorf("failed to init sender stream mailbox: %w",
8351001
err)
8361002
}
@@ -938,6 +1104,14 @@ func NewBackoffWaitEvent(
9381104
func (h *HashMailCourier) ReceiveProof(ctx context.Context, recipient Recipient,
9391105
loc Locator) (*AnnotatedProof, error) {
9401106

1107+
// Connect to the hashmail service if a connection hasn't been
1108+
// established yet.
1109+
err := h.ensureConnect(ctx)
1110+
if err != nil {
1111+
return nil, fmt.Errorf("unable to connect to hashmail "+
1112+
"mailbox service during proof receive attempt: %w", err)
1113+
}
1114+
9411115
senderStreamID := deriveSenderStreamID(recipient)
9421116
if err := h.mailbox.Init(ctx, senderStreamID); err != nil {
9431117
return nil, err

proof/mock.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,8 @@ type MockProofCourierDispatcher struct {
444444

445445
// NewCourier instantiates a new courier service handle given a service
446446
// URL address.
447-
func (m *MockProofCourierDispatcher) NewCourier(*url.URL) (Courier,
448-
error) {
447+
func (m *MockProofCourierDispatcher) NewCourier(context.Context,
448+
*url.URL, bool) (Courier, error) {
449449

450450
return m.Courier, nil
451451
}

tapchannel/aux_sweeper.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -801,13 +801,15 @@ func importOutputProofs(scid lnwire.ShortChannelID,
801801

802802
// First, we'll make a courier to use in fetching the proofs we
803803
// need.
804-
proofFetcher, err := proofDispatch.NewCourier(courierAddr)
804+
ctxb := context.Background()
805+
proofFetcher, err := proofDispatch.NewCourier(
806+
ctxb, courierAddr, true,
807+
)
805808
if err != nil {
806809
return fmt.Errorf("unable to create proof courier: %w",
807810
err)
808811
}
809812

810-
ctxb := context.Background()
811813
recipient := proof.Recipient{
812814
ScriptKey: scriptKey,
813815
AssetID: proofPrevID.ID,

tapfreighter/chain_porter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
820820
// Initiate proof courier service handle from the proof
821821
// courier address found in the Tap address.
822822
courier, err := p.cfg.ProofCourierDispatcher.NewCourier(
823-
proofCourierAddr,
823+
ctx, proofCourierAddr, true,
824824
)
825825
if err != nil {
826826
return fmt.Errorf("unable to initiate proof courier "+

tapgarden/custodian.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint,
564564
// Initiate proof courier service handle from the proof courier address
565565
// found in the Tap address.
566566
courier, err := c.cfg.ProofCourierDispatcher.NewCourier(
567-
&addr.ProofCourierAddr,
567+
ctx, &addr.ProofCourierAddr, true,
568568
)
569569
if err != nil {
570570
return fmt.Errorf("unable to initiate proof courier service "+

0 commit comments

Comments
 (0)