Skip to content

Commit 5419272

Browse files
committed
rpcserver: add channel checks to RFQ orders
1 parent 4bf4dc7 commit 5419272

File tree

1 file changed

+86
-14
lines changed

1 file changed

+86
-14
lines changed

rpcserver.go

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6340,7 +6340,7 @@ func unmarshalAssetBuyOrder(
63406340

63416341
// AddAssetBuyOrder upserts a new buy order for the given asset into the RFQ
63426342
// manager. If the order already exists for the given asset, it will be updated.
6343-
func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
6343+
func (r *rpcServer) AddAssetBuyOrder(ctx context.Context,
63446344
req *rfqrpc.AddAssetBuyOrderRequest) (*rfqrpc.AddAssetBuyOrderResponse,
63456345
error) {
63466346

@@ -6354,13 +6354,24 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
63546354
return nil, fmt.Errorf("error unmarshalling buy order: %w", err)
63556355
}
63566356

6357-
peerStr := fn.MapOptionZ(
6358-
buyOrder.Peer, func(peerVertex route.Vertex) string {
6359-
return peerVertex.String()
6360-
},
6357+
// Currently, we require the peer to be specified in the buy order.
6358+
peer, err := buyOrder.Peer.UnwrapOrErr(
6359+
fmt.Errorf("buy order peer must be specified"),
6360+
)
6361+
if err != nil {
6362+
return nil, err
6363+
}
6364+
6365+
// Check if we have a channel with the peer.
6366+
err = r.checkPeerChannel(
6367+
ctx, peer, buyOrder.AssetSpecifier, req.SkipAssetChannelCheck,
63616368
)
6369+
if err != nil {
6370+
return nil, fmt.Errorf("error checking peer channel: %w", err)
6371+
}
6372+
63626373
rpcsLog.Debugf("[AddAssetBuyOrder]: upserting buy order "+
6363-
"(dest_peer=%s)", peerStr)
6374+
"(dest_peer=%s)", peer.String())
63646375

63656376
// Register an event listener before actually inserting the order, so we
63666377
// definitely don't miss any responses.
@@ -6402,11 +6413,61 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
64026413

64036414
case <-timeout:
64046415
return nil, fmt.Errorf("timeout waiting for response "+
6405-
"(peer=%s)", peerStr)
6416+
"(peer=%s)", peer.String())
64066417
}
64076418
}
64086419
}
64096420

6421+
// checkPeerChannel checks if there is a channel with the given peer. If the
6422+
// asset channel check is enabled, it will also check if there is a channel with
6423+
// the given asset with the peer.
6424+
func (r *rpcServer) checkPeerChannel(ctx context.Context, peer route.Vertex,
6425+
specifier asset.Specifier, skipAssetChannelCheck bool) error {
6426+
6427+
// We want to make sure there is at least a channel between us and the
6428+
// peer, otherwise RFQ negotiation doesn't make sense.
6429+
switch {
6430+
// For integration tests, we can't create asset channels, so we allow
6431+
// the asset channel check to be skipped. In this case we simply check
6432+
// that we have any channel with the peer.
6433+
case skipAssetChannelCheck:
6434+
activeChannels, err := r.cfg.Lnd.Client.ListChannels(
6435+
ctx, true, false,
6436+
)
6437+
if err != nil {
6438+
return fmt.Errorf("unable to fetch channels: %w", err)
6439+
}
6440+
peerChannels := fn.Filter(
6441+
activeChannels, func(c lndclient.ChannelInfo) bool {
6442+
return c.PubKeyBytes == peer
6443+
},
6444+
)
6445+
if len(peerChannels) == 0 {
6446+
return fmt.Errorf("no active channel found with peer "+
6447+
"%x", peer[:])
6448+
}
6449+
6450+
// For any other case, we'll want to make sure there is a channel with
6451+
// a non-zero balance of the given asset to carry the order.
6452+
default:
6453+
assetID, err := specifier.UnwrapIdOrErr()
6454+
if err != nil {
6455+
return fmt.Errorf("cannot check asset channel, " +
6456+
"missing asset ID")
6457+
}
6458+
6459+
// If we don't get an error here, it means we do have an asset
6460+
// channel with the peer.
6461+
_, err = r.rfqChannel(ctx, assetID, &peer)
6462+
if err != nil {
6463+
return fmt.Errorf("error checking asset channel: %w",
6464+
err)
6465+
}
6466+
}
6467+
6468+
return nil
6469+
}
6470+
64106471
// unmarshalAssetSellOrder unmarshals an asset sell order from the RPC form.
64116472
func unmarshalAssetSellOrder(
64126473
req *rfqrpc.AddAssetSellOrderRequest) (*rfq.SellOrder, error) {
@@ -6457,7 +6518,7 @@ func unmarshalAssetSellOrder(
64576518

64586519
// AddAssetSellOrder upserts a new sell order for the given asset into the RFQ
64596520
// manager. If the order already exists for the given asset, it will be updated.
6460-
func (r *rpcServer) AddAssetSellOrder(_ context.Context,
6521+
func (r *rpcServer) AddAssetSellOrder(ctx context.Context,
64616522
req *rfqrpc.AddAssetSellOrderRequest) (*rfqrpc.AddAssetSellOrderResponse,
64626523
error) {
64636524

@@ -6472,13 +6533,24 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,
64726533
err)
64736534
}
64746535

6475-
// Extract peer identifier as a string for logging.
6476-
peerStr := fn.MapOptionZ(sellOrder.Peer, func(p route.Vertex) string {
6477-
return p.String()
6478-
})
6536+
// Currently, we require the peer to be specified in the buy order.
6537+
peer, err := sellOrder.Peer.UnwrapOrErr(
6538+
fmt.Errorf("sell order peer must be specified"),
6539+
)
6540+
if err != nil {
6541+
return nil, err
6542+
}
6543+
6544+
// Check if we have a channel with the peer.
6545+
err = r.checkPeerChannel(
6546+
ctx, peer, sellOrder.AssetSpecifier, req.SkipAssetChannelCheck,
6547+
)
6548+
if err != nil {
6549+
return nil, fmt.Errorf("error checking peer channel: %w", err)
6550+
}
64796551

64806552
rpcsLog.Debugf("[AddAssetSellOrder]: upserting sell order "+
6481-
"(dest_peer=%s)", peerStr)
6553+
"(dest_peer=%s)", peer.String())
64826554

64836555
// Register an event listener before actually inserting the order, so we
64846556
// definitely don't miss any responses.
@@ -6520,7 +6592,7 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,
65206592

65216593
case <-timeout:
65226594
return nil, fmt.Errorf("timeout waiting for response "+
6523-
"from peer %s", peerStr)
6595+
"from peer %s", peer.String())
65246596
}
65256597
}
65266598
}

0 commit comments

Comments
 (0)