Skip to content

Commit 346d838

Browse files
author
Robyn Ffrancon
authored
Merge pull request #659 from lightninglabs/add-recv-complete-event
Custodian emits a new asset-receive-complete event to notification subscribers
2 parents 80e7c85 + 6cacee9 commit 346d838

File tree

6 files changed

+545
-302
lines changed

6 files changed

+545
-302
lines changed

itest/send_test.go

Lines changed: 65 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -807,10 +807,7 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
807807
// receiving an asset proof will be reattempted by the receiving tapd node. This
808808
// test targets the universe proof courier.
809809
func testReattemptFailedReceiveUniCourier(t *harnessTest) {
810-
var (
811-
ctxb = context.Background()
812-
wg sync.WaitGroup
813-
)
810+
ctxb := context.Background()
814811

815812
// This tapd node will send the asset to the receiving tapd node.
816813
// It will also transfer proof the related transfer proofs to the
@@ -892,56 +889,47 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {
892889

893890
// Test to ensure that we receive the minimum expected number of backoff
894891
// wait event notifications.
895-
//
896-
// This test is executed in a goroutine to ensure that we can receive
897-
// the event notification(s) from the tapd node as soon as the proof
898-
// courier service is restarted.
899-
wg.Add(1)
900-
go func() {
901-
defer wg.Done()
902-
903-
// Define a target event selector to match the backoff wait
904-
// event. This function selects for a specific event type.
905-
targetEventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
906-
switch eventTyped := event.Event.(type) {
907-
case *taprpc.ReceiveAssetEvent_ProofTransferBackoffWaitEvent:
908-
ev := eventTyped.ProofTransferBackoffWaitEvent
909-
910-
// We are attempting to identify receive
911-
// transfer types. Skip the event if it is not
912-
// a receiving transfer type.
913-
if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_RECEIVE {
914-
return false
915-
}
892+
t.Logf("Waiting for the receiving tapd node to complete backoff " +
893+
"proof retrieval attempts")
916894

917-
t.Logf("Found event ntfs: %v", ev)
918-
return true
895+
// Define a target event selector to match the backoff wait event. This
896+
// function selects for a specific event type.
897+
targetEventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
898+
switch eventTyped := event.Event.(type) {
899+
case *taprpc.ReceiveAssetEvent_ProofTransferBackoffWaitEvent:
900+
ev := eventTyped.ProofTransferBackoffWaitEvent
901+
902+
// We are attempting to identify receive transfer types.
903+
// Skip the event if it is not a receiving transfer
904+
// type.
905+
if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_RECEIVE {
906+
return false
919907
}
920908

921-
return false
909+
t.Logf("Found event ntfs: %v", ev)
910+
return true
922911
}
923912

924-
// Expected minimum number of events to receive.
925-
expectedEventCount := 3
926-
927-
// Context timeout scales with expected number of events.
928-
timeout := time.Duration(expectedEventCount) *
929-
defaultProofTransferReceiverAckTimeout
930-
// Add overhead buffer to context timeout.
931-
timeout += 5 * time.Second
932-
ctx, cancel := context.WithTimeout(ctxb, timeout)
933-
defer cancel()
913+
return false
914+
}
934915

935-
assertAssetRecvNtfsEvent(
936-
t, ctx, eventNtfns, targetEventSelector,
937-
expectedEventCount,
938-
)
939-
}()
916+
// Expected minimum number of events to receive.
917+
expectedEventCount := 3
918+
919+
// Context timeout scales with expected number of events.
920+
timeout := time.Duration(expectedEventCount) *
921+
defaultProofTransferReceiverAckTimeout
922+
// Add overhead buffer to context timeout.
923+
timeout += 5 * time.Second
924+
ctx, cancel := context.WithTimeout(ctxb, timeout)
925+
defer cancel()
926+
927+
// Assert that the receiver tapd node has accomplished our minimum
928+
// expected number of backoff procedure receive attempts.
929+
assertAssetRecvNtfsEvent(
930+
t, ctx, eventNtfns, targetEventSelector, expectedEventCount,
931+
)
940932

941-
// Wait for the receiver node's backoff attempts to complete.
942-
t.Logf("Waiting for the receiving tapd node to complete backoff " +
943-
"proof retrieval attempts")
944-
wg.Wait()
945933
t.Logf("Finished waiting for the receiving tapd node to complete " +
946934
"backoff procedure")
947935

@@ -956,6 +944,13 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {
956944
// proof(s).
957945
t.Logf("Attempting to confirm asset received by receiver node")
958946
AssertNonInteractiveRecvComplete(t.t, receiveTapd, 1)
947+
948+
// Confirm that the sender tapd node eventually receives the asset
949+
// transfer and publishes an asset recv complete event.
950+
t.Logf("Check for asset recv complete event from receiver tapd node")
951+
assertAssetRecvCompleteEvent(
952+
t, ctxb, 5*time.Second, recvAddr.Encoded, eventNtfns,
953+
)
959954
}
960955

961956
// testOfflineReceiverEventuallyReceives tests that a receiver node will
@@ -1169,6 +1164,29 @@ func assertAssetRecvNtfsEvent(t *harnessTest, ctx context.Context,
11691164
expectedCount, countFound)
11701165
}
11711166

1167+
// assertAssetRecvNtfsEvent asserts that the given asset receive complete event
1168+
// notification was received. This function will block until the event is
1169+
// received or the event stream is closed.
1170+
func assertAssetRecvCompleteEvent(t *harnessTest, ctxb context.Context,
1171+
timeout time.Duration, encodedAddr string,
1172+
eventNtfns taprpc.TaprootAssets_SubscribeReceiveAssetEventNtfnsClient) {
1173+
1174+
ctx, cancel := context.WithTimeout(ctxb, timeout)
1175+
defer cancel()
1176+
1177+
eventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
1178+
switch eventTyped := event.Event.(type) {
1179+
case *taprpc.ReceiveAssetEvent_AssetReceiveCompleteEvent:
1180+
ev := eventTyped.AssetReceiveCompleteEvent
1181+
return encodedAddr == ev.Address.Encoded
1182+
default:
1183+
return false
1184+
}
1185+
}
1186+
1187+
assertAssetRecvNtfsEvent(t, ctx, eventNtfns, eventSelector, 1)
1188+
}
1189+
11721190
// testMultiInputSendNonInteractiveSingleID tests that we can properly
11731191
// non-interactively send a single asset from multiple inputs.
11741192
//

