Skip to content

Commit 38d031b

Browse files
author
Chris Busbey
committed
more error handling in session code
1 parent d38ffc0 commit 38d031b

File tree

2 files changed

+92
-32
lines changed

2 files changed

+92
-32
lines changed

in_session.go

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,14 @@ func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessio
4343
if err := session.handleLogon(msg); err != nil {
4444
return session.handleError(err)
4545
}
46+
47+
return
4648
case enum.MsgType_LOGOUT:
4749
session.log.OnEvent("Received logout request")
4850
session.log.OnEvent("Sending logout response")
49-
session.sendLogout("")
51+
if err := session.sendLogout(""); err != nil {
52+
return session.handleError(err)
53+
}
5054
nextState = latentState{}
5155
case enum.MsgType_TEST_REQUEST:
5256
return state.handleTestRequest(session, msg)
@@ -68,7 +72,9 @@ func (state inSession) FixMsgInRej(session *session, msg Message, rej MessageRej
6872
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
6973
switch string(msgType) {
7074
case enum.MsgType_LOGON:
71-
session.initiateLogout("")
75+
if err := session.initiateLogout(""); err != nil {
76+
return session.handleError(err)
77+
}
7278
return logoutState{}
7379
case enum.MsgType_LOGOUT:
7480
return latentState{}
@@ -131,7 +137,9 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS
131137
}
132138
case newSeqNo < expectedSeqNum:
133139
//FIXME: to be compliant with legacy tests, do not include tag in reftagid? (11c_NewSeqNoLess)
134-
session.doReject(msg, valueIsIncorrectNoTag())
140+
if err := session.doReject(msg, valueIsIncorrectNoTag()); err != nil {
141+
return session.handleError(err)
142+
}
135143
}
136144
}
137145
return state
@@ -162,18 +170,21 @@ func (state inSession) handleResendRequest(session *session, msg Message) (nextS
162170
endSeqNo = expectedSeqNum - 1
163171
}
164172

165-
state.resendMessages(session, int(beginSeqNo), endSeqNo)
173+
if err := state.resendMessages(session, int(beginSeqNo), endSeqNo); err != nil {
174+
return session.handleError(err)
175+
}
176+
166177
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
167178
return session.handleError(err)
168179
}
169180
return state
170181
}
171182

172-
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int) {
183+
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int) (err error) {
173184
msgs, err := session.store.GetMessages(beginSeqNo, endSeqNo)
174185
if err != nil {
175186
session.log.OnEventf("error retrieving messages from store: %s", err.Error())
176-
panic(err)
187+
return
177188
}
178189

179190
seqNum := beginSeqNo
@@ -192,8 +203,10 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
192203
state.generateSequenceReset(session, seqNum, sentMessageSeqNum)
193204
}
194205

195-
session.resend(msg)
196206
session.log.OnEventf("Resending Message: %v", sentMessageSeqNum)
207+
if err = session.resend(msg); err != nil {
208+
return
209+
}
197210

198211
seqNum = sentMessageSeqNum + 1
199212
nextSeqNum = seqNum
@@ -202,6 +215,8 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
202215
if seqNum != nextSeqNum { // gapfill for catch-up
203216
state.generateSequenceReset(session, seqNum, nextSeqNum)
204217
}
218+
219+
return
205220
}
206221

207222
func (state inSession) processReject(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
@@ -210,7 +225,9 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
210225

211226
switch session.sessionState.(type) {
212227
default:
213-
session.doTargetTooHigh(TypedError)
228+
if err := session.doTargetTooHigh(TypedError); err != nil {
229+
return session.handleError(err)
230+
}
214231
case resendState:
215232
//assumes target too high reject already sent
216233
}
@@ -221,17 +238,27 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
221238
case targetTooLow:
222239
return state.doTargetTooLow(session, msg, TypedError)
223240
case incorrectBeginString:
224-
session.initiateLogout(rej.Error())
241+
if err := session.initiateLogout(rej.Error()); err != nil {
242+
session.handleError(err)
243+
}
225244
return logoutState{}
226245
}
227246

