Skip to content

Commit 1e65673

Browse files
authored
Merge pull request #182 from cbusbey/send_outside_of_in_session
allow sends outside of session being logged on
2 parents 384c286 + b42495c commit 1e65673

File tree

5 files changed

+291
-89
lines changed

5 files changed

+291
-89
lines changed

pending_timeout_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,23 @@ import (
77
)
88

99
func TestPendingTimeout_SessionTimeout(t *testing.T) {
10-
session := &session{
11-
log: nullLog{},
12-
}
13-
1410
tests := []pendingTimeout{
1511
pendingTimeout{inSession{}},
1612
pendingTimeout{resendState{}},
1713
}
1814

1915
for _, state := range tests {
16+
session := &session{
17+
log: nullLog{},
18+
sessionState: state,
19+
}
20+
2021
nextState := state.Timeout(session, peerTimeout)
2122
assert.IsType(t, latentState{}, nextState)
2223
}
2324
}
2425

2526
func TestPendingTimeout_TimeoutUnchangedState(t *testing.T) {
26-
session := &session{
27-
log: nullLog{},
28-
}
29-
3027
tests := []pendingTimeout{
3128
pendingTimeout{inSession{}},
3229
pendingTimeout{resendState{}},
@@ -35,6 +32,11 @@ func TestPendingTimeout_TimeoutUnchangedState(t *testing.T) {
3532
testEvents := []event{needHeartbeat, logonTimeout, logoutTimeout}
3633

3734
for _, state := range tests {
35+
session := &session{
36+
log: nullLog{},
37+
sessionState: state,
38+
}
39+
3840
for _, event := range testEvents {
3941
nextState := state.Timeout(session, event)
4042
assert.Equal(t, state, nextState)

registry.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,9 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
3939
session, err := lookupSession(sessionID)
4040
if err != nil {
4141
return err
42-
} else if session.toSend == nil {
43-
return fmt.Errorf("Not logged on")
4442
}
4543

46-
request := sendRequest{msg, make(chan error)}
47-
session.toSend <- request
48-
return <-request.err
44+
return session.queueForSend(msg)
4945
}
5046

5147
type sessionActivate struct {

resend_state_test.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ func TestResendState_TimeoutPeerTimeout(t *testing.T) {
1616
<-otherEnd
1717
}()
1818

19+
state := resendState{}
1920
session := &session{
20-
store: new(memoryStore),
21-
application: new(TestClient),
22-
messageOut: otherEnd,
23-
log: nullLog{},
21+
store: new(memoryStore),
22+
application: new(TestClient),
23+
messageOut: otherEnd,
24+
log: nullLog{},
25+
sessionState: state,
2426
}
25-
state := resendState{}
27+
2628
nextState := state.Timeout(session, peerTimeout)
2729
assert.Equal(t, pendingTimeout{state}, nextState)
2830
}
@@ -33,13 +35,14 @@ func TestResendState_TimeoutUnchanged(t *testing.T) {
3335
<-otherEnd
3436
}()
3537

38+
state := resendState{}
3639
session := &session{
37-
store: new(memoryStore),
38-
application: new(TestClient),
39-
messageOut: otherEnd,
40-
log: nullLog{},
40+
store: new(memoryStore),
41+
application: new(TestClient),
42+
messageOut: otherEnd,
43+
log: nullLog{},
44+
sessionState: state,
4145
}
42-
state := resendState{}
4346

4447
tests := []event{needHeartbeat, logonTimeout, logoutTimeout}
4548

session.go

Lines changed: 98 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package quickfix
22

33
import (
44
"fmt"
5+
"sync"
56
"time"
67

78
"github.com/quickfixgo/quickfix/config"
@@ -18,9 +19,14 @@ type session struct {
1819

1920
messageOut chan []byte
2021
messageIn chan fixIn
21-
toSend chan sendRequest
2222
resendIn chan Message
2323

24+
//application messages are queued up for send here
25+
toSend []Message
26+
27+
//mutex for access to toSend
28+
sendMutex sync.Mutex
29+
2430
sessionEvent chan event
2531
messageEvent chan bool
2632
application Application
@@ -212,31 +218,109 @@ func (s *session) resend(msg Message) error {
212218
return nil
213219
}
214220

215-
//send should NOT be called outside of the run loop
221+
//queueForSend will validate, persist, and queue the message for send
222+
func (s *session) queueForSend(msg Message) (err error) {
223+
s.sendMutex.Lock()
224+
defer s.sendMutex.Unlock()
225+
226+
if err = s.prepMessageForSend(&msg); err != nil {
227+
return
228+
}
229+
230+
s.toSend = append(s.toSend, msg)
231+
232+
select {
233+
case s.messageEvent <- true:
234+
default:
235+
}
236+
237+
return
238+
}
239+
240+
//send will validate, persist, queue the message and send all messages in the queue
216241
func (s *session) send(msg Message) (err error) {
217-
s.fillDefaultHeader(msg)
242+
s.sendMutex.Lock()
243+
defer s.sendMutex.Unlock()
244+
245+
if err = s.prepMessageForSend(&msg); err != nil {
246+
return
247+
}
248+
249+
s.toSend = append(s.toSend, msg)
250+
s.sendQueued()
251+
252+
return
253+
}
254+
255+
//dropAndSend will validate and persist the message, then drops the send queue and sends the message
256+
func (s *session) dropAndSend(msg Message) (err error) {
257+
258+
s.sendMutex.Lock()
259+
defer s.sendMutex.Unlock()
260+
if err = s.prepMessageForSend(&msg); err != nil {
261+
return
262+
}
263+
264+
s.dropQueued()
265+
s.toSend = append(s.toSend, msg)
266+
s.sendQueued()
267+
268+
return
269+
}
218270

271+
func (s *session) prepMessageForSend(msg *Message) (err error) {
272+
s.fillDefaultHeader(*msg)
219273
seqNum := s.store.NextSenderMsgSeqNum()
220274
msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum))
221275

222276
var msgType FIXString
223-
if msg.Header.GetField(tagMsgType, &msgType); isAdminMessageType(string(msgType)) {
224-
s.application.ToAdmin(msg, s.sessionID)
277+
if err = msg.Header.GetField(tagMsgType, &msgType); err != nil {
278+
return err
279+
}
280+
281+
if isAdminMessageType(string(msgType)) {
282+
s.application.ToAdmin(*msg, s.sessionID)
225283
} else {
226-
s.application.ToApp(msg, s.sessionID)
284+
s.application.ToApp(*msg, s.sessionID)
227285
}
228286

229287
var msgBytes []byte
230288
if msgBytes, err = msg.Build(); err != nil {
231289
return
232290
}
233291

234-
if err = s.store.SaveMessage(seqNum, msgBytes); err == nil {
235-
s.sendBytes(msgBytes)
236-
err = s.store.IncrNextSenderMsgSeqNum()
292+
return s.persist(seqNum, msgBytes)
293+
}
294+
295+
func (s *session) persist(seqNum int, msgBytes []byte) error {
296+
if err := s.store.SaveMessage(seqNum, msgBytes); err != nil {
297+
return err
237298
}
238299

239-
return
300+
return s.store.IncrNextSenderMsgSeqNum()
301+
}
302+
303+
func (s *session) sendQueued() {
304+
for _, msg := range s.toSend {
305+
s.sendBytes(msg.rawMessage)
306+
}
307+
308+
s.dropQueued()
309+
}
310+
311+
func (s *session) dropQueued() {
312+
s.toSend = s.toSend[:0]
313+
}
314+
315+
func (s *session) sendOrDropAppMessages() {
316+
s.sendMutex.Lock()
317+
defer s.sendMutex.Unlock()
318+
319+
if s.IsLoggedOn() {
320+
s.sendQueued()
321+
} else {
322+
s.dropQueued()
323+
}
240324
}
241325

242326
func (s *session) sendBytes(msg []byte) {
@@ -325,7 +409,7 @@ func (s *session) handleLogon(msg Message) error {
325409
}
326410

327411
s.log.OnEvent("Responding to logon request")
328-
if err := s.send(reply); err != nil {
412+
if err := s.dropAndSend(reply); err != nil {
329413
return err
330414
}
331415
} else {
@@ -538,15 +622,9 @@ type fixIn struct {
538622
receiveTime time.Time
539623
}
540624

541-
type sendRequest struct {
542-
msg Message
543-
err chan error
544-
}
545-
546625
func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
547626
s.messageIn = msgIn
548627
s.messageOut = msgOut
549-
s.toSend = make(chan sendRequest)
550628
s.resendIn = make(chan Message, 1)
551629

552630
type fromCallback struct {
@@ -557,8 +635,6 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
557635

558636
defer func() {
559637
close(s.messageOut)
560-
close(s.toSend)
561-
s.toSend = nil
562638
s.stateTimer.Stop()
563639
s.peerTimer.Stop()
564640
s.onDisconnect()
@@ -587,7 +663,7 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
587663
}
588664

589665
s.log.OnEvent("Sending logon request")
590-
if err := s.send(logon); err != nil {
666+
if err := s.dropAndSend(logon); err != nil {
591667
s.logError(err)
592668
return
593669
}
@@ -607,19 +683,12 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
607683
}
608684

609685
for {
610-
611686
switch s.sessionState.(type) {
612687
case latentState:
613688
return
614689
}
615690

616691
select {
617-
case request := <-s.toSend:
618-
if s.IsLoggedOn() {
619-
request.err <- s.send(request.msg)
620-
} else {
621-
request.err <- fmt.Errorf("Not logged on")
622-
}
623692
case fixIn, ok := <-msgIn:
624693
if ok {
625694
s.log.OnIncoming(string(fixIn.bytes))
@@ -658,6 +727,8 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
658727
}
659728
case evt := <-s.sessionEvent:
660729
s.sessionState = s.Timeout(s, evt)
730+
case <-s.messageEvent:
731+
s.sendOrDropAppMessages()
661732
}
662733
}
663734
}

0 commit comments

Comments
 (0)