Skip to content

Commit 2eb9eea

Browse files
committed
rfq+rfqmsg: lookup RFQ session while parsing incoming messages
Future RFQ quote accept messages will not contain enough information for the RFQ service to classify them as buy or sell. Therefore, before these messages can be fully interpreted, the corresponding quote request message must be retrieved and inspected. This commit modifies the parsing of incoming quote accept messages so they can be accurately classified as buy or sell by looking up the associated quote request message. As a beneficial side effect, parsed quote accept message fields are now fully populated within the parsing routine.
1 parent bd77578 commit 2eb9eea

File tree

5 files changed

+52
-77
lines changed

5 files changed

+52
-77
lines changed

rfq/stream.go

Lines changed: 13 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,12 @@ func NewStreamHandler(ctx context.Context,
9292
func (h *StreamHandler) handleIncomingWireMessage(
9393
wireMsg rfqmsg.WireMessage) error {
9494

95-
// Parse the wire message as an RFQ message.
96-
msg, err := rfqmsg.NewIncomingMsgFromWire(wireMsg)
95+
// Parse the wire message as an RFQ message. The session cache load
96+
// function is provided to associate incoming wire messages with their
97+
// corresponding outgoing requests during parsing.
98+
msg, err := rfqmsg.NewIncomingMsgFromWire(
99+
wireMsg, h.outgoingRequests.Load,
100+
)
97101
if err != nil {
98102
if errors.Is(err, rfqmsg.ErrUnknownMessageType) {
99103
// Silently disregard the message if we don't recognise
@@ -109,66 +113,13 @@ func (h *StreamHandler) handleIncomingWireMessage(
109113

110114
log.Debugf("Stream handling incoming message: %s", msg)
111115

112-
// If the incoming message is an accept message, lookup the
113-
// corresponding outgoing request message. Assign the outgoing request
114-
// to a field on the accept message. This step allows us to easily
115-
// access the request that the accept message is responding to. Some of
116-
// the request fields are not present in the accept message.
117-
//
118-
// If the incoming message is a reject message, remove the corresponding
119-
// outgoing request from the store.
120-
switch typedMsg := msg.(type) {
121-
case *rfqmsg.Reject:
122-
// Delete the corresponding outgoing request from the store.
123-
h.outgoingRequests.Delete(typedMsg.ID)
124-
125-
case *rfqmsg.BuyAccept:
126-
// Load and delete the corresponding outgoing request from the
127-
// store.
128-
outgoingRequest, found := h.outgoingRequests.LoadAndDelete(
129-
typedMsg.ID,
130-
)
131-
132-
// Ensure that we have an outgoing request to match the incoming
133-
// accept message.
134-
if !found {
135-
return fmt.Errorf("no outgoing request found for "+
136-
"incoming accept message: %s", typedMsg.ID)
137-
}
138-
139-
// Type cast the outgoing message to a BuyRequest (the request
140-
// type that corresponds to a buy accept message).
141-
buyReq, ok := outgoingRequest.(*rfqmsg.BuyRequest)
142-
if !ok {
143-
return fmt.Errorf("expected BuyRequest, got %T",
144-
outgoingRequest)
145-
}
146-
147-
typedMsg.Request = *buyReq
148-
149-
case *rfqmsg.SellAccept:
150-
// Load and delete the corresponding outgoing request from the
151-
// store.
152-
outgoingRequest, found := h.outgoingRequests.LoadAndDelete(
153-
typedMsg.ID,
154-
)
155-
156-
// Ensure that we have an outgoing request to match the incoming
157-
// accept message.
158-
if !found {
159-
return fmt.Errorf("no outgoing request found for "+
160-
"incoming accept message: %s", typedMsg.ID)
161-
}
162-
163-
// Type cast the outgoing message to a SellRequest (the request
164-
// type that corresponds to a sell accept message).
165-
req, ok := outgoingRequest.(*rfqmsg.SellRequest)
166-
if !ok {
167-
return fmt.Errorf("expected SellRequest, got %T",
168-
outgoingRequest)
169-
}
170-
171-
typedMsg.Request = *req
116+
// If the incoming message is a response to an outgoing request, we
117+
// will remove the corresponding session from the store. We can safely
118+
// remove the session at this point because we have received the only
119+
// response we expect for this session.
120+
switch msg.(type) {
121+
case *rfqmsg.BuyAccept, *rfqmsg.SellAccept, *rfqmsg.Reject:
122+
h.outgoingRequests.Delete(msg.MsgID())
172123
}
173124

174125
// Send the incoming message to the RFQ manager.

rfqmsg/accept.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ func (m *acceptWireMsgData) Bytes() ([]byte, error) {
230230
// asset to us. Conversely, an incoming sell accept message indicates that our
231231
// peer accepts our sell request, meaning they are willing to buy the asset from
232232
// us.
233-
func NewIncomingAcceptFromWire(wireMsg WireMessage) (IncomingMsg, error) {
233+
func NewIncomingAcceptFromWire(wireMsg WireMessage,
234+
sessionLookup SessionLookup) (IncomingMsg, error) {
235+
234236
// Ensure that the message type is a quote accept message.
235237
if wireMsg.MsgType != MsgTypeAccept {
236238
return nil, ErrUnknownMessageType
@@ -248,17 +250,30 @@ func NewIncomingAcceptFromWire(wireMsg WireMessage) (IncomingMsg, error) {
248250
"quote accept message: %w", err)
249251
}
250252

251-
// We will now determine whether this is a buy or sell accept. We can
252-
// distinguish between buy/sell accept messages by inspecting which tick
253-
// rate field is populated.
254-
isBuyAccept := msgData.InOutRateTick.IsSome()
253+
// Before we can determine whether this is a buy or sell accept, we need
254+
// to look up the corresponding outgoing request message. This step is
255+
// necessary because the accept message data does not contain sufficient
256+
// data to distinguish between buy and sell accept messages.
257+
if sessionLookup == nil {
258+
return nil, fmt.Errorf("RFQ session lookup function is " +
259+
"required")
260+
}
255261

256-
// If this is a buy request, then we will create a new buy request
257-
// message.
258-
if isBuyAccept {
259-
return newBuyAcceptFromWireMsg(wireMsg, msgData)
262+
request, found := sessionLookup(msgData.ID.Val)
263+
if !found {
264+
return nil, fmt.Errorf("no outgoing request found for "+
265+
"incoming accept message: %s", msgData.ID.Val)
260266
}
261267

262-
// Otherwise, this is a sell request.
263-
return newSellAcceptFromWireMsg(wireMsg, msgData)
268+
// Use the corresponding request to determine the type of accept
269+
// message.
270+
switch typedRequest := request.(type) {
271+
case *BuyRequest:
272+
return newBuyAcceptFromWireMsg(wireMsg, msgData, *typedRequest)
273+
case *SellRequest:
274+
return newSellAcceptFromWireMsg(wireMsg, msgData, *typedRequest)
275+
default:
276+
return nil, fmt.Errorf("unknown request type for incoming "+
277+
"accept message: %T", request)
278+
}
264279
}

rfqmsg/buy_accept.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func NewBuyAcceptFromRequest(request BuyRequest, askPrice lnwire.MilliSatoshi,
5858

5959
// newBuyAcceptFromWireMsg instantiates a new instance from a wire message.
6060
func newBuyAcceptFromWireMsg(wireMsg WireMessage,
61-
msgData acceptWireMsgData) (*BuyAccept, error) {
61+
msgData acceptWireMsgData, request BuyRequest) (*BuyAccept, error) {
6262

6363
// Ensure that the message type is an accept message.
6464
if wireMsg.MsgType != MsgTypeAccept {
@@ -79,6 +79,7 @@ func newBuyAcceptFromWireMsg(wireMsg WireMessage,
7979

8080
return &BuyAccept{
8181
Peer: wireMsg.Peer,
82+
Request: request,
8283
Version: msgData.Version.Val,
8384
ID: msgData.ID.Val,
8485
Expiry: msgData.Expiry.Val,

rfqmsg/messages.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,19 @@ type WireMessage struct {
9090
Data []byte
9191
}
9292

93+
// SessionLookup is a function that can be used to look up a session quote
94+
// request message given a session ID.
95+
type SessionLookup func(id ID) (OutgoingMsg, bool)
96+
9397
// NewIncomingMsgFromWire creates a new RFQ message from a wire message.
94-
func NewIncomingMsgFromWire(wireMsg WireMessage) (IncomingMsg, error) {
98+
func NewIncomingMsgFromWire(wireMsg WireMessage,
99+
sessionLookup SessionLookup) (IncomingMsg, error) {
100+
95101
switch wireMsg.MsgType {
96102
case MsgTypeRequest:
97103
return NewIncomingRequestFromWire(wireMsg)
98104
case MsgTypeAccept:
99-
return NewIncomingAcceptFromWire(wireMsg)
105+
return NewIncomingAcceptFromWire(wireMsg, sessionLookup)
100106
case MsgTypeReject:
101107
return NewQuoteRejectFromWireMsg(wireMsg)
102108
default:

rfqmsg/sell_accept.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func NewSellAcceptFromRequest(request SellRequest, bidPrice lnwire.MilliSatoshi,
5858

5959
// newSellAcceptFromWireMsg instantiates a new instance from a wire message.
6060
func newSellAcceptFromWireMsg(wireMsg WireMessage,
61-
msgData acceptWireMsgData) (*SellAccept, error) {
61+
msgData acceptWireMsgData, request SellRequest) (*SellAccept,
62+
error) {
6263

6364
// Ensure that the message type is an accept message.
6465
if wireMsg.MsgType != MsgTypeAccept {
@@ -82,6 +83,7 @@ func newSellAcceptFromWireMsg(wireMsg WireMessage,
8283
// service.
8384
return &SellAccept{
8485
Peer: wireMsg.Peer,
86+
Request: request,
8587
Version: msgData.Version.Val,
8688
ID: msgData.ID.Val,
8789
BidPrice: bidPrice,

0 commit comments

Comments
 (0)