228247
switch rej.RejectReason() {
229248
case rejectReasonCompIDProblem, rejectReasonSendingTimeAccuracyProblem:
230-
session.doReject(msg, rej)
231-
session.initiateLogout("")
249+
if err := session.doReject(msg, rej); err != nil {
250+
return session.handleError(err)
251+
}
252+
253+
if err := session.initiateLogout(""); err != nil {
254+
return session.handleError(err)
255+
}
232256
return logoutState{}
233257
default:
234-
session.doReject(msg, rej)
258+
if err := session.doReject(msg, rej); err != nil {
259+
return session.handleError(err)
260+
}
261+
235262
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
236263
return session.handleError(err)
237264
}
@@ -245,26 +272,40 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT
245272

246273
origSendingTime := new(FIXUTCTimestamp)
247274
if err = msg.Header.GetField(tagOrigSendingTime, origSendingTime); err != nil {
248-
session.doReject(msg, RequiredTagMissing(tagOrigSendingTime))
275+
if rejErr := session.doReject(msg, RequiredTagMissing(tagOrigSendingTime)); rejErr != nil {
276+
return session.handleError(rejErr)
277+
}
249278
return state
250279
}
251280

252281
sendingTime := new(FIXUTCTimestamp)
253282
msg.Header.GetField(tagSendingTime, sendingTime)
254283

255284
if sendingTime.Before(origSendingTime.Time) {
256-
session.doReject(msg, sendingTimeAccuracyProblem())
257-
session.initiateLogout("")
285+
if err := session.doReject(msg, sendingTimeAccuracyProblem()); err != nil {
286+
return session.handleError(err)
287+
}
288+
289+
if err := session.initiateLogout(""); err != nil {
290+
return session.handleError(err)
291+
}
258292
return logoutState{}
259293
}
260294

261295
if appReject := session.fromCallback(msg); appReject != nil {
262-
session.doReject(msg, appReject)
263-
session.initiateLogout("")
296+
if err := session.doReject(msg, appReject); err != nil {
297+
return session.handleError(err)
298+
}
299+
300+
if err := session.initiateLogout(""); err != nil {
301+
return session.handleError(err)
302+
}
264303
return logoutState{}
265304
}
266305
} else {
267-
session.initiateLogout(rej.Error())
306+
if err := session.initiateLogout(rej.Error()); err != nil {
307+
return session.handleError(err)
308+
}
268309
return logoutState{}
269310
}
270311

session.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (s *session) fillDefaultHeader(msg Message) {
182182
s.insertSendingTime(msg.Header)
183183
}
184184

