Skip to content

Commit 7850dca

Browse files
committed
dedicated resend channel during resend state
1 parent 7126814 commit 7850dca

File tree

3 files changed

+20
-18
lines changed

3 files changed

+20
-18
lines changed

connection.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,7 @@ func writeLoop(connection io.Writer, messageOut chan []byte) {
116116
}
117117

118118
func readLoop(parser *parser, msgIn chan fixIn) {
119-
defer func() {
120-
close(msgIn)
121-
}()
119+
defer close(msgIn)
122120

123121
for {
124122
msg, err := parser.ReadMessage()

resend_state.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@ func (state resendState) handleNextState(session *session, nextState sessionStat
2222
targetSeqNum := session.store.NextTargetMsgSeqNum()
2323
if msg, ok := session.messageStash[targetSeqNum]; ok {
2424
delete(session.messageStash, targetSeqNum)
25-
26-
// FIXME add a "resend" channel to the session loop to differentiate between
27-
// new incoming fix messages, and stashed messages from the resend state
28-
go func() { session.messageIn <- fixIn{msg.rawMessage, msg.ReceiveTime} }()
25+
session.resendIn <- msg
2926
}
3027

3128
return resendState{}

session.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type session struct {
1919
messageOut chan []byte
2020
messageIn chan fixIn
2121
toSend chan sendRequest
22+
resendIn chan Message
2223

2324
sessionEvent chan event
2425
messageEvent chan bool
@@ -495,6 +496,7 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
495496
s.messageIn = msgIn
496497
s.messageOut = msgOut
497498
s.toSend = make(chan sendRequest)
499+
s.resendIn = make(chan Message, 1)
498500

499501
type fromCallback struct {
500502
msg Message
@@ -505,7 +507,6 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
505507
defer func() {
506508
close(s.messageOut)
507509
close(s.toSend)
508-
close(fromCallbackCh)
509510
s.onDisconnect()
510511
}()
511512

@@ -532,6 +533,19 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
532533
s.send(logon)
533534
}
534535

536+
fixMsgIn := func(msg Message) {
537+
if rej := s.sessionState.VerifyMsgIn(s, msg); rej != nil {
538+
s.sessionState = s.sessionState.FixMsgInRej(s, msg, rej)
539+
} else {
540+
// "turn off" incoming fix messages until the call
541+
// to FromAdmin/App returns
542+
msgIn = nil
543+
go func() {
544+
fromCallbackCh <- fromCallback{msg, s.fromCallback(msg)}
545+
}()
546+
}
547+
}
548+
535549
for {
536550

537551
switch s.sessionState.(type) {
@@ -553,21 +567,14 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
553567
s.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), fixIn.bytes)
554568
} else {
555569
msg.ReceiveTime = fixIn.receiveTime
556-
if rej := s.sessionState.VerifyMsgIn(s, msg); rej != nil {
557-
s.sessionState = s.sessionState.FixMsgInRej(s, msg, rej)
558-
} else {
559-
// "turn off" incoming fix messages until the call
560-
// to FromAdmin/App returns
561-
msgIn = nil
562-
go func() {
563-
fromCallbackCh <- fromCallback{msg, s.fromCallback(msg)}
564-
}()
565-
}
570+
fixMsgIn(msg)
566571
}
567572
} else {
568573
return
569574
}
570575
s.peerTimer.Reset(time.Duration(int64(1.2 * float64(s.heartBeatTimeout))))
576+
case msg := <-s.resendIn:
577+
fixMsgIn(msg)
571578
case callback := <-fromCallbackCh:
572579
// "turn on" incoming fix message now that
573580
// FromAdmin/App has completed

0 commit comments

Comments
 (0)