Skip to content

Commit a73aac8

Browse files
committed
rpc: add RPC endpoints for the RFQ system
This commit adds the RFQ RPC server. It also adds four RPC endpoints: 1. Upsert buy order. 2. Upsert sell offer. 3. List accepted quotes. 4. Subscribe to RFQ events.
1 parent ff71f2b commit a73aac8

File tree

10 files changed

+3119
-2
lines changed

10 files changed

+3119
-2
lines changed

perms/perms.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,22 @@ var (
212212
Entity: "universe",
213213
Action: "read",
214214
}},
215+
"/rfqrpc.Rfq/AddAssetBuyOrder": {{
216+
Entity: "rfq",
217+
Action: "write",
218+
}},
219+
"/rfqrpc.Rfq/AddAssetSellOffer": {{
220+
Entity: "rfq",
221+
Action: "write",
222+
}},
223+
"/rfqrpc.Rfq/QueryRfqAcceptedQuotes": {{
224+
Entity: "rfq",
225+
Action: "read",
226+
}},
227+
"/rfqrpc.Rfq/SubscribeRfqEventNtfns": {{
228+
Entity: "rfq",
229+
Action: "write",
230+
}},
215231
"/tapdevrpc.TapDev/ImportProof": {{
216232
Entity: "proofs",
217233
Action: "write",

rpcserver.go

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,16 @@ import (
2929
"github.com/lightninglabs/taproot-assets/fn"
3030
"github.com/lightninglabs/taproot-assets/mssmt"
3131
"github.com/lightninglabs/taproot-assets/proof"
32+
"github.com/lightninglabs/taproot-assets/rfq"
33+
"github.com/lightninglabs/taproot-assets/rfqmsg"
3234
"github.com/lightninglabs/taproot-assets/rpcperms"
3335
"github.com/lightninglabs/taproot-assets/tapfreighter"
3436
"github.com/lightninglabs/taproot-assets/tapgarden"
3537
"github.com/lightninglabs/taproot-assets/tappsbt"
3638
"github.com/lightninglabs/taproot-assets/taprpc"
3739
wrpc "github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
3840
"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
41+
"github.com/lightninglabs/taproot-assets/taprpc/rfqrpc"
3942
"github.com/lightninglabs/taproot-assets/taprpc/tapdevrpc"
4043
unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
4144
"github.com/lightninglabs/taproot-assets/tapscript"
@@ -45,6 +48,8 @@ import (
4548
"github.com/lightningnetwork/lnd/keychain"
4649
"github.com/lightningnetwork/lnd/lnrpc"
4750
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
51+
"github.com/lightningnetwork/lnd/lnwire"
52+
"github.com/lightningnetwork/lnd/routing/route"
4853
"github.com/lightningnetwork/lnd/signal"
4954
"golang.org/x/time/rate"
5055
"google.golang.org/grpc"
@@ -106,6 +111,7 @@ type rpcServer struct {
106111
taprpc.UnimplementedTaprootAssetsServer
107112
wrpc.UnimplementedAssetWalletServer
108113
mintrpc.UnimplementedMintServer
114+
rfqrpc.UnimplementedRfqServer
109115
unirpc.UnimplementedUniverseServer
110116
tapdevrpc.UnimplementedTapDevServer
111117

@@ -178,6 +184,7 @@ func (r *rpcServer) RegisterWithGrpcServer(grpcServer *grpc.Server) error {
178184
taprpc.RegisterTaprootAssetsServer(grpcServer, r)
179185
wrpc.RegisterAssetWalletServer(grpcServer, r)
180186
mintrpc.RegisterMintServer(grpcServer, r)
187+
rfqrpc.RegisterRfqServer(grpcServer, r)
181188
unirpc.RegisterUniverseServer(grpcServer, r)
182189
tapdevrpc.RegisterGrpcServer(grpcServer, r)
183190
return nil
@@ -4807,3 +4814,314 @@ func MarshalAssetFedSyncCfg(
48074814
AllowSyncExport: config.AllowSyncExport,
48084815
}, nil
48094816
}
4817+
4818+
// unmarshalAssetSpecifier unmarshals an asset specifier from the RPC form.
4819+
func unmarshalAssetSpecifier(req *rfqrpc.AssetSpecifier) (*asset.ID,
4820+
*btcec.PublicKey, error) {
4821+
4822+
// Attempt to decode the asset specifier from the RPC request. In cases
4823+
// where both the asset ID and asset group key are provided, we will
4824+
// give precedence to the asset ID due to its higher level of
4825+
// specificity.
4826+
var (
4827+
assetID *asset.ID
4828+
4829+
groupKeyBytes []byte
4830+
groupKey *btcec.PublicKey
4831+
4832+
err error
4833+
)
4834+
4835+
switch {
4836+
// Parse the asset ID if it's set.
4837+
case len(req.GetAssetId()) > 0:
4838+
var assetIdBytes [32]byte
4839+
copy(assetIdBytes[:], req.GetAssetId())
4840+
id := asset.ID(assetIdBytes)
4841+
assetID = &id
4842+
4843+
case len(req.GetAssetIdStr()) > 0:
4844+
assetIDBytes, err := hex.DecodeString(req.GetAssetIdStr())
4845+
if err != nil {
4846+
return nil, nil, fmt.Errorf("error decoding asset "+
4847+
"ID: %w", err)
4848+
}
4849+
4850+
var id asset.ID
4851+
copy(id[:], assetIDBytes)
4852+
assetID = &id
4853+
4854+
// Parse the group key if it's set.
4855+
case len(req.GetGroupKey()) > 0:
4856+
groupKeyBytes = req.GetGroupKey()
4857+
groupKey, err = btcec.ParsePubKey(groupKeyBytes)
4858+
if err != nil {
4859+
return nil, nil, fmt.Errorf("error parsing group "+
4860+
"key: %w", err)
4861+
}
4862+
4863+
case len(req.GetGroupKeyStr()) > 0:
4864+
groupKeyBytes, err := hex.DecodeString(
4865+
req.GetGroupKeyStr(),
4866+
)
4867+
if err != nil {
4868+
return nil, nil, fmt.Errorf("error decoding group "+
4869+
"key: %w", err)
4870+
}
4871+
4872+
groupKey, err = btcec.ParsePubKey(groupKeyBytes)
4873+
if err != nil {
4874+
return nil, nil, fmt.Errorf("error parsing group "+
4875+
"key: %w", err)
4876+
}
4877+
4878+
default:
4879+
// At this point, we know that neither the asset ID nor the
4880+
// group key are specified. Return an error.
4881+
return nil, nil, fmt.Errorf("either asset ID or asset group " +
4882+
"key must be specified")
4883+
}
4884+
4885+
return assetID, groupKey, nil
4886+
}
4887+
4888+
// unmarshalAssetBuyOrder unmarshals an asset buy order from the RPC form.
4889+
func unmarshalAssetBuyOrder(
4890+
req *rfqrpc.AddAssetBuyOrderRequest) (*rfq.BuyOrder, error) {
4891+
4892+
assetId, assetGroupKey, err := unmarshalAssetSpecifier(
4893+
req.AssetSpecifier,
4894+
)
4895+
if err != nil {
4896+
return nil, fmt.Errorf("error unmarshalling asset specifier: "+
4897+
"%w", err)
4898+
}
4899+
4900+
// Unmarshal the peer if specified.
4901+
var peer *route.Vertex
4902+
if len(req.PeerPubKey) > 0 {
4903+
pv, err := route.NewVertexFromBytes(req.PeerPubKey)
4904+
if err != nil {
4905+
return nil, fmt.Errorf("error unmarshalling peer "+
4906+
"route vertex: %w", err)
4907+
}
4908+
4909+
peer = &pv
4910+
}
4911+
4912+
return &rfq.BuyOrder{
4913+
AssetID: assetId,
4914+
AssetGroupKey: assetGroupKey,
4915+
MinAssetAmount: req.MinAssetAmount,
4916+
MaxBid: lnwire.MilliSatoshi(req.MaxBid),
4917+
Expiry: req.Expiry,
4918+
Peer: peer,
4919+
}, nil
4920+
}
4921+
4922+
// AddAssetBuyOrder upserts a new buy order for the given asset into the RFQ
4923+
// manager. If the order already exists for the given asset, it will be updated.
4924+
func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
4925+
req *rfqrpc.AddAssetBuyOrderRequest) (*rfqrpc.AddAssetBuyOrderResponse,
4926+
error) {
4927+
4928+
// Unmarshal the buy order from the RPC form.
4929+
buyOrder, err := unmarshalAssetBuyOrder(req)
4930+
if err != nil {
4931+
return nil, fmt.Errorf("error unmarshalling buy order: %w", err)
4932+
}
4933+
4934+
var peer string
4935+
if buyOrder.Peer != nil {
4936+
peer = buyOrder.Peer.String()
4937+
}
4938+
rpcsLog.Debugf("[AddAssetBuyOrder]: upserting buy order "+
4939+
"(dest_peer=%s)", peer)
4940+
4941+
// Upsert the buy order into the RFQ manager.
4942+
err = r.cfg.RfqManager.UpsertAssetBuyOrder(*buyOrder)
4943+
if err != nil {
4944+
return nil, fmt.Errorf("error upserting buy order into RFQ "+
4945+
"manager: %w", err)
4946+
}
4947+
4948+
return &rfqrpc.AddAssetBuyOrderResponse{}, nil
4949+
}
4950+
4951+
// AddAssetSellOffer upserts a new sell offer for the given asset into the
4952+
// RFQ manager. If the offer already exists for the given asset, it will be
4953+
// updated.
4954+
func (r *rpcServer) AddAssetSellOffer(_ context.Context,
4955+
req *rfqrpc.AddAssetSellOfferRequest) (*rfqrpc.AddAssetSellOfferResponse,
4956+
error) {
4957+
4958+
// Unmarshal the sell offer from the RPC form.
4959+
assetID, assetGroupKey, err := unmarshalAssetSpecifier(
4960+
req.AssetSpecifier,
4961+
)
4962+
if err != nil {
4963+
return nil, fmt.Errorf("error unmarshalling asset specifier: "+
4964+
"%w", err)
4965+
}
4966+
4967+
sellOffer := &rfq.SellOffer{
4968+
AssetID: assetID,
4969+
AssetGroupKey: assetGroupKey,
4970+
MaxUnits: req.MaxUnits,
4971+
}
4972+
4973+
rpcsLog.Debugf("[AddAssetSellOffer]: upserting sell offer "+
4974+
"(sell_offer=%v)", sellOffer)
4975+
4976+
// Upsert the sell offer into the RFQ manager.
4977+
err = r.cfg.RfqManager.UpsertAssetSellOffer(*sellOffer)
4978+
if err != nil {
4979+
return nil, fmt.Errorf("error upserting sell offer into RFQ "+
4980+
"manager: %w", err)
4981+
}
4982+
4983+
return &rfqrpc.AddAssetSellOfferResponse{}, nil
4984+
}
4985+
4986+
// marshalAcceptedQuotes marshals a map of accepted quotes into the RPC form.
4987+
func marshalAcceptedQuotes(
4988+
acceptedQuotes map[rfq.SerialisedScid]rfqmsg.Accept) []*rfqrpc.AcceptedQuote {
4989+
4990+
// Marshal the accepted quotes into the RPC form.
4991+
rpcQuotes := make([]*rfqrpc.AcceptedQuote, 0, len(acceptedQuotes))
4992+
for scid, quote := range acceptedQuotes {
4993+
rpcQuote := &rfqrpc.AcceptedQuote{
4994+
Peer: quote.Peer.String(),
4995+
Id: quote.ID[:],
4996+
Scid: uint64(scid),
4997+
AssetAmount: quote.AssetAmount,
4998+
AskPrice: uint64(quote.AskPrice),
4999+
Expiry: quote.Expiry,
5000+
}
5001+
rpcQuotes = append(rpcQuotes, rpcQuote)
5002+
}
5003+
5004+
return rpcQuotes
5005+
}
5006+
5007+
// QueryRfqAcceptedQuotes queries for accepted quotes from the RFQ system.
5008+
func (r *rpcServer) QueryRfqAcceptedQuotes(_ context.Context,
5009+
_ *rfqrpc.QueryRfqAcceptedQuotesRequest) (
5010+
*rfqrpc.QueryRfqAcceptedQuotesResponse, error) {
5011+
5012+
// Query the RFQ manager for accepted quotes.
5013+
acceptedQuotes := r.cfg.RfqManager.QueryAcceptedQuotes()
5014+
5015+
rpcQuotes := marshalAcceptedQuotes(acceptedQuotes)
5016+
5017+
return &rfqrpc.QueryRfqAcceptedQuotesResponse{
5018+
AcceptedQuotes: rpcQuotes,
5019+
}, nil
5020+
}
5021+
5022+
// marshallRfqEvent marshals an RFQ event into the RPC form.
5023+
func marshallRfqEvent(eventInterface fn.Event) (*rfqrpc.RfqEvent, error) {
5024+
timestamp := eventInterface.Timestamp().UTC().Unix()
5025+
5026+
switch event := eventInterface.(type) {
5027+
case *rfq.IncomingAcceptQuoteEvent:
5028+
acceptedQuote := &rfqrpc.AcceptedQuote{
5029+
Peer: event.Peer.String(),
5030+
Id: event.ID[:],
5031+
Scid: uint64(event.ShortChannelId()),
5032+
AssetAmount: event.AssetAmount,
5033+
AskPrice: uint64(event.AskPrice),
5034+
Expiry: event.Expiry,
5035+
}
5036+
5037+
eventRpc := &rfqrpc.RfqEvent_IncomingAcceptQuote{
5038+
IncomingAcceptQuote: &rfqrpc.IncomingAcceptQuoteEvent{
5039+
Timestamp: uint64(timestamp),
5040+
AcceptedQuote: acceptedQuote,
5041+
},
5042+
}
5043+
return &rfqrpc.RfqEvent{
5044+
Event: eventRpc,
5045+
}, nil
5046+
5047+
case *rfq.AcceptHtlcEvent:
5048+
eventRpc := &rfqrpc.RfqEvent_AcceptHtlc{
5049+
AcceptHtlc: &rfqrpc.AcceptHtlcEvent{
5050+
Timestamp: uint64(timestamp),
5051+
Scid: uint64(event.ChannelRemit.Scid),
5052+
},
5053+
}
5054+
return &rfqrpc.RfqEvent{
5055+
Event: eventRpc,
5056+
}, nil
5057+
5058+
default:
5059+
return nil, fmt.Errorf("unknown RFQ event type: %T",
5060+
eventInterface)
5061+
}
5062+
}
5063+
5064+
// SubscribeRfqEventNtfns subscribes to RFQ event notifications.
5065+
func (r *rpcServer) SubscribeRfqEventNtfns(
5066+
_ *rfqrpc.SubscribeRfqEventNtfnsRequest,
5067+
ntfnStream rfqrpc.Rfq_SubscribeRfqEventNtfnsServer) error {
5068+
5069+
// Create a new event subscriber and pass a copy to the RFQ manager.
5070+
// We will then read events from the subscriber.
5071+
eventSubscriber := fn.NewEventReceiver[fn.Event](fn.DefaultQueueSize)
5072+
defer eventSubscriber.Stop()
5073+
5074+
// Register the subscriber with the ChainPorter.
5075+
err := r.cfg.RfqManager.RegisterSubscriber(
5076+
eventSubscriber, false, 0,
5077+
)
5078+
if err != nil {
5079+
return fmt.Errorf("failed to register RFQ manager event "+
5080+
"notifications subscription: %w", err)
5081+
}
5082+
5083+
for {
5084+
select {
5085+
// Handle new events from the subscriber.
5086+
case event := <-eventSubscriber.NewItemCreated.ChanOut():
5087+
// Marshal the event into its RPC form.
5088+
rpcEvent, err := marshallRfqEvent(event)
5089+
if err != nil {
5090+
return fmt.Errorf("failed to marshall RFQ "+
5091+
"event into RPC form: %w", err)
5092+
}
5093+
5094+
err = ntfnStream.Send(rpcEvent)
5095+
if err != nil {
5096+
return err
5097+
}
5098+
5099+
// Handle the case where the RPC stream is closed by the client.
5100+
case <-ntfnStream.Context().Done():
5101+
// Remove the subscriber from the RFQ manager.
5102+
err := r.cfg.RfqManager.RemoveSubscriber(
5103+
eventSubscriber,
5104+
)
5105+
if err != nil {
5106+
return fmt.Errorf("failed to remove RFQ "+
5107+
"manager event notifications "+
5108+
"subscription: %w", err)
5109+
}
5110+
5111+
// Don't return an error if a normal context
5112+
// cancellation has occurred.
5113+
isCanceledContext := errors.Is(
5114+
ntfnStream.Context().Err(), context.Canceled,
5115+
)
5116+
if isCanceledContext {
5117+
return nil
5118+
}
5119+
5120+
return ntfnStream.Context().Err()
5121+
5122+
// Handle the case where the RPC server is shutting down.
5123+
case <-r.quit:
5124+
return nil
5125+
}
5126+
}
5127+
}

taprpc/gen_protos.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ set -e
66
function generate() {
77
echo "Generating root gRPC server protos"
88

9-
PROTOS="taprootassets.proto assetwalletrpc/assetwallet.proto mintrpc/mint.proto universerpc/universe.proto tapdevrpc/tapdev.proto"
9+
PROTOS="taprootassets.proto assetwalletrpc/assetwallet.proto mintrpc/mint.proto rfqrpc/rfq.proto universerpc/universe.proto tapdevrpc/tapdev.proto"
1010

1111
# For each of the sub-servers, we then generate their protos, but a restricted
1212
# set as they don't yet require REST proxies, or swagger docs.
@@ -48,7 +48,7 @@ function generate() {
4848
--custom_opt="$opts" \
4949
taprootassets.proto
5050

51-
PACKAGES="assetwalletrpc universerpc mintrpc"
51+
PACKAGES="assetwalletrpc universerpc mintrpc rfqrpc"
5252
for package in $PACKAGES; do
5353

5454
opts="package_name=$package,manual_import=$manual_import,js_stubs=1"

0 commit comments

Comments
 (0)