185-
func (s *session) sendLogout(reason string) {
185+
func (s *session) sendLogout(reason string) error {
186186
logout := NewMessage()
187187
logout.Header.SetField(tagMsgType, FIXString("5"))
188188
logout.Header.SetField(tagBeginString, FIXString(s.sessionID.BeginString))
@@ -191,10 +191,10 @@ func (s *session) sendLogout(reason string) {
191191
if reason != "" {
192192
logout.Body.SetField(tagText, FIXString(reason))
193193
}
194-
s.send(logout)
194+
return s.send(logout)
195195
}
196196

197-
func (s *session) resend(msg Message) {
197+
func (s *session) resend(msg Message) error {
198198
msg.Header.SetField(tagPossDupFlag, FIXBoolean(true))
199199

200200
var origSendingTime FIXString
@@ -204,8 +204,12 @@ func (s *session) resend(msg Message) {
204204

205205
s.insertSendingTime(msg.Header)
206206

207-
msg.Build()
207+
if _, err := msg.Build(); err != nil {
208+
return err
209+
}
208210
s.sendBytes(msg.rawMessage)
211+
212+
return nil
209213
}
210214

211215
//send should NOT be called outside of the run loop
@@ -241,7 +245,7 @@ func (s *session) sendBytes(msg []byte) {
241245
s.stateTimer.Reset(time.Duration(s.heartBeatTimeout))
242246
}
243247

244-
func (s *session) doTargetTooHigh(reject targetTooHigh) {
248+
func (s *session) doTargetTooHigh(reject targetTooHigh) error {
245249
s.log.OnEventf("MsgSeqNum too high, expecting %v but received %v", reject.ExpectedTarget, reject.ReceivedTarget)
246250

247251
resend := NewMessage()
@@ -254,9 +258,13 @@ func (s *session) doTargetTooHigh(reject targetTooHigh) {
254258
}
255259
resend.Body.SetField(tagEndSeqNo, FIXInt(endSeqNum))
256260

257-
s.send(resend)
261+
if err := s.send(resend); err != nil {
262+
return err
263+
}
258264

259265
s.log.OnEventf("Sent ResendRequest FROM: %v TO: %v", reject.ExpectedTarget, endSeqNum)
266+
267+
return nil
260268
}
261269

262270
func (s *session) verifyLogon(msg Message) MessageRejectError {
@@ -287,6 +295,7 @@ func (s *session) verifyLogon(msg Message) MessageRejectError {
287295

288296
return s.verifyIgnoreSeqNumTooHigh(msg)
289297
}
298+
290299
return nil
291300
}
292301

@@ -329,18 +338,22 @@ func (s *session) handleLogon(msg Message) error {
329338
if err := s.checkTargetTooHigh(msg); err != nil {
330339
switch TypedError := err.(type) {
331340
case targetTooHigh:
332-
s.doTargetTooHigh(TypedError)
333-
return nil
341+
return s.doTargetTooHigh(TypedError)
334342
}
335343
}
336344

337345
return s.store.IncrNextTargetMsgSeqNum()
338346
}
339347

340-
func (s *session) initiateLogout(reason string) {
348+
func (s *session) initiateLogout(reason string) (err error) {
349+
if err = s.sendLogout(reason); err != nil {
350+
s.logError(err)
351+
return
352+
}
341353
s.log.OnEvent("Inititated logout request")
342-
s.sendLogout(reason)
343354
time.AfterFunc(time.Duration(2)*time.Second, func() { s.sessionEvent <- logoutTimeout })
355+
356+
return
344357
}
345358

346359
func (s *session) verify(msg Message) MessageRejectError {
@@ -474,7 +487,7 @@ func (s *session) checkBeginString(msg Message) MessageRejectError {
474487
return nil
475488
}
476489

477-
func (s *session) doReject(msg Message, rej MessageRejectError) {
490+
func (s *session) doReject(msg Message, rej MessageRejectError) error {
478491
reply := msg.reverseRoute()
479492

480493
if s.sessionID.BeginString >= enum.BeginStringFIX42 {
@@ -516,8 +529,8 @@ func (s *session) doReject(msg Message, rej MessageRejectError) {
516529
reply.Body.SetField(tagRefSeqNum, seqNum)
517530
}
518531

519-
s.send(reply)
520532
s.log.OnEventf("Message Rejected: %v", rej.Error())
533+
return s.send(reply)
521534
}
522535

523536
type fixIn struct {
@@ -553,7 +566,10 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
553566

554567
if s.initiateLogon {
555568
if s.resetOnLogon {
556-
s.store.Reset()
569+
if err := s.store.Reset(); err != nil {
570+
s.logError(err)
571+
return
572+
}
557573
}
558574

559575
logon := NewMessage()
@@ -632,7 +648,10 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
632648
case <-quit:
633649
quit = nil // prevent infinitly receiving on a closed channel
634650
if s.IsLoggedOn() {
635-
s.initiateLogout("")
651+
if err := s.initiateLogout(""); err != nil {
652+
s.logError(err)
653+
return
654+
}
636655
s.sessionState = logoutState{}
637656
} else {
638657
return

0 commit comments

Comments
 (0)