Skip to content

Commit 4d324d5

Browse files
add resend mutex and unit test for blocking sends during resend request
1 parent c527fc8 commit 4d324d5

File tree

2 files changed

+32
-42
lines changed

2 files changed

+32
-42
lines changed

in_session_test.go

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020
"time"
2121

22+
"github.com/stretchr/testify/mock"
2223
"github.com/stretchr/testify/suite"
2324

2425
"github.com/quickfixgo/quickfix/internal"
@@ -373,40 +374,29 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() {
373374
s.State(inSession{})
374375
}
375376

376-
func (s *InSessionTestSuite) TestSendBlockedWhenResendRequestActive() {
377+
func (s *InSessionTestSuite) TestFIXMsgInResendRequestBlocksSend() {
377378
s.MockApp.On("ToApp").Return(nil)
379+
s.Require().Nil(s.session.send(s.NewOrderSingle()))
380+
s.LastToAppMessageSent()
381+
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
382+
s.NextSenderMsgSeqNum(2)
378383

379-
s.session.resendMutex.Lock()
380-
s.session.resendRequestActive = true
381-
s.session.resendMutex.Unlock()
382-
383-
sendCompleted := make(chan struct{})
384-
go func() {
385-
err := s.session.send(s.NewOrderSingle())
386-
s.Require().NoError(err)
387-
close(sendCompleted)
388-
}()
389-
390-
select {
391-
case <-sendCompleted:
392-
s.Fail("send should be blocked during active resend")
393-
case <-time.After(50 * time.Millisecond):
394-
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 0)
395-
}
384+
s.MockStore.On("IterateMessages", mock.Anything, mock.Anything, mock.AnythingOfType("func([]byte) error")).
385+
Run(func(args mock.Arguments) {
386+
s.Require().Nil(s.session.send(s.NewOrderSingle()))
387+
}).
388+
Return(nil)
396389

397-
s.session.resendMutex.Lock()
398-
s.session.resendRequestActive = false
399-
s.session.resendMutex.Unlock()
400-
s.session.resendCond.Broadcast()
401-
402-
select {
403-
case <-sendCompleted:
404-
s.LastToAppMessageSent()
405-
s.MessageType("D", s.MockApp.lastToApp)
406-
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
407-
case <-time.After(100 * time.Millisecond):
408-
s.Fail("send did not proceed after resend was cleared")
409-
}
390+
s.MockApp.On("FromAdmin").Return(nil)
391+
go s.fixMsgIn(s.session, s.ResendRequest(1))
392+
393+
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
394+
s.NextSenderMsgSeqNum(2)
395+
396+
s.Require().Nil(s.session.send(s.NewOrderSingle()))
397+
s.LastToAppMessageSent()
398+
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 2)
399+
s.NextSenderMsgSeqNum(3)
410400
}
411401

412402
func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() {

session.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type session struct {
4040
toSend [][]byte
4141

4242
// Mutex for access to toSend.
43-
sendMutex sync.Mutex
43+
sendMutex sync.RWMutex
4444

4545
resendRequestActive bool
4646
resendMutex sync.Mutex
@@ -275,8 +275,8 @@ func (s *session) resend(msg *Message) bool {
275275

276276
// queueForSend will validate, persist, and queue the message for send.
277277
func (s *session) queueForSend(msg *Message) error {
278-
s.sendMutex.Lock()
279-
defer s.sendMutex.Unlock()
278+
s.sendMutex.RLock()
279+
defer s.sendMutex.RUnlock()
280280

281281
msgBytes, err := s.prepMessageForSend(msg, nil)
282282
if err != nil {
@@ -312,8 +312,8 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
312312
}
313313
s.resendMutex.Unlock()
314314

315-
s.sendMutex.Lock()
316-
defer s.sendMutex.Unlock()
315+
s.sendMutex.RLock()
316+
defer s.sendMutex.RUnlock()
317317

318318
msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
319319
if err != nil {
@@ -328,8 +328,8 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
328328

329329
// dropAndReset will drop the send queue and reset the message store.
330330
func (s *session) dropAndReset() error {
331-
s.sendMutex.Lock()
332-
defer s.sendMutex.Unlock()
331+
s.sendMutex.RLock()
332+
defer s.sendMutex.RUnlock()
333333

334334
s.dropQueued()
335335
return s.store.Reset()
@@ -340,8 +340,8 @@ func (s *session) dropAndSend(msg *Message) error {
340340
return s.dropAndSendInReplyTo(msg, nil)
341341
}
342342
func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error {
343-
s.sendMutex.Lock()
344-
defer s.sendMutex.Unlock()
343+
s.sendMutex.RLock()
344+
defer s.sendMutex.RUnlock()
345345

346346
msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
347347
if err != nil {
@@ -423,8 +423,8 @@ func (s *session) dropQueued() {
423423
}
424424

425425
func (s *session) EnqueueBytesAndSend(msg []byte) {
426-
s.sendMutex.Lock()
427-
defer s.sendMutex.Unlock()
426+
s.sendMutex.RLock()
427+
defer s.sendMutex.RUnlock()
428428

429429
s.toSend = append(s.toSend, msg)
430430
s.sendQueued(true)

0 commit comments

Comments
 (0)