Skip to content

Commit 012a57a

Browse files
committed
Merge pull request #64 from cbusbey/send_on_close_fix
misc refactoring, actual fix for #59
2 parents a97b847 + 8e94488 commit 012a57a

13 files changed

+197
-87
lines changed

event_timer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ func (t *eventTimer) Reset(timeout time.Duration) (ok bool) {
1616
ok = true
1717
}
1818

19-
t.timer = time.AfterFunc(timeout, t.Task)
19+
if t.Task != nil {
20+
t.timer = time.AfterFunc(timeout, t.Task)
21+
}
2022
return
2123
}

in_session.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
type inSession struct {
99
}
1010

11+
func (state inSession) String() string { return "In Session" }
12+
func (state inSession) IsLoggedOn() bool { return true }
13+
1114
func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessionState) {
1215
var msgType FIXString
1316
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
@@ -196,7 +199,7 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
196199
switch TypedError := rej.(type) {
197200
case targetTooHigh:
198201

199-
switch session.currentState.(type) {
202+
switch session.sessionState.(type) {
200203
default:
201204
session.doTargetTooHigh(TypedError)
202205
case resendState:

latent_state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package quickfix
22

33
type latentState struct{}
44

5+
func (state latentState) String() string { return "Latent State" }
6+
func (state latentState) IsLoggedOn() bool { return false }
7+
58
func (state latentState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
69
session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg)
710
return state

logon_state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package quickfix
22

33
type logonState struct{}
44

5+
func (state logonState) String() string { return "Logon State" }
6+
func (s logonState) IsLoggedOn() bool { return false }
7+
58
func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
69
var msgType FIXString
710
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil && string(msgType) == "A" {

logout_state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package quickfix
33
type logoutState struct {
44
}
55

6+
func (state logoutState) String() string { return "Logout State" }
7+
func (s logoutState) IsLoggedOn() bool { return false }
8+
69
func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
710
return state
811
}

registry.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ func SendToTarget(m Marshaler, sessionID SessionID) error {
4141
return err
4242
}
4343

44-
session.send(msg)
44+
//NOTE: must queue for send here. otherwise, if not executed in same goroutine as session run loop,
45+
//message may be sent on closed channel or sent outside of valid state
46+
session.queueForSend(msg)
4547

4648
return nil
4749
}

resend_state.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ func (state resendState) FixMsgIn(session *session, msg Message) (nextState sess
88
for ok := true; ok; {
99
nextState = state.inSession.FixMsgIn(session, msg)
1010

11-
switch nextState.(type) {
12-
case logoutState, latentState, resendState:
11+
if !nextState.IsLoggedOn() {
1312
return
1413
}
1514

session.go

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/quickfixgo/quickfix/config"
66
"github.com/quickfixgo/quickfix/datadictionary"
77
"github.com/quickfixgo/quickfix/enum"
8+
"sync"
89
"time"
910
)
1011

@@ -16,11 +17,15 @@ type session struct {
1617
sessionID SessionID
1718

1819
messageOut chan []byte
19-
toSend chan Message
2020

21-
sessionEvent chan event
22-
application Application
23-
currentState sessionState
21+
//application messages are queued up to be sent during the run loop.
22+
toSend []Message
23+
//mutex for access to toSend
24+
sendMutex sync.Mutex
25+
26+
sessionEvent chan event
27+
application Application
28+
sessionState
2429
stateTimer eventTimer
2530
peerTimer eventTimer
2631
messageStash map[int]Message
@@ -95,12 +100,10 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin
95100
return err
96101
}
97102

98-
session.toSend = make(chan Message)
99103
session.sessionEvent = make(chan event)
100104
session.application = application
101105
session.stateTimer = eventTimer{Task: func() { session.sessionEvent <- needHeartbeat }}
102106
session.peerTimer = eventTimer{Task: func() { session.sessionEvent <- peerTimeout }}
103-
104107
application.OnCreate(session.sessionID)
105108
sessions.newSession <- session
106109

@@ -109,7 +112,7 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin
109112

110113
//kicks off session as an initiator
111114
func (s *session) initiate(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
112-
s.currentState = logonState{}
115+
s.sessionState = logonState{}
113116
s.messageStash = make(map[int]Message)
114117
s.initiateLogon = true
115118

@@ -118,7 +121,7 @@ func (s *session) initiate(msgIn chan fixIn, msgOut chan []byte, quit chan bool)
118121

119122
//kicks off session as an acceptor
120123
func (s *session) accept(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
121-
s.currentState = logonState{}
124+
s.sessionState = logonState{}
122125
s.messageStash = make(map[int]Message)
123126

124127
s.run(msgIn, msgOut, quit)
@@ -161,6 +164,30 @@ func (s *session) resend(msg Message) {
161164
s.sendBytes(msg.rawMessage)
162165
}
163166

167+
//queueForSend will queue up a message to be sent by the session during the next iteration of the run loop
168+
func (s *session) queueForSend(msg Message) {
169+
s.sendMutex.Lock()
170+
defer s.sendMutex.Unlock()
171+
s.toSend = append(s.toSend, msg)
172+
}
173+
174+
//sends queued messages if session is logged on
175+
func (s *session) sendQueued() {
176+
if !s.IsLoggedOn() {
177+
return
178+
}
179+
180+
s.sendMutex.Lock()
181+
defer s.sendMutex.Unlock()
182+
183+
for _, msg := range s.toSend {
184+
s.send(msg)
185+
}
186+
187+
s.toSend = s.toSend[:0]
188+
}
189+
190+
//send should NOT be called outside of the run loop
164191
func (s *session) send(msg Message) {
165192
s.fillDefaultHeader(msg)
166193

@@ -183,7 +210,6 @@ func (s *session) send(msg Message) {
183210
}
184211

185212
func (s *session) sendBytes(msg []byte) {
186-
187213
s.log.OnOutgoing(string(msg))
188214
s.messageOut <- msg
189215
s.stateTimer.Reset(time.Duration(s.heartBeatTimeout))
@@ -469,13 +495,13 @@ type fixIn struct {
469495

470496
func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
471497
s.messageOut = msgOut
498+
472499
defer func() {
473500
close(s.messageOut)
474501
s.onDisconnect()
475502
}()
476503

477504
if s.initiateLogon {
478-
479505
if s.resetOnLogon {
480506
s.store.Reset()
481507
}
@@ -500,11 +526,13 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
500526

501527
for {
502528

503-
switch s.currentState.(type) {
529+
switch s.sessionState.(type) {
504530
case latentState:
505531
return
506532
}
507533

534+
s.sendQueued()
535+
508536
select {
509537
case fixIn, ok := <-msgIn:
510538
if ok {
@@ -513,21 +541,18 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
513541
s.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), fixIn.bytes)
514542
} else {
515543
msg.ReceiveTime = fixIn.receiveTime
516-
s.currentState = s.currentState.FixMsgIn(s, msg)
544+
s.sessionState = s.FixMsgIn(s, msg)
517545
}
518546
} else {
519547
return
520548
}
521549
s.peerTimer.Reset(time.Duration(int64(1.2 * float64(s.heartBeatTimeout))))
522550

523-
case msg := <-s.toSend:
524-
s.send(msg)
525-
526551
case <-quit:
527552
return
528553

529554
case evt := <-s.sessionEvent:
530-
s.currentState = s.currentState.Timeout(s, evt)
555+
s.sessionState = s.Timeout(s, evt)
531556
}
532557
}
533558
}

session_state.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,20 @@
11
package quickfix
22

3+
import "fmt"
4+
5+
//sessionState is the current state of the session state machine. The session state determines how the session responds to
6+
//incoming messages, timeouts, and requests to send application messages.
37
type sessionState interface {
8+
//FixMsgIn is called by the session on incoming messages from the counter party. The return type is the next session state
9+
//following message processing
410
FixMsgIn(*session, Message) (nextState sessionState)
11+
12+
//Timeout is called by the session on a timeout event.
513
Timeout(*session, event) (nextState sessionState)
14+
15+
//IsLoggedOn returns true if state is logged on an in session, false otherwise
16+
IsLoggedOn() bool
17+
18+
//debugging convenience
19+
fmt.Stringer
620
}

session_state_internal_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package quickfix
2+
3+
import "testing"
4+
5+
func TestSessionState_IsLoggedOn(t *testing.T) {
6+
7+
var tests = []struct {
8+
sessionState
9+
expectIsLoggedOn bool
10+
}{
11+
{latentState{}, false},
12+
{logonState{}, false},
13+
{logoutState{}, false},
14+
{inSession{}, true},
15+
{resendState{}, true},
16+
}
17+
18+
for _, test := range tests {
19+
switch {
20+
case test.expectIsLoggedOn && !test.IsLoggedOn():
21+
t.Errorf("'%v' should be LoggedOn", test.sessionState)
22+
case !test.expectIsLoggedOn && test.IsLoggedOn():
23+
t.Errorf("'%v' should not be LoggedOn", test.sessionState)
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)