Skip to content

Commit cd98983

Browse files
authored
Merge pull request #176 from cbusbey/error_handling
Error handling around session code
2 parents a9e665c + 38d031b commit cd98983

File tree

3 files changed

+136
-46
lines changed

3 files changed

+136
-46
lines changed

in_session.go

Lines changed: 85 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,17 @@ func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessio
4040
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
4141
switch string(msgType) {
4242
case enum.MsgType_LOGON:
43-
session.handleLogon(msg)
43+
if err := session.handleLogon(msg); err != nil {
44+
return session.handleError(err)
45+
}
46+
4447
return
4548
case enum.MsgType_LOGOUT:
4649
session.log.OnEvent("Received logout request")
4750
session.log.OnEvent("Sending logout response")
48-
session.sendLogout("")
51+
if err := session.sendLogout(""); err != nil {
52+
return session.handleError(err)
53+
}
4954
nextState = latentState{}
5055
case enum.MsgType_TEST_REQUEST:
5156
return state.handleTestRequest(session, msg)
@@ -56,7 +61,9 @@ func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessio
5661
}
5762
}
5863

59-
session.store.IncrNextTargetMsgSeqNum()
64+
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
65+
return session.handleError(err)
66+
}
6067
return
6168
}
6269

@@ -65,7 +72,9 @@ func (state inSession) FixMsgInRej(session *session, msg Message, rej MessageRej
6572
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
6673
switch string(msgType) {
6774
case enum.MsgType_LOGON:
68-
session.initiateLogout("")
75+
if err := session.initiateLogout(""); err != nil {
76+
return session.handleError(err)
77+
}
6978
return logoutState{}
7079
case enum.MsgType_LOGOUT:
7180
return latentState{}
@@ -79,12 +88,16 @@ func (state inSession) Timeout(session *session, event event) (nextState session
7988
case needHeartbeat:
8089
heartBt := NewMessage()
8190
heartBt.Header.SetField(tagMsgType, FIXString("0"))
82-
session.send(heartBt)
91+
if err := session.send(heartBt); err != nil {
92+
return session.handleError(err)
93+
}
8394
case peerTimeout:
8495
testReq := NewMessage()
8596
testReq.Header.SetField(tagMsgType, FIXString("1"))
8697
testReq.Body.SetField(tagTestReqID, FIXString("TEST"))
87-
session.send(testReq)
98+
if err := session.send(testReq); err != nil {
99+
return session.handleError(err)
100+
}
88101
session.log.OnEvent("Sent test request TEST")
89102
session.peerTimer.Reset(time.Duration(int64(1.2 * float64(session.heartBeatTimeout))))
90103
return pendingTimeout{}
@@ -100,10 +113,14 @@ func (state inSession) handleTestRequest(session *session, msg Message) (nextSta
100113
heartBt := NewMessage()
101114
heartBt.Header.SetField(tagMsgType, FIXString("0"))
102115
heartBt.Body.SetField(tagTestReqID, testReq)
103-
session.send(heartBt)
116+
if err := session.send(heartBt); err != nil {
117+
return session.handleError(err)
118+
}
104119
}
105120

106-
session.store.IncrNextTargetMsgSeqNum()
121+
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
122+
return session.handleError(err)
123+
}
107124
return state
108125
}
109126

@@ -115,10 +132,14 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS
115132

116133
switch {
117134
case newSeqNo > expectedSeqNum:
118-
session.store.SetNextTargetMsgSeqNum(int(newSeqNo))
135+
if err := session.store.SetNextTargetMsgSeqNum(int(newSeqNo)); err != nil {
136+
return session.handleError(err)
137+
}
119138
case newSeqNo < expectedSeqNum:
120139
//FIXME: to be compliant with legacy tests, do not include tag in reftagid? (11c_NewSeqNoLess)
121-
session.doReject(msg, valueIsIncorrectNoTag())
140+
if err := session.doReject(msg, valueIsIncorrectNoTag()); err != nil {
141+
return session.handleError(err)
142+
}
122143
}
123144
}
124145
return state
@@ -149,16 +170,21 @@ func (state inSession) handleResendRequest(session *session, msg Message) (nextS
149170
endSeqNo = expectedSeqNum - 1
150171
}
151172

152-
state.resendMessages(session, int(beginSeqNo), endSeqNo)
153-
session.store.IncrNextTargetMsgSeqNum()
173+
if err := state.resendMessages(session, int(beginSeqNo), endSeqNo); err != nil {
174+
return session.handleError(err)
175+
}
176+
177+
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
178+
return session.handleError(err)
179+
}
154180
return state
155181
}
156182

