Skip to content

Commit 1b5a4ef

Browse files
author
ffranr
authored
Merge pull request #1055 from lightninglabs/retry-proof-delivery
Reattempt proof delivery on node restart
2 parents 8597ee3 + 332a550 commit 1b5a4ef

File tree

10 files changed

+405
-112
lines changed

10 files changed

+405
-112
lines changed

fn/option.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (o Option[A]) UnwrapOr(a A) A {
5959
return a
6060
}
6161

62-
// UnwrapPtr is used to extract a reference to a value from an option, and we
62+
// UnwrapToPtr is used to extract a reference to a value from an option, and we
6363
// supply an empty pointer in the case when the Option is empty.
6464
func (o Option[A]) UnwrapToPtr() *A {
6565
var v *A

itest/psbt_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,8 +1042,8 @@ func testPsbtMultiSend(t *harnessTest) {
10421042
ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
10431043
defer cancel()
10441044

1045-
// Now that we have the asset created, we'll make a new node that'll
1046-
// serve as the node which'll receive the assets.
1045+
// With the asset created, we'll set up a new node that will act as the
1046+
// receiver of the transfer.
10471047
secondTapd := setupTapdHarness(
10481048
t.t, t, t.lndHarness.Bob, t.universeServer,
10491049
)

itest/send_test.go

Lines changed: 220 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
wrpc "github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
1717
"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
1818
"github.com/lightninglabs/taproot-assets/taprpc/tapdevrpc"
19+
unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
1920
"github.com/lightningnetwork/lnd/lntest/wait"
2021
"github.com/stretchr/testify/require"
2122
)
@@ -573,7 +574,8 @@ func testBasicSendPassiveAsset(t *harnessTest) {
573574

574575
// testReattemptFailedSendHashmailCourier tests that a failed attempt at
575576
// sending an asset proof will be reattempted by the tapd node. This test
576-
// targets the hashmail courier.
577+
// targets the hashmail courier. The proof courier is specified in the test
578+
// list entry.
577579
func testReattemptFailedSendHashmailCourier(t *harnessTest) {
578580
var (
579581
ctxb = context.Background()
@@ -654,11 +656,6 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) {
654656

655657
// Simulate a failed attempt at sending the asset proof by stopping
656658
// the receiver node.
657-
//
658-
// The receiving tapd node does not return a proof received confirmation
659-
// message via the universe RPC courier. We can simulate a proof
660-
// transfer failure by stopping the courier service directly and not the
661-
// receiving tapd node.
662659
require.NoError(t.t, t.tapd.stop(false))
663660

664661
// Send asset and then mine to confirm the associated on-chain tx.
@@ -668,33 +665,80 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) {
668665
wg.Wait()
669666
}
670667

671-
// testReattemptFailedSendUniCourier tests that a failed attempt at
672-
// sending an asset proof will be reattempted by the tapd node. This test
673-
// targets the universe proof courier.
674-
func testReattemptFailedSendUniCourier(t *harnessTest) {
668+
// testReattemptProofTransferOnTapdRestart tests that a failed attempt at
669+
// transferring a transfer output proof to a proof courier will be reattempted
670+
// by the sending tapd node upon restart. This test targets the universe
671+
// courier.
672+
func testReattemptProofTransferOnTapdRestart(t *harnessTest) {
675673
var (
676674
ctxb = context.Background()
677675
wg sync.WaitGroup
678676
)
679677

680-
// Make a new node which will send the asset to the primary tapd node.
681-
// We expect this node to fail because our send call will time out
682-
// whilst the porter continues to attempt to send the asset.
678+
// For this test we will use the universe server as the proof courier.
679+
proofCourier := t.universeServer
680+
681+
// Make a new tapd node which will send an asset to a receiving tapd
682+
// node.
683683
sendTapd := setupTapdHarness(
684684
t.t, t, t.lndHarness.Bob, t.universeServer,
685685
func(params *tapdHarnessParams) {
686686
params.expectErrExit = true
687+
params.proofCourier = proofCourier
687688
},
688689
)
690+
defer func() {
691+
// Any node that has been started within an itest should be
692+
// explicitly stopped within the same itest.
693+
require.NoError(t.t, sendTapd.stop(!*noDelete))
694+
}()
695+
696+
// Use the primary tapd node as the receiver node.
697+
recvTapd := t.tapd
698+
699+
// Use the sending node to mint an asset for sending.
700+
rpcAssets := MintAssetsConfirmBatch(
701+
t.t, t.lndHarness.Miner.Client, sendTapd,
702+
[]*mintrpc.MintAssetRequest{simpleAssets[0]},
703+
)
704+
705+
genInfo := rpcAssets[0].AssetGenesis
706+
707+
// After minting an asset with the sending node, we need to synchronize
708+
// the Universe state to ensure the receiving node is updated and aware
709+
// of the asset.
710+
t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets))
711+
712+
// Create a new address for the receiver node. We will use the universe
713+
// server as the proof courier.
714+
proofCourierAddr := fmt.Sprintf(
715+
"%s://%s", proof.UniverseRpcCourierType,
716+
proofCourier.service.rpcHost(),
717+
)
718+
t.Logf("Proof courier address: %s", proofCourierAddr)
719+
720+
recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
721+
AssetId: genInfo.AssetId,
722+
Amt: 10,
723+
ProofCourierAddr: proofCourierAddr,
724+
})
725+
require.NoError(t.t, err)
726+
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)
727+
728+
// Soon we will be attempting to send an asset to the receiver node. We
729+
// want the attempt to fail until we restart the sending node.
730+
// Therefore, we will take the proof courier service offline.
731+
t.Log("Stopping proof courier service")
732+
require.NoError(t.t, proofCourier.Stop())
689733

