Skip to content

Commit 56f9a5b

Browse files
authored
Merge pull request #1468 from lightninglabs/send-asset-label
Add Transfer Label Support for Asset Sends
2 parents bf6ffa7 + 0a13455 commit 56f9a5b

19 files changed

+946
-720
lines changed

itest/addrs_test.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package itest
33
import (
44
"bytes"
55
"context"
6+
"fmt"
7+
"time"
68

79
"github.com/btcsuite/btcd/btcec/v2/schnorr"
810
"github.com/btcsuite/btcd/wire"
@@ -964,25 +966,40 @@ func sendAsset(t *harnessTest, sender *tapdHarness,
964966

965967
require.NotEmpty(t.t, options.sendAssetRequest.TapAddrs)
966968

967-
// We need the first address's scriptkey to subscribe to events.
968-
firstAddr, err := address.DecodeAddress(
969-
options.sendAssetRequest.TapAddrs[0], &address.RegressionNetTap,
970-
)
971-
require.NoError(t.t, err)
972-
scriptKey := firstAddr.ScriptKey.SerializeCompressed()
969+
// Assign a default transfer label using a Unix timestamp if none is
970+
// provided. This will be used in filtering send events.
971+
if options.sendAssetRequest.Label == "" {
972+
options.sendAssetRequest.Label = fmt.Sprintf(
973+
"%d", time.Now().UnixNano(),
974+
)
975+
}
973976

977+
// Construct send event stream.
974978
ctxc, streamCancel := context.WithCancel(ctxb)
975979
stream, err := sender.SubscribeSendEvents(
976980
ctxc, &taprpc.SubscribeSendEventsRequest{
977-
FilterScriptKey: scriptKey,
981+
FilterLabel: options.sendAssetRequest.Label,
978982
},
979983
)
980984
require.NoError(t.t, err)
985+
986+
// Formulate a subscription handler for the send event stream.
981987
sub := &EventSubscription[*taprpc.SendEvent]{
982988
ClientEventStream: stream,
983989
Cancel: streamCancel,
990+
991+
// Use the filter callback to ensure that the server side filter
992+
// is working as expected.
993+
ShouldNotify: func(e *taprpc.SendEvent) (bool, error) {
994+
require.Equal(
995+
t.t, e.TransferLabel,
996+
options.sendAssetRequest.Label,
997+
)
998+
return true, nil
999+
},
9841000
}
9851001

1002+
// Kick off the send asset request.
9861003
resp, err := sender.SendAsset(ctxt, &options.sendAssetRequest)
9871004
if options.errText != "" {
9881005
require.ErrorContains(t.t, err, options.errText)
@@ -992,8 +1009,12 @@ func sendAsset(t *harnessTest, sender *tapdHarness,
9921009
require.NoError(t.t, err)
9931010

9941011
// We'll get events up to the point where we broadcast the transaction.
1012+
//
1013+
// Don't specify a target script key here, we are already filtering
1014+
// events by the label.
1015+
var targetScriptKey []byte = nil
9951016
AssertSendEvents(
996-
t.t, scriptKey, sub,
1017+
t.t, targetScriptKey, sub,
9971018
tapfreighter.SendStateVirtualCommitmentSelect,
9981019
tapfreighter.SendStateBroadcast,
9991020
)

itest/assertions.go

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,36 @@ func AssertReceiveEvents(t *testing.T, addr *taprpc.Addr,
961961
}
962962
}
963963

964+
// makeFilterSendEventScriptKey returns a filter function that checks if the
965+
// given script key is present in the send event. If it is, the event is
966+
// included in the stream.
967+
func makeFilterSendEventScriptKey(
968+
scriptKey btcec.PublicKey) func(*taprpc.SendEvent) (bool, error) {
969+
970+
return func(event *taprpc.SendEvent) (bool, error) {
971+
for _, vPacketBytes := range event.VirtualPackets {
972+
vPkt, err := tappsbt.Decode(vPacketBytes)
973+
if err != nil {
974+
return false, err
975+
}
976+
977+
for _, vOut := range vPkt.Outputs {
978+
if vOut.ScriptKey.PubKey == nil {
979+
continue
980+
}
981+
982+
// This packet sends to the filtered script key,
983+
// we want to include this event.
984+
if vOut.ScriptKey.PubKey.IsEqual(&scriptKey) {
985+
return true, nil
986+
}
987+
}
988+
}
989+
990+
return false, nil
991+
}
992+
}
993+
964994
// AssertSendEventsComplete makes sure the two remaining events for the given
965995
// script key are sent on the stream.
966996
func AssertSendEventsComplete(t *testing.T, scriptKey []byte,
@@ -974,39 +1004,27 @@ func AssertSendEventsComplete(t *testing.T, scriptKey []byte,
9741004

9751005
// AssertSendEvents makes sure all events with incremental status are sent
9761006
// on the stream for the given script key.
977-
func AssertSendEvents(t *testing.T, scriptKey []byte,
1007+
func AssertSendEvents(t *testing.T, targetScriptKey []byte,
9781008
stream *EventSubscription[*taprpc.SendEvent], from,
9791009
to tapfreighter.SendState) {
9801010

9811011
success := make(chan struct{})
9821012
timeout := time.After(defaultWaitTimeout)
9831013
startStatus := from
9841014

985-
targetScriptKey, err := btcec.ParsePubKey(scriptKey)
986-
require.NoError(t, err)
987-
988-
sendsToKey := func(e *taprpc.SendEvent) bool {
989-
for _, vPacketBytes := range e.VirtualPackets {
990-
vPkt, err := tappsbt.Decode(vPacketBytes)
991-
require.NoError(t, err)
992-
993-
for _, vOut := range vPkt.Outputs {
994-
if vOut.ScriptKey.PubKey == nil {
995-
continue
996-
}
997-
998-
// This packet sends to the filtered script key,
999-
// we want to include this event.
1000-
if vOut.ScriptKey.PubKey.IsEqual(
1001-
targetScriptKey,
1002-
) {
1015+
// By default, if the target script key is not given we will accept all
1016+
// send events.
1017+
sendsToKey := func(*taprpc.SendEvent) (bool, error) {
1018+
return true, nil
1019+
}
10031020

1004-
return true
1005-
}
1006-
}
1007-
}
1021+
// If a target script key is given, we will only accept send events that
1022+
// relate to that key.
1023+
if targetScriptKey != nil {
1024+
targetScriptKey, err := btcec.ParsePubKey(targetScriptKey)
1025+
require.NoError(t, err)
10081026

1009-
return false
1027+
sendsToKey = makeFilterSendEventScriptKey(*targetScriptKey)
10101028
}
10111029

10121030
// To make sure we don't forever hang on receiving on the stream, we'll
@@ -1026,7 +1044,11 @@ func AssertSendEvents(t *testing.T, scriptKey []byte,
10261044
event, err := stream.Recv()
10271045
require.NoError(t, err, "receiving send event")
10281046

1029-
require.True(t, sendsToKey(event))
1047+
// Ensure that the event complies with the target script key
1048+
// check.
1049+
res, err := sendsToKey(event)
1050+
require.NoError(t, err)
1051+
require.True(t, res)
10301052

10311053
// Check the event's error field for unexpected errors. Perform
10321054
// this check before verifying the expected send state, as

itest/utils.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package itest
33
import (
44
"bytes"
55
"context"
6+
"io"
67
"testing"
78
"time"
89

@@ -61,6 +62,51 @@ type ClientEventStream[T any] interface {
6162
type EventSubscription[T any] struct {
6263
ClientEventStream[T]
6364
Cancel context.CancelFunc
65+
66+
// ShouldNotify is an optional filter predicate function that can be
67+
// used to filter events received from the client stream.
68+
//
69+
// If set, it will be called for each event received from the stream. If
70+
// it returns true, the event is returned. If it returns false, the
71+
// event is ignored and the next event is received from the stream.
72+
ShouldNotify func(T) (bool, error)
73+
}
74+
75+
// Recv receives an event from the client stream. If a filter is set, it will
76+
// check if the event matches the filter. If it does, it returns the event.
77+
// If not, it continues receiving events until it finds one that matches the
78+
// filter.
79+
func (e *EventSubscription[T]) Recv() (T, error) {
80+
var zero T
81+
82+
// If no filter predicate is set, we can just return the event.
83+
if e.ShouldNotify == nil {
84+
return e.ClientEventStream.Recv()
85+
}
86+
87+
// If a filter is set, we need to check if the event matches the
88+
// filter. If it does, we return the event. If not, we continue
89+
// receiving events until we find one that matches the filter.
90+
for {
91+
event, err := e.ClientEventStream.Recv()
92+
if err != nil {
93+
if err == io.EOF {
94+
// Handle end of stream.
95+
return zero, err
96+
}
97+
98+
return zero, err
99+
}
100+
101+
match, err := e.ShouldNotify(event)
102+
if err != nil {
103+
return zero, err
104+
}
105+
106+
if match {
107+
return event, nil
108+
}
109+
}
64110
}
65111

66112
// CopyRequest is a helper function to copy a request so that we can modify it.

rpcserver.go

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3375,7 +3375,7 @@ func (r *rpcServer) SendAsset(ctx context.Context,
33753375
}
33763376

33773377
resp, err := r.cfg.ChainPorter.RequestShipment(
3378-
tapfreighter.NewAddressParcel(feeRate, tapAddrs...),
3378+
tapfreighter.NewAddressParcel(feeRate, req.Label, tapAddrs...),
33793379
)
33803380
if err != nil {
33813381
return nil, err
@@ -3662,6 +3662,7 @@ func marshalOutboundParcel(
36623662
AnchorTxBlockHeight: parcel.AnchorTxBlockHeight,
36633663
Inputs: rpcInputs,
36643664
Outputs: rpcOutputs,
3665+
Label: parcel.Label,
36653666
}, nil
36663667
}
36673668

@@ -3846,23 +3847,69 @@ func (r *rpcServer) SubscribeReceiveEvents(
38463847
)
38473848
}
38483849

3850+
// shouldNotifyAssetSendEvent returns true if the given AssetSendEvent matches
3851+
// all specified filter criteria. Unset filter values are ignored. If set, all
3852+
// filters must match (logical AND).
3853+
func shouldNotifyAssetSendEvent(event tapfreighter.AssetSendEvent,
3854+
targetScriptKey fn.Option[btcec.PublicKey], targetLabel string) bool {
3855+
3856+
// By default, if no filter values are provided, match on event.
3857+
match := true
3858+
3859+
// Filter by label if specified.
3860+
if targetLabel != "" {
3861+
match = match && (event.TransferLabel == targetLabel)
3862+
}
3863+
3864+
// Filter by target script key if specified.
3865+
if targetScriptKey.IsSome() {
3866+
// If script key is specified but there are no virtual packets,
3867+
// early return as a match is impossible.
3868+
if len(event.VirtualPackets) == 0 {
3869+
return false
3870+
}
3871+
3872+
scriptKey := targetScriptKey.UnwrapToPtr()
3873+
found := false
3874+
3875+
for _, vPacket := range event.VirtualPackets {
3876+
if found {
3877+
break
3878+
}
3879+
3880+
for _, vOut := range vPacket.Outputs {
3881+
if vOut.ScriptKey.PubKey == nil {
3882+
continue
3883+
}
3884+
if vOut.ScriptKey.PubKey.IsEqual(scriptKey) {
3885+
found = true
3886+
break
3887+
}
3888+
}
3889+
}
3890+
3891+
match = match && found
3892+
}
3893+
3894+
return match
3895+
}
3896+
38493897
// SubscribeSendEvents registers a subscription to the event notification
38503898
// stream which relates to the asset sending process.
38513899
func (r *rpcServer) SubscribeSendEvents(req *taprpc.SubscribeSendEventsRequest,
38523900
ntfnStream sendEventStream) error {
38533901

3854-
var (
3855-
targetScriptKey *btcec.PublicKey
3856-
err error
3857-
)
3902+
var targetScriptKey fn.Option[btcec.PublicKey]
38583903
if len(req.FilterScriptKey) > 0 {
3859-
targetScriptKey, err = btcec.ParsePubKey(req.FilterScriptKey)
3904+
scriptKey, err := btcec.ParsePubKey(req.FilterScriptKey)
38603905
if err != nil {
38613906
return fmt.Errorf("error parsing script key: %w", err)
38623907
}
3908+
3909+
targetScriptKey = fn.MaybeSome(scriptKey)
38633910
}
38643911

3865-
filter := func(event fn.Event) (bool, error) {
3912+
shouldNotify := func(event fn.Event) (bool, error) {
38663913
var e *tapfreighter.AssetSendEvent
38673914
switch typedEvent := event.(type) {
38683915
case *tapfreighter.AssetSendEvent:
@@ -3878,44 +3925,14 @@ func (r *rpcServer) SubscribeSendEvents(req *taprpc.SubscribeSendEventsRequest,
38783925
event)
38793926
}
38803927

3881-
// If we're not filtering on a specific script key, we return
3882-
// all events.
3883-
if targetScriptKey == nil {
3884-
return true, nil
3885-
}
3886-
3887-
// We can only match the script key on the active virtual
3888-
// packets, so if there are none, then this isn't an event we're
3889-
// interested in.
3890-
if len(e.VirtualPackets) == 0 {
3891-
return false, nil
3892-
}
3893-
3894-
for _, vPacket := range e.VirtualPackets {
3895-
for _, vOut := range vPacket.Outputs {
3896-
if vOut.ScriptKey.PubKey == nil {
3897-
continue
3898-
}
3899-
3900-
// This packet sends to the filtered script key,
3901-
// we want to include this event.
3902-
if vOut.ScriptKey.PubKey.IsEqual(
3903-
targetScriptKey,
3904-
) {
3905-
3906-
return true, nil
3907-
}
3908-
}
3909-
}
3910-
3911-
// No packets with outputs going to the filtered script key
3912-
// found.
3913-
return false, nil
3928+
return shouldNotifyAssetSendEvent(
3929+
*e, targetScriptKey, req.FilterLabel,
3930+
), nil
39143931
}
39153932

39163933
return handleEvents[bool, *taprpc.SendEvent](
3917-
r.cfg.ChainPorter, ntfnStream, marshalSendEvent, filter, r.quit,
3918-
false,
3934+
r.cfg.ChainPorter, ntfnStream, marshalSendEvent, shouldNotify,
3935+
r.quit, false,
39193936
)
39203937
}
39213938

@@ -4162,6 +4179,7 @@ func marshalSendEvent(event fn.Event) (*taprpc.SendEvent, error) {
41624179
SendState: e.SendState.String(),
41634180
VirtualPackets: make([][]byte, len(e.VirtualPackets)),
41644181
PassiveVirtualPackets: make([][]byte, len(e.PassivePackets)),
4182+
TransferLabel: e.TransferLabel,
41654183
}
41664184

41674185
if e.Error != nil {

tapdb/assets_store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,6 +2442,7 @@ func (a *AssetStore) LogPendingParcel(ctx context.Context,
24422442
HeightHint: int32(spend.AnchorTxHeightHint),
24432443
AnchorTxid: newAnchorTXID[:],
24442444
TransferTimeUnix: spend.TransferTime,
2445+
Label: sqlStr(spend.Label),
24452446
})
24462447
if err != nil {
24472448
return fmt.Errorf("unable to insert asset transfer: "+
@@ -3562,6 +3563,7 @@ func (a *AssetStore) QueryParcels(ctx context.Context,
35623563
ChainFees: dbAnchorTx.ChainFees,
35633564
Inputs: inputs,
35643565
Outputs: outputs,
3566+
Label: dbT.Label.String,
35653567
}
35663568

35673569
// Set the block height if the anchor is marked as

0 commit comments

Comments
 (0)