Skip to content

Commit 764ef38

Browse files
committed
multi: specify proof courier proof recipient at delivery or receive
This commit enables establishing a connection to a proof courier service without specifying a particular recipient (proof owner). This change allows us to: * Share proof courier service connections among multiple recipient peers. * Test connections to the proof courier service earlier in the process, before the proof owner recipient is specified. This test can now occur before the anchor transaction broadcast in the ChainPorter's state machine.
1 parent 8fcd27c commit 764ef38

File tree

7 files changed

+50
-58
lines changed

7 files changed

+50
-58
lines changed

proof/courier.go

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"time"
1212

1313
"github.com/btcsuite/btcd/btcec/v2"
14-
"github.com/davecgh/go-spew/spew"
1514
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
1615
"github.com/lightninglabs/taproot-assets/asset"
1716
"github.com/lightninglabs/taproot-assets/fn"
@@ -58,11 +57,12 @@ type CourierHarness interface {
5857
type Courier interface {
5958
// DeliverProof attempts to delivery a proof to the receiver, using the
6059
// information in the Addr type.
61-
DeliverProof(context.Context, *AnnotatedProof) error
60+
DeliverProof(context.Context, Recipient, *AnnotatedProof) error
6261

6362
// ReceiveProof attempts to obtain a proof as identified by the passed
6463
// locator from the source encapsulated within the specified address.
65-
ReceiveProof(context.Context, Locator) (*AnnotatedProof, error)
64+
ReceiveProof(context.Context, Recipient,
65+
Locator) (*AnnotatedProof, error)
6666

6767
// SetSubscribers sets the set of subscribers that will be notified
6868
// of proof courier related events.
@@ -96,7 +96,7 @@ type CourierCfg struct {
9696
type CourierDispatch interface {
9797
// NewCourier instantiates a new courier service handle given a service
9898
// URL address.
99-
NewCourier(addr *url.URL, recipient Recipient) (Courier, error)
99+
NewCourier(addr *url.URL) (Courier, error)
100100
}
101101

102102
// URLDispatch is a proof courier dispatch that uses the courier address URL
@@ -114,9 +114,7 @@ func NewCourierDispatch(cfg *CourierCfg) *URLDispatch {
114114

115115
// NewCourier instantiates a new courier service handle given a service URL
116116
// address.
117-
func (u *URLDispatch) NewCourier(addr *url.URL,
118-
recipient Recipient) (Courier, error) {
119-
117+
func (u *URLDispatch) NewCourier(addr *url.URL) (Courier, error) {
120118
subscribers := make(map[uint64]*fn.EventReceiver[fn.Event])
121119

122120
// Create new courier addr based on URL scheme.
@@ -136,7 +134,6 @@ func (u *URLDispatch) NewCourier(addr *url.URL,
136134
return &HashMailCourier{
137135
cfg: u.cfg,
138136
backoffHandle: backoffHandler,
139-
recipient: recipient,
140137
mailbox: hashMailBox,
141138
subscribers: subscribers,
142139
}, nil
@@ -162,7 +159,6 @@ func (u *URLDispatch) NewCourier(addr *url.URL,
162159
client := unirpc.NewUniverseClient(conn)
163160

164161
return &UniverseRpcCourier{
165-
recipient: recipient,
166162
client: client,
167163
backoffHandle: backoffHandler,
168164
cfg: u.cfg,
@@ -728,9 +724,8 @@ type HashMailCourier struct {
728724
// delivery.
729725
backoffHandle *BackoffHandler
730726

731-
// recipient describes the recipient of the proof.
732-
recipient Recipient
733-
727+
// mailbox is the mailbox service that the courier will use to interact
728+
// with the hashmail server.
734729
mailbox ProofMailbox
735730

736731
// subscribers is a map of components that want to be notified on new
@@ -747,14 +742,14 @@ type HashMailCourier struct {
747742
//
748743
// TODO(roasbeef): other delivery context as type param?
749744
func (h *HashMailCourier) DeliverProof(ctx context.Context,
750-
proof *AnnotatedProof) error {
745+
recipient Recipient, proof *AnnotatedProof) error {
751746

752747
log.Infof("Attempting to deliver receiver proof for send of "+
753-
"asset_id=%v, amt=%v", h.recipient.AssetID, h.recipient.Amount)
748+
"asset_id=%v, amt=%v", recipient.AssetID, recipient.Amount)
754749

755750
// Compute the stream IDs for the sender and receiver.
756-
senderStreamID := deriveSenderStreamID(h.recipient)
757-
receiverStreamID := deriveReceiverStreamID(h.recipient)
751+
senderStreamID := deriveSenderStreamID(recipient)
752+
receiverStreamID := deriveReceiverStreamID(recipient)
758753

759754
// Interact with the hashmail service using a backoff procedure to
760755
// ensure that we don't overwhelm the service with delivery attempts.
@@ -891,8 +886,7 @@ func (h *HashMailCourier) publishSubscriberEvent(event fn.Event) {
891886
// Close closes the underlying connection to the hashmail server.
892887
func (h *HashMailCourier) Close() error {
893888
if err := h.mailbox.Close(); err != nil {
894-
log.Warnf("unable to close mailbox session, "+
895-
"recipient=%v: %v", err, spew.Sdump(h.recipient))
889+
log.Warnf("Unable to close mailbox session: %v", err)
896890
return err
897891
}
898892

@@ -941,10 +935,10 @@ func NewBackoffWaitEvent(
941935

942936
// ReceiveProof attempts to obtain a proof as identified by the passed locator
943937
// from the source encapsulated within the specified address.
944-
func (h *HashMailCourier) ReceiveProof(ctx context.Context,
938+
func (h *HashMailCourier) ReceiveProof(ctx context.Context, recipient Recipient,
945939
loc Locator) (*AnnotatedProof, error) {
946940

947-
senderStreamID := deriveSenderStreamID(h.recipient)
941+
senderStreamID := deriveSenderStreamID(recipient)
948942
if err := h.mailbox.Init(ctx, senderStreamID); err != nil {
949943
return nil, err
950944
}
@@ -960,7 +954,7 @@ func (h *HashMailCourier) ReceiveProof(ctx context.Context,
960954

961955
// Now that we've read the proof, we'll create our mailbox (which might
962956
// already exist) to send an ACK back to the sender.
963-
receiverStreamID := deriveReceiverStreamID(h.recipient)
957+
receiverStreamID := deriveReceiverStreamID(recipient)
964958
log.Infof("Sending ACK to sender via sid=%x", receiverStreamID)
965959
if err := h.mailbox.Init(ctx, receiverStreamID); err != nil {
966960
return nil, err
@@ -1001,9 +995,6 @@ type UniverseRpcCourierCfg struct {
1001995
// UniverseRpcCourier is a universe RPC proof courier service handle. It
1002996
// implements the Courier interface.
1003997
type UniverseRpcCourier struct {
1004-
// recipient describes the recipient of the proof.
1005-
recipient Recipient
1006-
1007998
// client is the RPC client that the courier will use to interact with
1008999
// the universe RPC server.
10091000
client unirpc.UniverseClient
@@ -1030,7 +1021,7 @@ type UniverseRpcCourier struct {
10301021

10311022
// DeliverProof attempts to delivery a proof file to the receiver.
10321023
func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
1033-
annotatedProof *AnnotatedProof) error {
1024+
recipient Recipient, annotatedProof *AnnotatedProof) error {
10341025

10351026
// Decode annotated proof into proof file.
10361027
proofFile := &File{}
@@ -1041,7 +1032,7 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
10411032

10421033
log.Infof("Universe RPC proof courier attempting to deliver proof "+
10431034
"file (num_proofs=%d) for send event (asset_id=%v, amt=%v)",
1044-
proofFile.NumProofs(), c.recipient.AssetID, c.recipient.Amount)
1035+
proofFile.NumProofs(), recipient.AssetID, recipient.Amount)
10451036

10461037
// Iterate over each proof in the proof file and submit to the courier
10471038
// service.
@@ -1136,7 +1127,7 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
11361127
// ReceiveProof attempts to obtain a proof file from the courier service. The
11371128
// final proof in the target proof file is identified by the given locator.
11381129
func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
1139-
originLocator Locator) (*AnnotatedProof, error) {
1130+
_ Recipient, originLocator Locator) (*AnnotatedProof, error) {
11401131

11411132
fetchProof := func(ctx context.Context, loc Locator) (Blob, error) {
11421133
var groupKeyBytes []byte

proof/courier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) {
4141

4242
localArchive.proofs.Store(locHash, proofBlob)
4343

44+
recipient := Recipient{}
4445
courier := &UniverseRpcCourier{
45-
recipient: Recipient{},
46-
client: nil,
46+
client: nil,
4747
cfg: &CourierCfg{
4848
LocalArchive: localArchive,
4949
},
@@ -58,7 +58,7 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) {
5858

5959
// If we attempt to receive a proof that the local archive has, we
6060
// expect to get it back.
61-
annotatedProof, err := courier.ReceiveProof(ctxt, locator)
61+
annotatedProof, err := courier.ReceiveProof(ctxt, recipient, locator)
6262
require.NoError(t, err)
6363

6464
require.Equal(t, proofBlob, annotatedProof.Blob)
@@ -67,7 +67,7 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) {
6767
// should end up in the code path that attempts to fetch the proof from
6868
// the universe. Since we don't want to set up a full universe server
6969
// in the test, we just make sure we get an error from that code path.
70-
_, err = courier.ReceiveProof(ctxt, Locator{
70+
_, err = courier.ReceiveProof(ctxt, recipient, Locator{
7171
AssetID: fn.Ptr(genesis.ID()),
7272
ScriptKey: *proof.Asset.ScriptKey.PubKey,
7373
})

proof/mock.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ type MockProofCourierDispatcher struct {
443443

444444
// NewCourier instantiates a new courier service handle given a service
445445
// URL address.
446-
func (m *MockProofCourierDispatcher) NewCourier(*url.URL, Recipient) (Courier,
446+
func (m *MockProofCourierDispatcher) NewCourier(*url.URL) (Courier,
447447
error) {
448448

449449
return m.Courier, nil
@@ -479,7 +479,7 @@ func (m *MockProofCourier) Stop() error {
479479
// DeliverProof attempts to delivery a proof to the receiver, using the
480480
// information in the Addr type.
481481
func (m *MockProofCourier) DeliverProof(_ context.Context,
482-
proof *AnnotatedProof) error {
482+
_ Recipient, proof *AnnotatedProof) error {
483483

484484
m.Lock()
485485
defer m.Unlock()
@@ -492,7 +492,7 @@ func (m *MockProofCourier) DeliverProof(_ context.Context,
492492
// ReceiveProof attempts to obtain a proof as identified by the passed
493493
// locator from the source encapsulated within the specified address.
494494
func (m *MockProofCourier) ReceiveProof(_ context.Context,
495-
loc Locator) (*AnnotatedProof, error) {
495+
_ Recipient, loc Locator) (*AnnotatedProof, error) {
496496

497497
m.Lock()
498498
defer m.Unlock()

tapchannel/aux_sweeper.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -801,21 +801,20 @@ func importOutputProofs(scid lnwire.ShortChannelID,
801801

802802
// First, we'll make a courier to use in fetching the proofs we
803803
// need.
804-
proofFetcher, err := proofDispatch.NewCourier(
805-
courierAddr, proof.Recipient{
806-
ScriptKey: scriptKey,
807-
AssetID: proofPrevID.ID,
808-
Amount: proofToImport.Asset.Amount,
809-
},
810-
)
804+
proofFetcher, err := proofDispatch.NewCourier(courierAddr)
811805
if err != nil {
812806
return fmt.Errorf("unable to create proof courier: %w",
813807
err)
814808
}
815809

816810
ctxb := context.Background()
811+
recipient := proof.Recipient{
812+
ScriptKey: scriptKey,
813+
AssetID: proofPrevID.ID,
814+
Amount: proofToImport.Asset.Amount,
815+
}
817816
prefixProof, err := proofFetcher.ReceiveProof(
818-
ctxb, inputProofLocator,
817+
ctxb, recipient, inputProofLocator,
819818
)
820819

821820
// Always attempt to close the courier, even if we encounter an

tapfreighter/chain_porter.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -819,13 +819,8 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
819819

820820
// Initiate proof courier service handle from the proof
821821
// courier address found in the Tap address.
822-
recipient := proof.Recipient{
823-
ScriptKey: key,
824-
AssetID: *receiverProof.AssetID,
825-
Amount: out.Amount,
826-
}
827822
courier, err := p.cfg.ProofCourierDispatcher.NewCourier(
828-
proofCourierAddr, recipient,
823+
proofCourierAddr,
829824
)
830825
if err != nil {
831826
return fmt.Errorf("unable to initiate proof courier "+
@@ -841,7 +836,12 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
841836
p.subscriberMtx.Unlock()
842837

843838
// Deliver proof to proof courier service.
844-
err = courier.DeliverProof(ctx, receiverProof)
839+
recipient := proof.Recipient{
840+
ScriptKey: key,
841+
AssetID: *receiverProof.AssetID,
842+
Amount: out.Amount,
843+
}
844+
err = courier.DeliverProof(ctx, recipient, receiverProof)
845845

846846
// If the proof courier returned a backoff error, then
847847
// we'll just return nil here so that we can retry

tapgarden/custodian.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -563,13 +563,8 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint,
563563

564564
// Initiate proof courier service handle from the proof courier address
565565
// found in the Tap address.
566-
recipient := proof.Recipient{
567-
ScriptKey: &addr.ScriptKey,
568-
AssetID: assetID,
569-
Amount: addr.Amount,
570-
}
571566
courier, err := c.cfg.ProofCourierDispatcher.NewCourier(
572-
&addr.ProofCourierAddr, recipient,
567+
&addr.ProofCourierAddr,
573568
)
574569
if err != nil {
575570
return fmt.Errorf("unable to initiate proof courier service "+
@@ -593,13 +588,18 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint,
593588
}
594589

595590
// Attempt to receive proof via proof courier service.
591+
recipient := proof.Recipient{
592+
ScriptKey: &addr.ScriptKey,
593+
AssetID: assetID,
594+
Amount: addr.Amount,
595+
}
596596
loc := proof.Locator{
597597
AssetID: &assetID,
598598
GroupKey: addr.GroupKey,
599599
ScriptKey: addr.ScriptKey,
600600
OutPoint: &op,
601601
}
602-
addrProof, err := courier.ReceiveProof(ctx, loc)
602+
addrProof, err := courier.ReceiveProof(ctx, recipient, loc)
603603
if err != nil {
604604
return fmt.Errorf("unable to receive proof using courier: %w",
605605
err)

tapgarden/custodian_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,8 @@ func TestTransactionHandling(t *testing.T) {
644644
h.walletAnchor.Transactions = append(h.walletAnchor.Transactions, *tx)
645645

646646
mockProof := randProof(t, outputIdx, tx.Tx, genesis[0], addrs[0])
647-
err := h.courier.DeliverProof(nil, mockProof)
647+
recipient := proof.Recipient{}
648+
err := h.courier.DeliverProof(nil, recipient, mockProof)
648649
require.NoError(t, err)
649650

650651
require.NoError(t, h.c.Start())
@@ -708,6 +709,7 @@ func runTransactionConfirmedOnlyTest(t *testing.T, withRestart bool) {
708709
// need to signal an unconfirmed transaction for each of them now.
709710
outputIndexes := make([]int, numAddrs)
710711
transactions := make([]*lndclient.Transaction, numAddrs)
712+
recipient := proof.Recipient{}
711713
for idx := range addrs {
712714
outputIndex, tx := randWalletTx(addrs[idx])
713715
outputIndexes[idx] = outputIndex
@@ -719,7 +721,7 @@ func runTransactionConfirmedOnlyTest(t *testing.T, withRestart bool) {
719721
mockProof := randProof(
720722
t, outputIndexes[idx], tx.Tx, genesis[idx], addrs[idx],
721723
)
722-
_ = h.courier.DeliverProof(nil, mockProof)
724+
_ = h.courier.DeliverProof(nil, recipient, mockProof)
723725
}
724726

725727
// We want events to be created for each address, they should be in the

0 commit comments

Comments
 (0)