Skip to content

Commit c527fc8

Browse files
block messages during resend request
1 parent 41798d6 commit c527fc8

File tree

5 files changed

+60
-0
lines changed

5 files changed

+60
-0
lines changed

in_session.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
230230
return state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo)
231231
}
232232

233+
session.resendMutex.Lock()
234+
session.resendRequestActive = true
235+
session.resendMutex.Unlock()
236+
237+
defer func() {
238+
session.resendMutex.Lock()
239+
session.resendRequestActive = false
240+
session.resendMutex.Unlock()
241+
session.resendCond.Broadcast()
242+
}()
243+
233244
seqNum := beginSeqNo
234245
nextSeqNum := seqNum
235246
msg := NewMessage()

in_session_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,42 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() {
373373
s.State(inSession{})
374374
}
375375

376+
func (s *InSessionTestSuite) TestSendBlockedWhenResendRequestActive() {
377+
s.MockApp.On("ToApp").Return(nil)
378+
379+
s.session.resendMutex.Lock()
380+
s.session.resendRequestActive = true
381+
s.session.resendMutex.Unlock()
382+
383+
sendCompleted := make(chan struct{})
384+
go func() {
385+
err := s.session.send(s.NewOrderSingle())
386+
s.Require().NoError(err)
387+
close(sendCompleted)
388+
}()
389+
390+
select {
391+
case <-sendCompleted:
392+
s.Fail("send should be blocked during active resend")
393+
case <-time.After(50 * time.Millisecond):
394+
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 0)
395+
}
396+
397+
s.session.resendMutex.Lock()
398+
s.session.resendRequestActive = false
399+
s.session.resendMutex.Unlock()
400+
s.session.resendCond.Broadcast()
401+
402+
select {
403+
case <-sendCompleted:
404+
s.LastToAppMessageSent()
405+
s.MessageType("D", s.MockApp.lastToApp)
406+
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
407+
case <-time.After(100 * time.Millisecond):
408+
s.Fail("send did not proceed after resend was cleared")
409+
}
410+
}
411+
376412
func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() {
377413
s.IncrNextTargetMsgSeqNum()
378414

quickfix_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package quickfix
1717

1818
import (
19+
"sync"
1920
"time"
2021

2122
"github.com/stretchr/testify/mock"
@@ -221,6 +222,7 @@ func (s *SessionSuiteRig) Init() {
221222
messageOut: s.Receiver.sendChannel,
222223
sessionEvent: make(chan internal.Event),
223224
}
225+
s.session.resendCond = sync.NewCond(&s.resendMutex)
224226
s.MaxLatency = 120 * time.Second
225227
}
226228

session.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type session struct {
4242
// Mutex for access to toSend.
4343
sendMutex sync.Mutex
4444

45+
resendRequestActive bool
46+
resendMutex sync.Mutex
47+
resendCond *sync.Cond
48+
4549
sessionEvent chan internal.Event
4650
messageEvent chan bool
4751
application Application
@@ -302,6 +306,12 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
302306
return s.queueForSend(msg)
303307
}
304308

309+
s.resendMutex.Lock()
310+
for s.resendRequestActive {
311+
s.resendCond.Wait()
312+
}
313+
s.resendMutex.Unlock()
314+
305315
s.sendMutex.Lock()
306316
defer s.sendMutex.Unlock()
307317

session_factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func (f sessionFactory) newSession(
8989
sessionID: sessionID,
9090
stopOnce: sync.Once{},
9191
}
92+
s.resendCond = sync.NewCond(&s.resendMutex)
9293

9394
var validatorSettings = defaultValidatorSettings
9495
if settings.HasSetting(config.ValidateFieldsOutOfOrder) {

0 commit comments

Comments
 (0)