Skip to content

Commit 0d64598

Browse files
authored
Merge pull request #1128 from lightninglabs/rfqmsg-session-lookup-parsing
RFQ session lookup during quote accept message parsing
2 parents 81dd002 + 2eb9eea commit 0d64598

File tree

8 files changed

+88
-77
lines changed

8 files changed

+88
-77
lines changed

rfq/stream.go

Lines changed: 13 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,12 @@ func NewStreamHandler(ctx context.Context,
9292
func (h *StreamHandler) handleIncomingWireMessage(
9393
wireMsg rfqmsg.WireMessage) error {
9494

95-
// Parse the wire message as an RFQ message.
96-
msg, err := rfqmsg.NewIncomingMsgFromWire(wireMsg)
95+
// Parse the wire message as an RFQ message. The session cache load
96+
// function is provided to associate incoming wire messages with their
97+
// corresponding outgoing requests during parsing.
98+
msg, err := rfqmsg.NewIncomingMsgFromWire(
99+
wireMsg, h.outgoingRequests.Load,
100+
)
97101
if err != nil {
98102
if errors.Is(err, rfqmsg.ErrUnknownMessageType) {
99103
// Silently disregard the message if we don't recognise
@@ -109,66 +113,13 @@ func (h *StreamHandler) handleIncomingWireMessage(
109113

110114
log.Debugf("Stream handling incoming message: %s", msg)
111115

112-
// If the incoming message is an accept message, lookup the
113-
// corresponding outgoing request message. Assign the outgoing request
114-
// to a field on the accept message. This step allows us to easily
115-
// access the request that the accept message is responding to. Some of
116-
// the request fields are not present in the accept message.
117-
//
118-
// If the incoming message is a reject message, remove the corresponding
119-
// outgoing request from the store.
120-
switch typedMsg := msg.(type) {
121-
case *rfqmsg.Reject:
122-
// Delete the corresponding outgoing request from the store.
123-
h.outgoingRequests.Delete(typedMsg.ID)
124-
125-
case *rfqmsg.BuyAccept:
126-
// Load and delete the corresponding outgoing request from the
127-
// store.
128-
outgoingRequest, found := h.outgoingRequests.LoadAndDelete(
129-
typedMsg.ID,
130-
)
131-
132-
// Ensure that we have an outgoing request to match the incoming
133-
// accept message.
134-
if !found {
135-
return fmt.Errorf("no outgoing request found for "+
136-
"incoming accept message: %s", typedMsg.ID)
137-
}
138-
139-
// Type cast the outgoing message to a BuyRequest (the request
140-
// type that corresponds to a buy accept message).
141-
buyReq, ok := outgoingRequest.(*rfqmsg.BuyRequest)
142-
if !ok {
143-
return fmt.Errorf("expected BuyRequest, got %T",
144-
outgoingRequest)
145-
}
146-
147-
typedMsg.Request = *buyReq
148-
149-
case *rfqmsg.SellAccept:
150-
// Load and delete the corresponding outgoing request from the
151-
// store.
152-
outgoingRequest, found := h.outgoingRequests.LoadAndDelete(
153-
typedMsg.ID,
154-
)
155-
156-
// Ensure that we have an outgoing request to match the incoming
157-
// accept message.
158-
if !found {
159-
return fmt.Errorf("no outgoing request found for "+
160-
"incoming accept message: %s", typedMsg.ID)
161-
}
162-
163-
// Type cast the outgoing message to a SellRequest (the request
164-
// type that corresponds to a sell accept message).
165-
req, ok := outgoingRequest.(*rfqmsg.SellRequest)
166-
if !ok {
167-
return fmt.Errorf("expected SellRequest, got %T",
168-
outgoingRequest)
169-
}
170-
171-
typedMsg.Request = *req
116+
// If the incoming message is a response to an outgoing request, we
117+
// will remove the corresponding session from the store. We can safely
118+
// remove the session at this point because we have received the only
119+
// response we expect for this session.
120+
switch msg.(type) {
121+
case *rfqmsg.BuyAccept, *rfqmsg.SellAccept, *rfqmsg.Reject:
122+
h.outgoingRequests.Delete(msg.MsgID())
172123
}
173124

174125
// Send the incoming message to the RFQ manager.

rfqmsg/accept.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ func (m *acceptWireMsgData) Bytes() ([]byte, error) {
230230
// asset to us. Conversely, an incoming sell accept message indicates that our
231231
// peer accepts our sell request, meaning they are willing to buy the asset from
232232
// us.
233-
func NewIncomingAcceptFromWire(wireMsg WireMessage) (IncomingMsg, error) {
233+
func NewIncomingAcceptFromWire(wireMsg WireMessage,
234+
sessionLookup SessionLookup) (IncomingMsg, error) {
235+
234236
// Ensure that the message type is a quote accept message.
235237
if wireMsg.MsgType != MsgTypeAccept {
236238
return nil, ErrUnknownMessageType
@@ -248,17 +250,30 @@ func NewIncomingAcceptFromWire(wireMsg WireMessage) (IncomingMsg, error) {
248250
"quote accept message: %w", err)
249251
}
250252

251-
// We will now determine whether this is a buy or sell accept. We can
252-
// distinguish between buy/sell accept messages by inspecting which tick
253-
// rate field is populated.
254-
isBuyAccept := msgData.InOutRateTick.IsSome()
253+
// Before we can determine whether this is a buy or sell accept, we need
254+
// to look up the corresponding outgoing request message. This step is
255+
// necessary because the accept message data does not contain sufficient
256+
// data to distinguish between buy and sell accept messages.
257+
if sessionLookup == nil {
258+
return nil, fmt.Errorf("RFQ session lookup function is " +
259+
"required")
260+
}
255261

256-
// If this is a buy request, then we will create a new buy request
257-
// message.
258-
if isBuyAccept {
259-
return newBuyAcceptFromWireMsg(wireMsg, msgData)
262+
request, found := sessionLookup(msgData.ID.Val)
263+
if !found {
264+
return nil, fmt.Errorf("no outgoing request found for "+
265+
"incoming accept message: %s", msgData.ID.Val)
260266
}
261267

262-
// Otherwise, this is a sell request.
263-
return newSellAcceptFromWireMsg(wireMsg, msgData)
268+
// Use the corresponding request to determine the type of accept
269+
// message.
270+
switch typedRequest := request.(type) {
271+
case *BuyRequest:
272+
return newBuyAcceptFromWireMsg(wireMsg, msgData, *typedRequest)
273+
case *SellRequest:
274+
return newSellAcceptFromWireMsg(wireMsg, msgData, *typedRequest)
275+
default:
276+
return nil, fmt.Errorf("unknown request type for incoming "+
277+
"accept message: %T", request)
278+
}
264279
}

rfqmsg/buy_accept.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func NewBuyAcceptFromRequest(request BuyRequest, askPrice lnwire.MilliSatoshi,
5858

5959
// newBuyAcceptFromWireMsg instantiates a new instance from a wire message.
6060
func newBuyAcceptFromWireMsg(wireMsg WireMessage,
61-
msgData acceptWireMsgData) (*BuyAccept, error) {
61+
msgData acceptWireMsgData, request BuyRequest) (*BuyAccept, error) {
6262

6363
// Ensure that the message type is an accept message.
6464
if wireMsg.MsgType != MsgTypeAccept {
@@ -79,6 +79,7 @@ func newBuyAcceptFromWireMsg(wireMsg WireMessage,
7979

8080
return &BuyAccept{
8181
Peer: wireMsg.Peer,
82+
Request: request,
8283
Version: msgData.Version.Val,
8384
ID: msgData.ID.Val,
8485
Expiry: msgData.Expiry.Val,

rfqmsg/buy_request.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,16 @@ func (q *BuyRequest) ToWire() (WireMessage, error) {
168168
}, nil
169169
}
170170

171+
// MsgPeer returns the peer that sent the message.
172+
func (q *BuyRequest) MsgPeer() route.Vertex {
173+
return q.Peer
174+
}
175+
176+
// MsgID returns the quote request session ID.
177+
func (q *BuyRequest) MsgID() ID {
178+
return q.ID
179+
}
180+
171181
// String returns a human-readable string representation of the message.
172182
func (q *BuyRequest) String() string {
173183
var groupKeyBytes []byte

rfqmsg/messages.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,19 @@ type WireMessage struct {
9090
Data []byte
9191
}
9292

93+
// SessionLookup is a function that can be used to look up a session quote
94+
// request message given a session ID.
95+
type SessionLookup func(id ID) (OutgoingMsg, bool)
96+
9397
// NewIncomingMsgFromWire creates a new RFQ message from a wire message.
94-
func NewIncomingMsgFromWire(wireMsg WireMessage) (IncomingMsg, error) {
98+
func NewIncomingMsgFromWire(wireMsg WireMessage,
99+
sessionLookup SessionLookup) (IncomingMsg, error) {
100+
95101
switch wireMsg.MsgType {
96102
case MsgTypeRequest:
97103
return NewIncomingRequestFromWire(wireMsg)
98104
case MsgTypeAccept:
99-
return NewIncomingAcceptFromWire(wireMsg)
105+
return NewIncomingAcceptFromWire(wireMsg, sessionLookup)
100106
case MsgTypeReject:
101107
return NewQuoteRejectFromWireMsg(wireMsg)
102108
default:
@@ -156,6 +162,12 @@ func WireMsgDataVersionDecoder(r io.Reader, val any, buf *[8]byte,
156162
// IncomingMsg is an interface that represents an inbound wire message
157163
// that has been received from a peer.
158164
type IncomingMsg interface {
165+
// MsgPeer returns the peer that sent the message.
166+
MsgPeer() route.Vertex
167+
168+
// MsgID returns the quote request session ID.
169+
MsgID() ID
170+
159171
// String returns a human-readable string representation of the message.
160172
String() string
161173
}

rfqmsg/reject.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,16 @@ func (q *Reject) ToWire() (WireMessage, error) {
243243
}, nil
244244
}
245245

246+
// MsgPeer returns the peer that sent the message.
247+
func (q *Reject) MsgPeer() route.Vertex {
248+
return q.Peer
249+
}
250+
251+
// MsgID returns the quote request session ID.
252+
func (q *Reject) MsgID() ID {
253+
return q.ID
254+
}
255+
246256
// String returns a human-readable string representation of the message.
247257
func (q *Reject) String() string {
248258
return fmt.Sprintf("Reject(id=%x, err_code=%d, err_msg=%s)",

rfqmsg/sell_accept.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func NewSellAcceptFromRequest(request SellRequest, bidPrice lnwire.MilliSatoshi,
5858

5959
// newSellAcceptFromWireMsg instantiates a new instance from a wire message.
6060
func newSellAcceptFromWireMsg(wireMsg WireMessage,
61-
msgData acceptWireMsgData) (*SellAccept, error) {
61+
msgData acceptWireMsgData, request SellRequest) (*SellAccept,
62+
error) {
6263

6364
// Ensure that the message type is an accept message.
6465
if wireMsg.MsgType != MsgTypeAccept {
@@ -82,6 +83,7 @@ func newSellAcceptFromWireMsg(wireMsg WireMessage,
8283
// service.
8384
return &SellAccept{
8485
Peer: wireMsg.Peer,
86+
Request: request,
8587
Version: msgData.Version.Val,
8688
ID: msgData.ID.Val,
8789
BidPrice: bidPrice,

rfqmsg/sell_request.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,16 @@ func (q *SellRequest) ToWire() (WireMessage, error) {
174174
}, nil
175175
}
176176

177+
// MsgPeer returns the peer that sent the message.
178+
func (q *SellRequest) MsgPeer() route.Vertex {
179+
return q.Peer
180+
}
181+
182+
// MsgID returns the quote request session ID.
183+
func (q *SellRequest) MsgID() ID {
184+
return q.ID
185+
}
186+
177187
// String returns a human-readable string representation of the message.
178188
func (q *SellRequest) String() string {
179189
var groupKeyBytes []byte

0 commit comments

Comments
 (0)