Skip to content

Commit d37efec

Browse files
authored
Merge pull request #164 from bhaan/session-msg-handling
Session event loop
2 parents 3d85e3b + 7850dca commit d37efec

File tree

10 files changed

+189
-187
lines changed

10 files changed

+189
-187
lines changed

connection.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,7 @@ func writeLoop(connection io.Writer, messageOut chan []byte) {
116116
}
117117

118118
func readLoop(parser *parser, msgIn chan fixIn) {
119-
defer func() {
120-
close(msgIn)
121-
}()
119+
defer close(msgIn)
122120

123121
for {
124122
msg, err := parser.ReadMessage()

in_session.go

Lines changed: 56 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -12,37 +12,65 @@ type inSession struct {
1212
func (state inSession) String() string { return "In Session" }
1313
func (state inSession) IsLoggedOn() bool { return true }
1414

15+
func (state inSession) VerifyMsgIn(session *session, msg Message) (err MessageRejectError) {
16+
var msgType FIXString
17+
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
18+
switch string(msgType) {
19+
case enum.MsgType_LOGON:
20+
return session.verifyLogon(msg)
21+
case enum.MsgType_LOGOUT:
22+
return nil
23+
case enum.MsgType_RESEND_REQUEST:
24+
return session.verifyIgnoreSeqNumTooHighOrLow(msg)
25+
case enum.MsgType_SEQUENCE_RESET:
26+
var gapFillFlag FIXBoolean
27+
msg.Body.GetField(tagGapFillFlag, &gapFillFlag)
28+
return session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag))
29+
default:
30+
return session.verify(msg)
31+
}
32+
}
33+
return nil
34+
}
35+
1536
func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessionState) {
1637
var msgType FIXString
1738
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
1839
switch string(msgType) {
19-
//logon
20-
case "A":
21-
return state.handleLogon(session, msg)
22-
//logout
23-
case "5":
24-
return state.handleLogout(session, msg)
25-
//test request
26-
case "1":
40+
case enum.MsgType_LOGON:
41+
session.handleLogon(msg)
42+
return state
43+
case enum.MsgType_LOGOUT:
44+
session.log.OnEvent("Received logout request")
45+
session.log.OnEvent("Sending logout response")
46+
state.generateLogout(session)
47+
return latentState{}
48+
case enum.MsgType_TEST_REQUEST:
2749
return state.handleTestRequest(session, msg)
28-
//resend request
29-
case "2":
50+
case enum.MsgType_RESEND_REQUEST:
3051
return state.handleResendRequest(session, msg)
31-
//sequence reset
32-
case "4":
52+
case enum.MsgType_SEQUENCE_RESET:
3353
return state.handleSequenceReset(session, msg)
34-
default:
35-
if err := session.verify(msg); err != nil {
36-
return state.processReject(session, msg, err)
37-
}
3854
}
3955
}
4056

4157
session.store.IncrNextTargetMsgSeqNum()
42-
4358
return state
4459
}
4560