690-
// Subscribe to receive asset send events from the sending tapd node.
734+
// Now that the proof courier service is offline, the sending node's
735+
// attempt to transfer the asset proof should fail.
736+
//
737+
// We will soon start the asset transfer process. However, before we
738+
// start, we subscribe to the send events from the sending tapd node so
739+
// that we can be sure that a transfer has been attempted.
691740
events := SubscribeSendEvents(t.t, sendTapd)
692741

693-
// Test to ensure that we receive the expected number of backoff wait
694-
// event notifications.
695-
// This test is executed in a goroutine to ensure that we can receive
696-
// the event notification(s) from the tapd node as the rest of the test
697-
// proceeds.
698742
wg.Add(1)
699743
go func() {
700744
defer wg.Done()
@@ -712,7 +756,8 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
712756
// Expected number of events is one less than the number of
713757
// tries because the first attempt does not count as a backoff
714758
// event.
715-
nodeBackoffCfg := t.tapd.clientCfg.HashMailCourier.BackoffCfg
759+
nodeBackoffCfg :=
760+
sendTapd.clientCfg.UniverseRpcCourier.BackoffCfg
716761
expectedEventCount := nodeBackoffCfg.NumTries - 1
717762

718763
// Context timeout scales with expected number of events.
@@ -729,33 +774,174 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
729774
)
730775
}()
731776

