Skip to content

Commit b42495c

Browse files
author
Chris Busbey
committed
reverting to send queue for out of session sending
1 parent bdae846 commit b42495c

File tree

3 files changed

+196
-130
lines changed

3 files changed

+196
-130
lines changed

registry.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
4141
return err
4242
}
4343

44-
request := sendRequest{msg, make(chan error)}
45-
session.toSend <- request
46-
return <-request.err
44+
return session.queueForSend(msg)
4745
}
4846

4947
type sessionActivate struct {

session.go

Lines changed: 86 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package quickfix
22

33
import (
44
"fmt"
5+
"sync"
56
"time"
67

78
"github.com/quickfixgo/quickfix/config"
@@ -18,9 +19,14 @@ type session struct {
1819

1920
messageOut chan []byte
2021
messageIn chan fixIn
21-
toSend chan sendRequest
2222
resendIn chan Message
2323

24+
//application messages are queued up for send here
25+
toSend []Message
26+
27+
//mutex for access to toSend
28+
sendMutex sync.Mutex
29+
2430
sessionEvent chan event
2531
messageEvent chan bool
2632
application Application
@@ -130,7 +136,6 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin
130136
return err
131137
}
132138

133-
session.toSend = make(chan sendRequest)
134139
session.sessionEvent = make(chan event)
135140
session.messageEvent = make(chan bool)
136141
session.application = application
@@ -213,18 +218,58 @@ func (s *session) resend(msg Message) error {
213218
return nil
214219
}
215220

216-
func (s *session) persist(seqNum int, msgBytes []byte) error {
217-
if err := s.store.SaveMessage(seqNum, msgBytes); err != nil {
218-
return err
221+
//queueForSend will validate, persist, and queue the message for send
222+
func (s *session) queueForSend(msg Message) (err error) {
223+
s.sendMutex.Lock()
224+
defer s.sendMutex.Unlock()
225+
226+
if err = s.prepMessageForSend(&msg); err != nil {
227+
return
219228
}
220229

221-
return s.store.IncrNextSenderMsgSeqNum()
230+
s.toSend = append(s.toSend, msg)
231+
232+
select {
233+
case s.messageEvent <- true:
234+
default:
235+
}
236+
237+
return
222238
}
223239

224-
//send should NOT be called outside of the run loop
240+
//send will validate, persist, queue the message and send all messages in the queue
225241
func (s *session) send(msg Message) (err error) {
226-
s.fillDefaultHeader(msg)
242+
s.sendMutex.Lock()
243+
defer s.sendMutex.Unlock()
227244

245+
if err = s.prepMessageForSend(&msg); err != nil {
246+
return
247+
}
248+
249+
s.toSend = append(s.toSend, msg)
250+
s.sendQueued()
251+
252+
return
253+
}
254+
255+
//dropAndSend will validate and persist the message, then drops the send queue and sends the message
256+
func (s *session) dropAndSend(msg Message) (err error) {
257+
258+
s.sendMutex.Lock()
259+
defer s.sendMutex.Unlock()
260+
if err = s.prepMessageForSend(&msg); err != nil {
261+
return
262+
}
263+
264+
s.dropQueued()
265+
s.toSend = append(s.toSend, msg)
266+
s.sendQueued()
267+
268+
return
269+
}
270+
271+
func (s *session) prepMessageForSend(msg *Message) (err error) {
272+
s.fillDefaultHeader(*msg)
228273
seqNum := s.store.NextSenderMsgSeqNum()
229274
msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum))
230275

@@ -234,32 +279,48 @@ func (s *session) send(msg Message) (err error) {
234279
}
235280

236281
if isAdminMessageType(string(msgType)) {
237-
s.application.ToAdmin(msg, s.sessionID)
282+
s.application.ToAdmin(*msg, s.sessionID)
238283
} else {
239-
s.application.ToApp(msg, s.sessionID)
284+
s.application.ToApp(*msg, s.sessionID)
240285
}
241286

242287
var msgBytes []byte
243288
if msgBytes, err = msg.Build(); err != nil {
244289
return
245290
}
246291

247-
if err = s.persist(seqNum, msgBytes); err != nil {
248-
return
292+
return s.persist(seqNum, msgBytes)
293+
}
294+
295+
func (s *session) persist(seqNum int, msgBytes []byte) error {
296+
if err := s.store.SaveMessage(seqNum, msgBytes); err != nil {
297+
return err
298+
}
299+
300+
return s.store.IncrNextSenderMsgSeqNum()
301+
}
302+
303+
func (s *session) sendQueued() {
304+
for _, msg := range s.toSend {
305+
s.sendBytes(msg.rawMessage)
249306
}
250307

308+
s.dropQueued()
309+
}
310+
311+
func (s *session) dropQueued() {
312+
s.toSend = s.toSend[:0]
313+
}
314+
315+
func (s *session) sendOrDropAppMessages() {
316+
s.sendMutex.Lock()
317+
defer s.sendMutex.Unlock()
318+
251319
if s.IsLoggedOn() {
252-
s.sendBytes(msgBytes)
320+
s.sendQueued()
253321
} else {
254-
switch msgType {
255-
case enum.MsgType_LOGON:
256-
fallthrough
257-
case enum.MsgType_RESEND_REQUEST:
258-
s.sendBytes(msgBytes)
259-
}
322+
s.dropQueued()
260323
}
261-
262-
return
263324
}
264325

265326
func (s *session) sendBytes(msg []byte) {
@@ -348,7 +409,7 @@ func (s *session) handleLogon(msg Message) error {
348409
}
349410

350411
s.log.OnEvent("Responding to logon request")
351-
if err := s.send(reply); err != nil {
412+
if err := s.dropAndSend(reply); err != nil {
352413
return err
353414
}
354415
} else {
@@ -561,11 +622,6 @@ type fixIn struct {
561622
receiveTime time.Time
562623
}
563624

564-
type sendRequest struct {
565-
msg Message
566-
err chan error
567-
}
568-
569625
func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
570626
s.messageIn = msgIn
571627
s.messageOut = msgOut
@@ -607,7 +663,7 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
607663
}
608664

609665
s.log.OnEvent("Sending logon request")
610-
if err := s.send(logon); err != nil {
666+
if err := s.dropAndSend(logon); err != nil {
611667
s.logError(err)
612668
return
613669
}
@@ -627,15 +683,12 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
627683
}
628684

629685
for {
630-
631686
switch s.sessionState.(type) {
632687
case latentState:
633688
return
634689
}
635690

636691
select {
637-
case request := <-s.toSend:
638-
request.err <- s.send(request.msg)
639692
case fixIn, ok := <-msgIn:
640693
if ok {
641694
s.log.OnIncoming(string(fixIn.bytes))
@@ -674,6 +727,8 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
674727
}
675728
case evt := <-s.sessionEvent:
676729
s.sessionState = s.Timeout(s, evt)
730+
case <-s.messageEvent:
731+
s.sendOrDropAppMessages()
677732
}
678733
}
679734
}

0 commit comments

Comments
 (0)