61+
func (state inSession) FixMsgInRej(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
62+
var msgType FIXString
63+
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
64+
switch string(msgType) {
65+
case enum.MsgType_LOGON:
66+
return state.initiateLogout(session, "")
67+
case enum.MsgType_LOGOUT:
68+
return latentState{}
69+
}
70+
}
71+
return state.processReject(session, msg, rej)
72+
}
73+
4674
func (state inSession) Timeout(session *session, event event) (nextState sessionState) {
4775
switch event {
4876
case needHeartbeat:
@@ -60,30 +88,22 @@ func (state inSession) Timeout(session *session, event event) (nextState session
6088
return state
6189
}
6290

63-
func (state inSession) handleLogon(session *session, msg Message) (nextState sessionState) {
64-
if err := session.handleLogon(msg); err != nil {
65-
return state.initiateLogout(session, "")
91+
func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) {
92+
var testReq FIXString
93+
if err := msg.Body.GetField(tagTestReqID, &testReq); err != nil {
94+
session.log.OnEvent("Test Request with no testRequestID")
95+
} else {
96+
heartBt := NewMessage()
97+
heartBt.Header.SetField(tagMsgType, FIXString("0"))
98+
heartBt.Body.SetField(tagTestReqID, testReq)
99+
session.send(heartBt)
66100
}
67101

102+
session.store.IncrNextTargetMsgSeqNum()
68103
return state
69104
}
70105

71-
func (state inSession) handleLogout(session *session, msg Message) (nextState sessionState) {
72-
session.log.OnEvent("Received logout request")
73-
session.log.OnEvent("Sending logout response")
74-
75-
state.generateLogout(session)
76-
return latentState{}
77-
}
78-
79106
func (state inSession) handleSequenceReset(session *session, msg Message) (nextState sessionState) {
80-
var gapFillFlag FIXBoolean
81-
msg.Body.GetField(tagGapFillFlag, &gapFillFlag)
82-
83-
if err := session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag)); err != nil {
84-
return state.processReject(session, msg, err)
85-
}
86-
87107
var newSeqNo FIXInt
88108
if err := msg.Body.GetField(tagNewSeqNo, &newSeqNo); err == nil {
89109
expectedSeqNum := FIXInt(session.store.NextTargetMsgSeqNum())
@@ -97,15 +117,10 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS
97117
session.doReject(msg, valueIsIncorrectNoTag())
98118
}
99119
}
100-
101120
return state
102121
}
103122

104123
func (state inSession) handleResendRequest(session *session, msg Message) (nextState sessionState) {
105-
if err := session.verifyIgnoreSeqNumTooHighOrLow(msg); err != nil {
106-
return state.processReject(session, msg, err)
107-
}
108-
109124
var err error
110125
var beginSeqNoField FIXInt
111126
if err = msg.Body.GetField(tagBeginSeqNo, &beginSeqNoField); err != nil {
@@ -167,26 +182,6 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
167182
}
168183
}
169184

170-
func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) {
171-
if err := session.verify(msg); err != nil {
172-
return state.processReject(session, msg, err)
173-
}
174-
175-
var testReq FIXString
176-
if err := msg.Body.GetField(tagTestReqID, &testReq); err != nil {
177-
session.log.OnEvent("Test Request with no testRequestID")
178-
} else {
179-
heartBt := NewMessage()
180-
heartBt.Header.SetField(tagMsgType, FIXString("0"))
181-
heartBt.Body.SetField(tagTestReqID, testReq)
182-
session.send(heartBt)
183-
}
184-
185-
session.store.IncrNextTargetMsgSeqNum()
186-
187-
return state
188-
}
189-
190185
func (state inSession) processReject(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
191186
switch TypedError := rej.(type) {
192187
case targetTooHigh:
@@ -197,6 +192,7 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
197192
case resendState:
198193
//assumes target too high reject already sent
199194
}
195+
200196
session.messageStash[TypedError.ReceivedTarget] = msg
201197
return resendState{}
202198

latent_state.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,19 @@ type latentState struct{}
55
func (state latentState) String() string { return "Latent State" }
66
func (state latentState) IsLoggedOn() bool { return false }
77

8+
func (state latentState) VerifyMsgIn(session *session, msg Message) MessageRejectError {
9+
return InvalidMessageType()
10+
}
11+
812
func (state latentState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
913
session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg)
1014
return state
1115
}
1216

17+
func (state latentState) FixMsgInRej(session *session, msg Message, err MessageRejectError) (nextState sessionState) {
18+
return state.FixMsgIn(session, msg)
19+
}
20+
1321
func (state latentState) Timeout(*session, event) (nextState sessionState) {
1422
return state
1523
}

logon_state.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,40 @@
11
package quickfix
22

3+
import "github.com/quickfixgo/quickfix/enum"
4+
35
type logonState struct{}
46

57
func (state logonState) String() string { return "Logon State" }
68
func (s logonState) IsLoggedOn() bool { return false }
79