rpcserver.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2350,7 +2350,9 @@ func (r *rpcServer) SubscribeReceiveAssetEventNtfns(
23502350
// RPC event type and sent over the stream.
23512351
case event := <-eventSubscriber.NewItemCreated.ChanOut():
23522352

2353-
rpcEvent, err := marshallReceiveAssetEvent(event)
2353+
rpcEvent, err := marshallReceiveAssetEvent(
2354+
event, r.cfg.TapAddrBook,
2355+
)
23542356
if err != nil {
23552357
return fmt.Errorf("failed to marshall "+
23562358
"asset receive event into RPC event: "+
@@ -2385,8 +2387,8 @@ func (r *rpcServer) SubscribeReceiveAssetEventNtfns(
23852387
}
23862388

23872389
// marshallReceiveAssetEvent maps an asset receive event to its RPC counterpart.
2388-
func marshallReceiveAssetEvent(
2389-
eventInterface fn.Event) (*taprpc.ReceiveAssetEvent, error) {
2390+
func marshallReceiveAssetEvent(eventInterface fn.Event,
2391+
db address.Storage) (*taprpc.ReceiveAssetEvent, error) {
23902392

23912393
switch event := eventInterface.(type) {
23922394
case *proof.BackoffWaitEvent:
@@ -2413,6 +2415,23 @@ func marshallReceiveAssetEvent(
24132415
Event: &eventRpc,
24142416
}, nil
24152417

2418+
case *tapgarden.AssetReceiveCompleteEvent:
2419+
rpcAddr, err := marshalAddr(&event.Address, db)
2420+
if err != nil {
2421+
return nil, fmt.Errorf("error marshaling addr: %w", err)
2422+
}
2423+
2424+
eventRpc := taprpc.ReceiveAssetEvent_AssetReceiveCompleteEvent{
2425+
AssetReceiveCompleteEvent: &taprpc.AssetReceiveCompleteEvent{
2426+
Timestamp: event.Timestamp().UnixMicro(),
2427+
Address: rpcAddr,
2428+
Outpoint: event.OutPoint.String(),
2429+
},
2430+
}
2431+
return &taprpc.ReceiveAssetEvent{
2432+
Event: &eventRpc,
2433+
}, nil
2434+
24162435
default:
24172436
return nil, fmt.Errorf("unknown event type: %T", eventInterface)
24182437
}

tapgarden/custodian.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,36 @@ import (
1818
"github.com/lightningnetwork/lnd/lnrpc"
1919
)
2020

21+
// AssetReceiveCompleteEvent is an event that is sent to a subscriber once the
22+
// asset receive process has finished for a given address and outpoint.
23+
type AssetReceiveCompleteEvent struct {
24+
// timestamp is the time the event was created.
25+
timestamp time.Time
26+
27+
// Address is the address associated with the asset that was received.
28+
Address address.Tap
29+
30+
// OutPoint is the outpoint of the transaction that was used to receive
31+
// the asset.
32+
OutPoint wire.OutPoint
33+
}
34+
35+
// Timestamp returns the timestamp of the event.
36+
func (e *AssetReceiveCompleteEvent) Timestamp() time.Time {
37+
return e.timestamp
38+
}
39+
40+
// NewAssetRecvCompleteEvent creates a new AssetReceiveCompleteEvent.
41+
func NewAssetRecvCompleteEvent(addr address.Tap,
42+
outpoint wire.OutPoint) *AssetReceiveCompleteEvent {
43+
44+
return &AssetReceiveCompleteEvent{
45+
timestamp: time.Now().UTC(),
46+
Address: addr,
47+
OutPoint: outpoint,
48+
}
49+
}
50+
2151
// CustodianConfig houses all the items that the Custodian needs to carry out
2252
// its duties.
2353
type CustodianConfig struct {
@@ -458,6 +488,18 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
458488
log.Errorf("unable to import proofs: %v", err)
459489
return
460490
}
491+
492+
// At this point the "receive" process is complete. We
493+
// will now notify all status event subscribers.
494+
recvCompleteEvent := NewAssetRecvCompleteEvent(
495+
*addr, op,
496+
)
497+
err = c.publishSubscriberStatusEvent(recvCompleteEvent)
498+
if err != nil {
499+
log.Errorf("unable publish status event: %v",
500+
err)
501+
return
502+
}
461503
}()
462504
}
463505

@@ -702,6 +744,23 @@ func (c *Custodian) RegisterSubscriber(receiver *fn.EventReceiver[fn.Event],
702744
return nil
703745
}
704746

747+
// publishSubscriberStatusEvent publishes an event to all status events
748+
// subscribers.
749+
func (c *Custodian) publishSubscriberStatusEvent(event fn.Event) error {
750+
// Lock the subscriber mutex to ensure that we don't modify the
751+
// subscriber map while we're iterating over it.
752+
c.statusEventsSubsMtx.Lock()
753+
defer c.statusEventsSubsMtx.Unlock()
754+
755+
for _, sub := range c.statusEventsSubs {
756+
if !fn.SendOrQuit(sub.NewItemCreated.ChanIn(), event, c.Quit) {
757+
return fmt.Errorf("custodian shutting down")
758+
}
759+
}
760+
761+
return nil
762+
}
763+
705764
// RemoveSubscriber removes a subscriber from the set of status event
706765
// subscribers.
707766
func (c *Custodian) RemoveSubscriber(

0 commit comments

Comments
 (0)