732-
// Mint an asset for sending.
777+
// Start asset transfer and then mine to confirm the associated on-chain
778+
// tx. The on-chain tx should be mined successfully, but we expect the
779+
// asset proof transfer to be unsuccessful.
780+
sendResp, _ := sendAssetsToAddr(t, sendTapd, recvAddr)
781+
MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)
782+
783+
// Wait to ensure that the asset transfer attempt has been made.
784+
wg.Wait()
785+
786+
// Stop the sending tapd node. This downtime will give us the
787+
// opportunity to restart the proof courier service.
788+
t.Log("Stopping sending tapd node")
789+
require.NoError(t.t, sendTapd.stop(false))
790+
791+
// Restart the proof courier service.
792+
t.Log("Starting proof courier service")
793+
require.NoError(t.t, proofCourier.Start(nil))
794+
t.Logf("Proof courier address: %s", proofCourier.service.rpcHost())
795+
796+
// Ensure that the proof courier address has not changed on restart.
797+
// The port is currently selected opportunistically.
798+
// If the proof courier address has changed, the tap address will be
799+
// stale.
800+
newProofCourierAddr := fmt.Sprintf(
801+
"%s://%s", proof.UniverseRpcCourierType,
802+
proofCourier.service.rpcHost(),
803+
)
804+
require.Equal(t.t, proofCourierAddr, newProofCourierAddr)
805+
806+
// Identify receiver's asset transfer output.
807+
require.Len(t.t, sendResp.Transfer.Outputs, 2)
808+
recvOutput := sendResp.Transfer.Outputs[0]
809+
810+
// If the script key of the output is local to the sending node, then
811+
// the receiver's output is the second output.
812+
if recvOutput.ScriptKeyIsLocal {
813+
recvOutput = sendResp.Transfer.Outputs[1]
814+
}
815+
816+
// Formulate a universe key to query the proof courier for the asset
817+
// transfer proof.
818+
uniKey := unirpc.UniverseKey{
819+
Id: &unirpc.ID{
820+
Id: &unirpc.ID_AssetId{
821+
AssetId: genInfo.AssetId,
822+
},
823+
ProofType: unirpc.ProofType_PROOF_TYPE_TRANSFER,
824+
},
825+
LeafKey: &unirpc.AssetKey{
826+
Outpoint: &unirpc.AssetKey_OpStr{
827+
OpStr: recvOutput.Anchor.Outpoint,
828+
},
829+
ScriptKey: &unirpc.AssetKey_ScriptKeyBytes{
830+
ScriptKeyBytes: recvOutput.ScriptKey,
831+
},
832+
},
833+
}
834+
835+
// Ensure that the transfer proof has not reached the proof courier yet.
836+
resp, err := proofCourier.service.QueryProof(ctxb, &uniKey)
837+
require.Nil(t.t, resp)
838+
require.ErrorContains(t.t, err, "no universe proof found")
839+
840+
// Restart the sending tapd node. The node should reattempt to transfer
841+
// the asset proof to the proof courier.
842+
t.Log("Restarting sending tapd node")
843+
require.NoError(t.t, sendTapd.start(false))
844+
845+
require.Eventually(t.t, func() bool {
846+
resp, err = proofCourier.service.QueryProof(ctxb, &uniKey)
847+
return err == nil && resp != nil
848+
}, defaultWaitTimeout, 200*time.Millisecond)
849+
850+
// TODO(ffranr): Modify the receiver node proof retrieval backoff
851+
// schedule such that we can assert that the transfer fully completes
852+
// in a timely and predictable manner.
853+
// AssertNonInteractiveRecvComplete(t.t, recvTapd, 1)
854+
}
855+
856+
// testReattemptFailedSendUniCourier tests that a failed attempt at
857+
// sending an asset proof will be reattempted by the tapd node. This test
858+
// targets the universe proof courier.
859+
func testReattemptFailedSendUniCourier(t *harnessTest) {
860+
var (
861+
ctxb = context.Background()
862+
wg sync.WaitGroup
863+
)
864+
865+
// Make a new node which will send the asset to the primary tapd node.
866+
// We expect this node to fail because our send call will time out
867+
// whilst the porter continues to attempt to send the asset.
868+
sendTapd := setupTapdHarness(
869+
t.t, t, t.lndHarness.Bob, t.universeServer,
870+
func(params *tapdHarnessParams) {
871+
params.expectErrExit = true
872+
},
873+
)
874+
875+
// Use the primary tapd node as the receiver node.
876+
recvTapd := t.tapd
877+
878+
// Use the sending node to mint an asset for sending.
733879
rpcAssets := MintAssetsConfirmBatch(
734880
t.t, t.lndHarness.Miner.Client, sendTapd,
735881
[]*mintrpc.MintAssetRequest{simpleAssets[0]},
736882
)
737883

738884
genInfo := rpcAssets[0].AssetGenesis
739885

740-
// Synchronize the Universe state of the second node, with the main
741-
// node.
742-
t.syncUniverseState(sendTapd, t.tapd, len(rpcAssets))
886+
// After minting an asset with the sending node, we need to synchronize
887+
// the Universe state to ensure the receiving node is updated and aware
888+
// of the asset.
889+
t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets))
743890

