Skip to content

Commit 5ce4911

Browse files
committed
synchronize msg sends with session loop (wip)
1 parent 3d85e3b commit 5ce4911

File tree

3 files changed

+20
-96
lines changed

3 files changed

+20
-96
lines changed

registry.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,9 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
4141
return err
4242
}
4343

44-
//NOTE: must queue for send here. otherwise, if not executed in same goroutine as session run loop,
45-
//message may be sent on closed channel or sent outside of valid state
46-
session.queueForSend(msg)
47-
48-
return nil
44+
request := sendRequest{msg, make(chan error)}
45+
session.toSend <- request
46+
return <-request.err
4947
}
5048

5149
type sessionActivate struct {

session.go

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package quickfix
22

33
import (
44
"fmt"
5-
"sync"
65
"time"
76

87
"github.com/quickfixgo/quickfix/config"
@@ -18,11 +17,8 @@ type session struct {
1817
sessionID SessionID
1918

2019
messageOut chan []byte
21-
22-
//application messages are queued up to be sent during the run loop.
23-
toSend []Message
24-
//mutex for access to toSend
25-
sendMutex sync.Mutex
20+
messageIn chan fixIn
21+
toSend chan sendRequest
2622

2723
sessionEvent chan event
2824
messageEvent chan bool
@@ -189,35 +185,8 @@ func (s *session) resend(msg Message) {
189185
s.sendBytes(msg.rawMessage)
190186
}
191187

192-
//queueForSend will queue up a message to be sent by the session during the next iteration of the run loop
193-
func (s *session) queueForSend(msg Message) {
194-
s.sendMutex.Lock()
195-
defer s.sendMutex.Unlock()
196-
s.toSend = append(s.toSend, msg)
197-
select {
198-
case s.messageEvent <- true:
199-
default:
200-
}
201-
}
202-
203-
//sends queued messages if session is logged on
204-
func (s *session) sendQueued() {
205-
if !s.IsLoggedOn() {
206-
return
207-
}
208-
209-
s.sendMutex.Lock()
210-
defer s.sendMutex.Unlock()
211-
212-
for _, msg := range s.toSend {
213-
s.send(msg)
214-
}
215-
216-
s.toSend = s.toSend[:0]
217-
}
218-
219188
//send should NOT be called outside of the run loop
220-
func (s *session) send(msg Message) {
189+
func (s *session) send(msg Message) error {
221190
s.fillDefaultHeader(msg)
222191

223192
seqNum := s.store.NextSenderMsgSeqNum()
@@ -230,12 +199,13 @@ func (s *session) send(msg Message) {
230199
s.application.ToApp(msg, s.sessionID)
231200
}
232201
if msgBytes, err := msg.Build(); err != nil {
233-
panic(err)
202+
return err
234203
} else {
235204
s.store.SaveMessage(seqNum, msgBytes)
236205
s.sendBytes(msgBytes)
237206
s.store.IncrNextSenderMsgSeqNum()
238207
}
208+
return nil
239209
}
240210

241211
func (s *session) sendBytes(msg []byte) {
@@ -511,7 +481,13 @@ type fixIn struct {
511481
receiveTime time.Time
512482
}
513483

484+
type sendRequest struct {
485+
msg Message
486+
err chan error
487+
}
488+
514489
func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
490+
s.messageIn = msgIn
515491
s.messageOut = msgOut
516492

517493
defer func() {
@@ -549,9 +525,13 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
549525
return
550526
}
551527

552-
s.sendQueued()
553-
554528
select {
529+
case request := <-s.toSend:
530+
if s.IsLoggedOn() {
531+
request.err <- s.send(request.msg)
532+
} else {
533+
request.err <- fmt.Errorf("Not logged on")
534+
}
555535
case fixIn, ok := <-msgIn:
556536
if ok {
557537
s.log.OnIncoming(string(fixIn.bytes))
@@ -575,7 +555,6 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
575555
}
576556
case evt := <-s.sessionEvent:
577557
s.sessionState = s.Timeout(s, evt)
578-
case <-s.messageEvent:
579558
}
580559
}
581560
}

session_test.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -326,56 +326,3 @@ func TestSession_CheckToAppCalled(t *testing.T) {
326326
t.Error("Toadmin should not have been called, instead was called", app.adminCalled, "times")
327327
}
328328
}
329-
330-
func TestSession_sendQueued(t *testing.T) {
331-
app := new(TestClient)
332-
otherEnd := make(chan []byte)
333-
go func() {
334-
for {
335-
_, ok := <-otherEnd
336-
if !ok {
337-
return
338-
}
339-
}
340-
}()
341-
session := session{
342-
store: new(memoryStore),
343-
application: app,
344-
messageOut: otherEnd,
345-
log: nullLog{}}
346-
session.queueForSend(buildMessage())
347-
session.queueForSend(buildMessage())
348-
session.queueForSend(buildMessage())
349-
350-
if len(session.toSend) != 3 {
351-
t.Errorf("Expected %v queued messages, got %v", 3, len(session.toSend))
352-
}
353-
354-
var tests = []struct {
355-
sessionState
356-
}{
357-
{logonState{}},
358-
{logoutState{}},
359-
}
360-
361-
for _, test := range tests {
362-
session.sessionState = test.sessionState
363-
session.sendQueued()
364-
365-
if app.appCalled != 0 {
366-
t.Fatalf("session state %v should not allow send but sent %v times", session.sessionState, app.appCalled)
367-
}
368-
}
369-
370-
session.sessionState = inSession{}
371-
372-
session.sendQueued()
373-
374-
if app.appCalled != 3 {
375-
t.Errorf("Toapp should have been called %v times, instead was called %v times", 3, app.appCalled)
376-
}
377-
378-
if len(session.toSend) != 0 {
379-
t.Errorf("Expected no queued messages, got %v", len(session.toSend))
380-
}
381-
}

0 commit comments

Comments
 (0)