157-
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int) {
183+
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int) (err error) {
158184
msgs, err := session.store.GetMessages(beginSeqNo, endSeqNo)
159185
if err != nil {
160186
session.log.OnEventf("error retrieving messages from store: %s", err.Error())
161-
panic(err)
187+
return
162188
}
163189

164190
seqNum := beginSeqNo
@@ -177,8 +203,10 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
177203
state.generateSequenceReset(session, seqNum, sentMessageSeqNum)
178204
}
179205

180-
session.resend(msg)
181206
session.log.OnEventf("Resending Message: %v", sentMessageSeqNum)
207+
if err = session.resend(msg); err != nil {
208+
return
209+
}
182210

183211
seqNum = sentMessageSeqNum + 1
184212
nextSeqNum = seqNum
@@ -187,6 +215,8 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
187215
if seqNum != nextSeqNum { // gapfill for catch-up
188216
state.generateSequenceReset(session, seqNum, nextSeqNum)
189217
}
218+
219+
return
190220
}
191221

192222
func (state inSession) processReject(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
@@ -195,7 +225,9 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
195225

196226
switch session.sessionState.(type) {
197227
default:
198-
session.doTargetTooHigh(TypedError)
228+
if err := session.doTargetTooHigh(TypedError); err != nil {
229+
return session.handleError(err)
230+
}
199231
case resendState:
200232
//assumes target too high reject already sent
201233
}
@@ -206,18 +238,30 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
206238
case targetTooLow:
207239
return state.doTargetTooLow(session, msg, TypedError)
208240
case incorrectBeginString:
209-
session.initiateLogout(rej.Error())
241+
if err := session.initiateLogout(rej.Error()); err != nil {
242+
session.handleError(err)
243+
}
210244
return logoutState{}
211245
}
212246

213247
switch rej.RejectReason() {
214248
case rejectReasonCompIDProblem, rejectReasonSendingTimeAccuracyProblem:
215-
session.doReject(msg, rej)
216-
session.initiateLogout("")
249+
if err := session.doReject(msg, rej); err != nil {
250+
return session.handleError(err)
251+
}
252+
253+
if err := session.initiateLogout(""); err != nil {
254+
return session.handleError(err)
255+
}
217256
return logoutState{}
218257
default:
219-
session.doReject(msg, rej)
220-
session.store.IncrNextTargetMsgSeqNum()
258+
if err := session.doReject(msg, rej); err != nil {
259+
return session.handleError(err)
260+
}
261+
262+
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
263+
return session.handleError(err)
264+
}
221265
return state
222266
}
223267
}
@@ -228,26 +272,40 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT
228272

229273
origSendingTime := new(FIXUTCTimestamp)
230274
if err = msg.Header.GetField(tagOrigSendingTime, origSendingTime); err != nil {
231-
session.doReject(msg, RequiredTagMissing(tagOrigSendingTime))
275+
if rejErr := session.doReject(msg, RequiredTagMissing(tagOrigSendingTime)); rejErr != nil {
276+
return session.handleError(rejErr)
277+
}
232278
return state
233279
}
234280

235281
sendingTime := new(FIXUTCTimestamp)
236282
msg.Header.GetField(tagSendingTime, sendingTime)
237283

238284
if sendingTime.Before(origSendingTime.Time) {
239-
session.doReject(msg, sendingTimeAccuracyProblem())
240-
session.initiateLogout("")
285+
if err := session.doReject(msg, sendingTimeAccuracyProblem()); err != nil {
286+
return session.handleError(err)
287+
}
288+
289+
if err := session.initiateLogout(""); err != nil {
290+
return session.handleError(err)
291+
}
241292
return logoutState{}
242293
}
243294

244295
if appReject := session.fromCallback(msg); appReject != nil {
245-
session.doReject(msg, appReject)
246-
session.initiateLogout("")
296+
if err := session.doReject(msg, appReject); err != nil {
297+
return session.handleError(err)
298+
}
299+
300+
if err := session.initiateLogout(""); err != nil {
301+
return session.handleError(err)
302+
}
247303
return logoutState{}
248304
}
249305
} else {
250-
session.initiateLogout(rej.Error())
306+
if err := session.initiateLogout(rej.Error()); err != nil {
307+
return session.handleError(err)
308+
}
251309
return logoutState{}
252310
}
253311

initiator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (i *Initiator) Start() error {
3434
return fmt.Errorf("error on SocketConnectPort: %v", err)
3535
}
3636

37-
var reconnectInterval int = 30 // Default configuration (in seconds)
37+
reconnectInterval := 30 // Default configuration (in seconds)
3838
if s.HasSetting(config.ReconnectInterval) {
3939
if reconnectInterval, err = s.IntSetting(config.ReconnectInterval); err != nil {
4040
return fmt.Errorf("error on ReconnectInterval: %v", err)

0 commit comments

Comments
 (0)