744891
// Create a new address for the receiver node.
745-
recvAddr, err := t.tapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
892+
recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
746893
AssetId: genInfo.AssetId,
747894
Amt: 10,
748895
})
749896
require.NoError(t.t, err)
750-
AssertAddrCreated(t.t, t.tapd, rpcAssets[0], recvAddr)
897+
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)
898+
899+
// Now we will ensure that the expected number of backoff wait event
900+
// notifications are emitted from the sending node.
901+
//
902+
// We identify backoff wait events in a goroutine to ensure that we can
903+
// capture event notifications from the sending node while the main
904+
// test continues.
905+
//
906+
// Subscribe to proof transfer send events from the sending tapd node.
907+
events := SubscribeSendEvents(t.t, sendTapd)
908+
909+
wg.Add(1)
910+
go func() {
911+
defer wg.Done()
912+
913+
// Define a target event selector to match the backoff wait
914+
// event. This function selects for a specific event type.
915+
targetEventSelector := func(
916+
event *tapdevrpc.SendAssetEvent) bool {
917+
918+
return AssertSendEventProofTransferBackoffWaitTypeSend(
919+
t, event,
920+
)
921+
}
922+
923+
// Expected number of events is one less than the number of
924+
// tries because the first attempt does not count as a backoff
925+
// event.
926+
nodeBackoffCfg := sendTapd.clientCfg.HashMailCourier.BackoffCfg
927+
expectedEventCount := nodeBackoffCfg.NumTries - 1
928+
929+
// Context timeout scales with expected number of events.
930+
timeout := time.Duration(expectedEventCount) *
931+
defaultProofTransferReceiverAckTimeout
932+
933+
// Allow for some margin for the operations that aren't pure
934+
// waiting on the receiver ACK.
935+
timeout += timeoutMargin
936+
937+
assertAssetNtfsEvent(
938+
t, events, timeout, targetEventSelector,
939+
expectedEventCount,
940+
)
941+
}()
751942

752943
// Simulate a failed attempt at sending the asset proof by stopping
753944
// the proof courier service.
754-
//
755-
// In following the hashmail proof courier protocol, the receiver node
756-
// returns a proof received confirmation message via the courier.
757-
// We can simulate a proof transfer failure by stopping the receiving
758-
// tapd node. The courier service should still be operational.
759945
require.NoError(t.t, t.proofCourier.Stop())
760946

761947
// Send asset and then mine to confirm the associated on-chain tx.
@@ -765,9 +951,9 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
765951
wg.Wait()
766952
}
767953

768-
// testReattemptFailedReceiveUniCourier tests that a failed attempt at
769-
// receiving an asset proof will be reattempted by the receiving tapd node. This
770-
// test targets the universe proof courier.
954+
// testReattemptFailedReceiveUniCourier ensures that a failed attempt to receive
955+
// an asset proof is retried by the receiving tapd node. This test focuses on
956+
// the universe proof courier.
771957
func testReattemptFailedReceiveUniCourier(t *harnessTest) {
772958
ctxb := context.Background()
773959

itest/test_list_on_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ var testCases = []*testCase{
9292
name: "reattempt failed send uni courier",
9393
test: testReattemptFailedSendUniCourier,
9494
},
95+
{
96+
name: "reattempt proof transfer on tapd restart",
97+
test: testReattemptProofTransferOnTapdRestart,
98+
},
9599
{
96100
name: "reattempt failed receive uni courier",
97101
test: testReattemptFailedReceiveUniCourier,

0 commit comments

Comments
 (0)