8-
func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
10+
func (s logonState) VerifyMsgIn(session *session, msg Message) MessageRejectError {
911
var msgType FIXString
10-
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil && string(msgType) == "A" {
11-
if err := session.handleLogon(msg); err != nil {
12+
if err := msg.Header.GetField(tagMsgType, &msgType); err != nil {
13+
return RequiredTagMissing(tagMsgType)
14+
}
15+
16+
switch string(msgType) {
17+
case enum.MsgType_LOGON:
18+
err := session.verifyLogon(msg)
19+
if err != nil {
1220
session.log.OnEvent(err.Error())
13-
return latentState{}
1421
}
22+
return err
23+
default:
24+
session.log.OnEventf("Invalid Session State: Received Msg %v while waiting for Logon", msg)
25+
return InvalidMessageType()
26+
}
27+
}
1528

16-
return inSession{}
29+
func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
30+
if err := session.handleLogon(msg); err != nil {
31+
session.log.OnEvent(err.Error())
32+
return latentState{}
1733
}
34+
return inSession{}
35+
}
1836

19-
session.log.OnEventf("Invalid Session State: Received Msg %v while waiting for Logon", msg)
37+
func (s logonState) FixMsgInRej(session *session, msg Message, err MessageRejectError) sessionState {
2038
return latentState{}
2139
}
2240

logout_state.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,33 @@
11
package quickfix
22

3-
type logoutState struct {
4-
}
3+
import "github.com/quickfixgo/quickfix/enum"
4+
5+
type logoutState struct{}
56

67
func (state logoutState) String() string { return "Logout State" }
78
func (s logoutState) IsLoggedOn() bool { return false }
89

10+
func (state logoutState) VerifyMsgIn(session *session, msg Message) MessageRejectError { return nil }
11+
912
func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
1013
var msgType FIXString
1114
if err := msg.Header.GetField(tagMsgType, &msgType); err != nil {
1215
return latentState{}
1316
}
1417

1518
switch string(msgType) {
16-
//logout
17-
case "5":
19+
case enum.MsgType_LOGOUT:
1820
session.log.OnEvent("Received logout response")
1921
return latentState{}
2022
default:
2123
return state
2224
}
2325
}
2426

27+
func (state logoutState) FixMsgInRej(session *session, msg Message, err MessageRejectError) sessionState {
28+
return state.FixMsgIn(session, msg)
29+
}
30+
2531
func (state logoutState) Timeout(session *session, event event) (nextState sessionState) {
2632
switch event {
2733
case logoutTimeout:

registry.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,9 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
4141
return err
4242
}
4343

44-
//NOTE: must queue for send here. otherwise, if not executed in same goroutine as session run loop,
45-
//message may be sent on closed channel or sent outside of valid state
46-
session.queueForSend(msg)
47-
48-
return nil
44+
request := sendRequest{msg, make(chan error)}
45+
session.toSend <- request
46+
return <-request.err
4947
}
5048

5149
type sessionActivate struct {

resend_state.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,26 @@ type resendState struct {
44
inSession
55
}
66

7+
func (state resendState) String() string { return "Resend" }
8+
79
func (state resendState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
8-
for ok := true; ok; {
9-
nextState = state.inSession.FixMsgIn(session, msg)
10+
return state.handleNextState(session, state.inSession.FixMsgIn(session, msg))
11+
}
1012

11-
if !nextState.IsLoggedOn() {
12-
return
13-
}
13+
func (state resendState) FixMsgInRej(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
14+
return state.handleNextState(session, state.inSession.FixMsgInRej(session, msg, rej))
15+
}
1416

15-
msg, ok = session.messageStash[session.store.NextTargetMsgSeqNum()]
17+
func (state resendState) handleNextState(session *session, nextState sessionState) sessionState {
18+
if !nextState.IsLoggedOn() || len(session.messageStash) == 0 {
19+
return nextState
1620
}
1721

18-
if len(session.messageStash) != 0 {
19-
nextState = resendState{}
22+
targetSeqNum := session.store.NextTargetMsgSeqNum()
23+
if msg, ok := session.messageStash[targetSeqNum]; ok {
24+
delete(session.messageStash, targetSeqNum)
25+
session.resendIn <- msg
2026
}
2127

22-
return
28+
return resendState{}
2329
}

0 commit comments

Comments
 (0)