Skip to content

Commit af24d09

Browse files
committed
rfq+rpcserver: rfq subscription event filtering by ID
The issue with the current filtering of the subscription events is that we rely on matching the RFQ peer and the asset specifier. This can be conflicting with other quotes that may share those fields. Instead we want to filter by the unique RFQ ID of the request. To do that we return the ID earlier when calling into the RFQ manager, then filter the rejection event based on that field, which is guaranteed to not cause any conflicts.
1 parent ac3a63b commit af24d09

File tree

3 files changed

+184
-187
lines changed

3 files changed

+184
-187
lines changed

rfq/manager.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -737,22 +737,24 @@ type BuyOrder struct {
737737
}
738738

739739
// UpsertAssetBuyOrder upserts an asset buy order for management.
740-
func (m *Manager) UpsertAssetBuyOrder(order BuyOrder) error {
740+
func (m *Manager) UpsertAssetBuyOrder(order BuyOrder) (rfqmsg.ID, error) {
741741
// For now, a peer must be specified.
742742
//
743743
// TODO(ffranr): Add support for peerless buy orders. The negotiator
744744
// should be able to determine the optimal peer.
745745
if order.Peer.IsNone() {
746-
return fmt.Errorf("buy order peer must be specified")
746+
return rfqmsg.ID{}, fmt.Errorf("buy order peer must be " +
747+
"specified")
747748
}
748749

749750
// Request a quote from a peer via the negotiator.
750-
err := m.negotiator.HandleOutgoingBuyOrder(order)
751+
id, err := m.negotiator.HandleOutgoingBuyOrder(order)
751752
if err != nil {
752-
return fmt.Errorf("error registering asset buy order: %w", err)
753+
return rfqmsg.ID{}, fmt.Errorf("error registering asset buy "+
754+
"order: %w", err)
753755
}
754756

755-
return nil
757+
return id, nil
756758
}
757759

758760
// SellOrder instructs the RFQ (Request For Quote) system to request a quote
@@ -789,20 +791,19 @@ type SellOrder struct {
789791
}
790792

791793
// UpsertAssetSellOrder upserts an asset sell order for management.
792-
func (m *Manager) UpsertAssetSellOrder(order SellOrder) error {
794+
func (m *Manager) UpsertAssetSellOrder(order SellOrder) (rfqmsg.ID, error) {
793795
// For now, a peer must be specified.
794796
//
795797
// TODO(ffranr): Add support for peerless sell orders. The negotiator
796798
// should be able to determine the optimal peer.
797799
if order.Peer.IsNone() {
798-
return fmt.Errorf("sell order peer must be specified")
800+
return rfqmsg.ID{}, fmt.Errorf("sell order peer must be " +
801+
"specified")
799802
}
800803

801804
// Pass the asset sell order to the negotiator which will generate sell
802805
// request messages to send to peers.
803-
m.negotiator.HandleOutgoingSellOrder(order)
804-
805-
return nil
806+
return m.negotiator.HandleOutgoingSellOrder(order)
806807
}
807808

808809
// PeerAcceptedBuyQuotes returns buy quotes that were requested by our node and

rfq/negotiator.go

Lines changed: 121 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -155,82 +155,79 @@ func (n *Negotiator) queryBidFromPriceOracle(assetSpecifier asset.Specifier,
155155
// HandleOutgoingBuyOrder handles an outgoing buy order by constructing buy
156156
// requests and passing them to the outgoing messages channel. These requests
157157
// are sent to peers.
158-
func (n *Negotiator) HandleOutgoingBuyOrder(buyOrder BuyOrder) error {
159-
// Query the price oracle for a reasonable bid price. We perform this
160-
// query and response handling in a separate goroutine in case it is a
161-
// remote service and takes a long time to respond.
162-
n.Wg.Add(1)
163-
go func() {
164-
defer n.Wg.Done()
158+
func (n *Negotiator) HandleOutgoingBuyOrder(
159+
buyOrder BuyOrder) (rfqmsg.ID, error) {
160+
161+
// Whenever this method returns an error we want to notify both the RFQ
162+
// manager main loop and also the caller. This wrapper delivers the
163+
// error to the manager (possibly triggering a daemon shutdown if
164+
// critical) then returns the error.
165+
finalise := func(err error) (rfqmsg.ID, error) {
166+
n.cfg.ErrChan <- err
167+
return rfqmsg.ID{}, err
168+
}
169+
170+
// Unwrap the peer from the buy order. For now, we can assume
171+
// that the peer is always specified.
172+
peer, err := buyOrder.Peer.UnwrapOrErr(
173+
fmt.Errorf("buy order peer must be specified"),
174+
)
175+
if err != nil {
176+
return finalise(err)
177+
}
178+
179+
// We calculate a proposed bid price for our peer's
180+
// consideration. If a price oracle is not specified we will
181+
// skip this step.
182+
var assetRateHint fn.Option[rfqmsg.AssetRate]
165183

166-
// Unwrap the peer from the buy order. For now, we can assume
167-
// that the peer is always specified.
168-
peer, err := buyOrder.Peer.UnwrapOrErr(
169-
fmt.Errorf("buy order peer must be specified"),
184+
if n.cfg.PriceOracle != nil && buyOrder.AssetSpecifier.IsSome() {
185+
// Query the price oracle for a bid price.
186+
//
187+
// TODO(ffranr): Pass the BuyOrder expiry to the price
188+
// oracle at this point.
189+
assetRate, err := n.queryBidFromPriceOracle(
190+
buyOrder.AssetSpecifier,
191+
fn.Some(buyOrder.AssetMaxAmt),
192+
fn.None[lnwire.MilliSatoshi](),
193+
fn.None[rfqmsg.AssetRate](),
170194
)
171195
if err != nil {
172-
n.cfg.ErrChan <- err
196+
// If we fail to query the price oracle for a
197+
// bid price, we will log a warning and continue
198+
// without a bid price.
199+
log.Warnf("failed to query bid price from price "+
200+
"oracle for outgoing buy request: %v", err)
173201
}
174202

175-
// We calculate a proposed bid price for our peer's
176-
// consideration. If a price oracle is not specified we will
177-
// skip this step.
178-
var assetRateHint fn.Option[rfqmsg.AssetRate]
179-
180-
if n.cfg.PriceOracle != nil &&
181-
buyOrder.AssetSpecifier.IsSome() {
203+
assetRateHint = fn.MaybeSome[rfqmsg.AssetRate](assetRate)
204+
}
182205

183-
// Query the price oracle for a bid price.
184-
//
185-
// TODO(ffranr): Pass the BuyOrder expiry to the price
186-
// oracle at this point.
187-
assetRate, err := n.queryBidFromPriceOracle(
188-
buyOrder.AssetSpecifier,
189-
fn.Some(buyOrder.AssetMaxAmt),
190-
fn.None[lnwire.MilliSatoshi](),
191-
fn.None[rfqmsg.AssetRate](),
192-
)
193-
if err != nil {
194-
// If we fail to query the price oracle for a
195-
// bid price, we will log a warning and continue
196-
// without a bid price.
197-
log.Warnf("failed to query bid price from "+
198-
"price oracle for outgoing buy "+
199-
"request: %v", err)
200-
}
201-
202-
assetRateHint = fn.MaybeSome[rfqmsg.AssetRate](
203-
assetRate,
204-
)
205-
}
206+
// Construct a new buy request to send to the peer.
207+
request, err := rfqmsg.NewBuyRequest(
208+
peer, buyOrder.AssetSpecifier,
209+
buyOrder.AssetMaxAmt, assetRateHint,
210+
)
211+
if err != nil {
212+
err := fmt.Errorf("unable to create buy request "+
213+
"message: %w", err)
214+
return finalise(err)
215+
}
206216

207-
// Construct a new buy request to send to the peer.
208-
request, err := rfqmsg.NewBuyRequest(
209-
peer, buyOrder.AssetSpecifier,
210-
buyOrder.AssetMaxAmt, assetRateHint,
211-
)
212-
if err != nil {
213-
err := fmt.Errorf("unable to create buy request "+
214-
"message: %w", err)
215-
n.cfg.ErrChan <- err
216-
return
217-
}
217+
// Send the response message to the outgoing messages channel.
218+
var msg rfqmsg.OutgoingMsg = request
219+
sendSuccess := fn.SendOrQuit(
220+
n.cfg.OutgoingMessages, msg, n.Quit,
221+
)
222+
if !sendSuccess {
223+
err := fmt.Errorf("negotiator failed to add quote " +
224+
"request message to the outgoing messages " +
225+
"channel")
218226

219-
// Send the response message to the outgoing messages channel.
220-
var msg rfqmsg.OutgoingMsg = request
221-
sendSuccess := fn.SendOrQuit(
222-
n.cfg.OutgoingMessages, msg, n.Quit,
223-
)
224-
if !sendSuccess {
225-
err := fmt.Errorf("negotiator failed to add quote " +
226-
"request message to the outgoing messages " +
227-
"channel")
228-
n.cfg.ErrChan <- err
229-
return
230-
}
231-
}()
227+
return finalise(err)
228+
}
232229

233-
return nil
230+
return request.ID, nil
234231
}
235232

236233
// queryAskFromPriceOracle queries the price oracle for an asking price. It
@@ -466,72 +463,71 @@ func (n *Negotiator) HandleIncomingSellRequest(
466463
// HandleOutgoingSellOrder handles an outgoing sell order by constructing sell
467464
// requests and passing them to the outgoing messages channel. These requests
468465
// are sent to peers.
469-
func (n *Negotiator) HandleOutgoingSellOrder(order SellOrder) {
470-
// Query the price oracle for a reasonable ask price. We perform this
471-
// query and response handling in a separate goroutine in case it is a
472-
// remote service and takes a long time to respond.
473-
n.Wg.Add(1)
474-
go func() {
475-
defer n.Wg.Done()
466+
func (n *Negotiator) HandleOutgoingSellOrder(
467+
order SellOrder) (rfqmsg.ID, error) {
468+
469+
// Whenever this method returns an error we want to notify both the RFQ
470+
// manager main loop and also the caller. This wrapper delivers the
471+
// error to the manager (possibly triggering a daemon shutdown if
472+
// critical) then returns the error.
473+
finalise := func(err error) (rfqmsg.ID, error) {
474+
n.cfg.ErrChan <- err
475+
return rfqmsg.ID{}, err
476+
}
477+
478+
// Unwrap the peer from the order. For now, we can assume that
479+
// the peer is always specified.
480+
peer, err := order.Peer.UnwrapOrErr(
481+
fmt.Errorf("buy order peer must be specified"),
482+
)
483+
if err != nil {
484+
return finalise(err)
485+
}
476486

477-
// Unwrap the peer from the order. For now, we can assume that
478-
// the peer is always specified.
479-
peer, err := order.Peer.UnwrapOrErr(
480-
fmt.Errorf("buy order peer must be specified"),
487+
// We calculate a proposed ask price for our peer's
488+
// consideration. If a price oracle is not specified we will
489+
// skip this step.
490+
var assetRateHint fn.Option[rfqmsg.AssetRate]
491+
492+
if n.cfg.PriceOracle != nil && order.AssetSpecifier.IsSome() {
493+
// Query the price oracle for an asking price.
494+
//
495+
// TODO(ffranr): Pass the SellOrder expiry to the
496+
// price oracle at this point.
497+
assetRate, err := n.queryAskFromPriceOracle(
498+
order.AssetSpecifier, fn.None[uint64](),
499+
fn.Some(order.PaymentMaxAmt),
500+
fn.None[rfqmsg.AssetRate](),
481501
)
482502
if err != nil {
483-
n.cfg.ErrChan <- err
503+
err := fmt.Errorf("negotiator failed to handle price "+
504+
"oracle response: %w", err)
505+
return finalise(err)
484506
}
485507

486-
// We calculate a proposed ask price for our peer's
487-
// consideration. If a price oracle is not specified we will
488-
// skip this step.
489-
var assetRateHint fn.Option[rfqmsg.AssetRate]
508+
assetRateHint = fn.MaybeSome(assetRate)
509+
}
490510

491-
if n.cfg.PriceOracle != nil && order.AssetSpecifier.IsSome() {
492-
// Query the price oracle for an asking price.
493-
//
494-
// TODO(ffranr): Pass the SellOrder expiry to the
495-
// price oracle at this point.
496-
assetRate, err := n.queryAskFromPriceOracle(
497-
order.AssetSpecifier, fn.None[uint64](),
498-
fn.Some(order.PaymentMaxAmt),
499-
fn.None[rfqmsg.AssetRate](),
500-
)
501-
if err != nil {
502-
err := fmt.Errorf("negotiator failed to "+
503-
"handle price oracle response: %w", err)
504-
n.cfg.ErrChan <- err
505-
return
506-
}
507-
508-
assetRateHint = fn.MaybeSome(assetRate)
509-
}
511+
request, err := rfqmsg.NewSellRequest(
512+
peer, order.AssetSpecifier, order.PaymentMaxAmt, assetRateHint,
513+
)
510514

511-
request, err := rfqmsg.NewSellRequest(
512-
peer, order.AssetSpecifier, order.PaymentMaxAmt,
513-
assetRateHint,
514-
)
515-
if err != nil {
516-
err := fmt.Errorf("unable to create sell request "+
517-
"message: %w", err)
518-
n.cfg.ErrChan <- err
519-
return
520-
}
515+
if err != nil {
516+
err := fmt.Errorf("unable to create sell request message: %w",
517+
err)
518+
return finalise(err)
519+
}
521520

522-
// Send the response message to the outgoing messages channel.
523-
var msg rfqmsg.OutgoingMsg = request
524-
sendSuccess := fn.SendOrQuit(
525-
n.cfg.OutgoingMessages, msg, n.Quit,
526-
)
527-
if !sendSuccess {
528-
err := fmt.Errorf("negotiator failed to add sell " +
529-
"request message to the outgoing messages " +
530-
"channel")
531-
n.cfg.ErrChan <- err
532-
return
533-
}
534-
}()
521+
// Send the response message to the outgoing messages channel.
522+
var msg rfqmsg.OutgoingMsg = request
523+
sendSuccess := fn.SendOrQuit(n.cfg.OutgoingMessages, msg, n.Quit)
524+
if !sendSuccess {
525+
err := fmt.Errorf("negotiator failed to add sell request " +
526+
"message to the outgoing messages channel")
527+
return finalise(err)
528+
}
529+
530+
return request.ID, err
535531
}
536532

537533
// expiryWithinBounds checks if a quote expiry unix timestamp (in seconds) is

0 commit comments

Comments
 (0)