Skip to content

Commit bdae846

Browse files
author
Chris Busbey
committed
allow sends outside of session being logged on
1 parent 384c286 commit bdae846

File tree

5 files changed

+200
-64
lines changed

5 files changed

+200
-64
lines changed

pending_timeout_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,23 @@ import (
77
)
88

99
func TestPendingTimeout_SessionTimeout(t *testing.T) {
10-
session := &session{
11-
log: nullLog{},
12-
}
13-
1410
tests := []pendingTimeout{
1511
pendingTimeout{inSession{}},
1612
pendingTimeout{resendState{}},
1713
}
1814

1915
for _, state := range tests {
16+
session := &session{
17+
log: nullLog{},
18+
sessionState: state,
19+
}
20+
2021
nextState := state.Timeout(session, peerTimeout)
2122
assert.IsType(t, latentState{}, nextState)
2223
}
2324
}
2425

2526
func TestPendingTimeout_TimeoutUnchangedState(t *testing.T) {
26-
session := &session{
27-
log: nullLog{},
28-
}
29-
3027
tests := []pendingTimeout{
3128
pendingTimeout{inSession{}},
3229
pendingTimeout{resendState{}},
@@ -35,6 +32,11 @@ func TestPendingTimeout_TimeoutUnchangedState(t *testing.T) {
3532
testEvents := []event{needHeartbeat, logonTimeout, logoutTimeout}
3633

3734
for _, state := range tests {
35+
session := &session{
36+
log: nullLog{},
37+
sessionState: state,
38+
}
39+
3840
for _, event := range testEvents {
3941
nextState := state.Timeout(session, event)
4042
assert.Equal(t, state, nextState)

registry.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
3939
session, err := lookupSession(sessionID)
4040
if err != nil {
4141
return err
42-
} else if session.toSend == nil {
43-
return fmt.Errorf("Not logged on")
4442
}
4543

4644
request := sendRequest{msg, make(chan error)}

resend_state_test.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ func TestResendState_TimeoutPeerTimeout(t *testing.T) {
1616
<-otherEnd
1717
}()
1818

19+
state := resendState{}
1920
session := &session{
20-
store: new(memoryStore),
21-
application: new(TestClient),
22-
messageOut: otherEnd,
23-
log: nullLog{},
21+
store: new(memoryStore),
22+
application: new(TestClient),
23+
messageOut: otherEnd,
24+
log: nullLog{},
25+
sessionState: state,
2426
}
25-
state := resendState{}
27+
2628
nextState := state.Timeout(session, peerTimeout)
2729
assert.Equal(t, pendingTimeout{state}, nextState)
2830
}
@@ -33,13 +35,14 @@ func TestResendState_TimeoutUnchanged(t *testing.T) {
3335
<-otherEnd
3436
}()
3537

38+
state := resendState{}
3639
session := &session{
37-
store: new(memoryStore),
38-
application: new(TestClient),
39-
messageOut: otherEnd,
40-
log: nullLog{},
40+
store: new(memoryStore),
41+
application: new(TestClient),
42+
messageOut: otherEnd,
43+
log: nullLog{},
44+
sessionState: state,
4145
}
42-
state := resendState{}
4346

4447
tests := []event{needHeartbeat, logonTimeout, logoutTimeout}
4548

session.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin
130130
return err
131131
}
132132

133+
session.toSend = make(chan sendRequest)
133134
session.sessionEvent = make(chan event)
134135
session.messageEvent = make(chan bool)
135136
session.application = application
@@ -212,6 +213,14 @@ func (s *session) resend(msg Message) error {
212213
return nil
213214
}
214215

216+
func (s *session) persist(seqNum int, msgBytes []byte) error {
217+
if err := s.store.SaveMessage(seqNum, msgBytes); err != nil {
218+
return err
219+
}
220+
221+
return s.store.IncrNextSenderMsgSeqNum()
222+
}
223+
215224
//send should NOT be called outside of the run loop
216225
func (s *session) send(msg Message) (err error) {
217226
s.fillDefaultHeader(msg)
@@ -220,7 +229,11 @@ func (s *session) send(msg Message) (err error) {
220229
msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum))
221230

222231
var msgType FIXString
223-
if msg.Header.GetField(tagMsgType, &msgType); isAdminMessageType(string(msgType)) {
232+
if err = msg.Header.GetField(tagMsgType, &msgType); err != nil {
233+
return err
234+
}
235+
236+
if isAdminMessageType(string(msgType)) {
224237
s.application.ToAdmin(msg, s.sessionID)
225238
} else {
226239
s.application.ToApp(msg, s.sessionID)
@@ -231,9 +244,19 @@ func (s *session) send(msg Message) (err error) {
231244
return
232245
}
233246

234-
if err = s.store.SaveMessage(seqNum, msgBytes); err == nil {
247+
if err = s.persist(seqNum, msgBytes); err != nil {
248+
return
249+
}
250+
251+
if s.IsLoggedOn() {
235252
s.sendBytes(msgBytes)
236-
err = s.store.IncrNextSenderMsgSeqNum()
253+
} else {
254+
switch msgType {
255+
case enum.MsgType_LOGON:
256+
fallthrough
257+
case enum.MsgType_RESEND_REQUEST:
258+
s.sendBytes(msgBytes)
259+
}
237260
}
238261

239262
return
@@ -546,7 +569,6 @@ type sendRequest struct {
546569
func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
547570
s.messageIn = msgIn
548571
s.messageOut = msgOut
549-
s.toSend = make(chan sendRequest)
550572
s.resendIn = make(chan Message, 1)
551573

552574
type fromCallback struct {
@@ -557,8 +579,6 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
557579

558580
defer func() {
559581
close(s.messageOut)
560-
close(s.toSend)
561-
s.toSend = nil
562582
s.stateTimer.Stop()
563583
s.peerTimer.Stop()
564584
s.onDisconnect()
@@ -615,11 +635,7 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
615635

616636
select {
617637
case request := <-s.toSend:
618-
if s.IsLoggedOn() {
619-
request.err <- s.send(request.msg)
620-
} else {
621-
request.err <- fmt.Errorf("Not logged on")
622-
}
638+
request.err <- s.send(request.msg)
623639
case fixIn, ok := <-msgIn:
624640
if ok {
625641
s.log.OnIncoming(string(fixIn.bytes))

session_test.go

Lines changed: 150 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66

77
"github.com/quickfixgo/quickfix/enum"
8+
"github.com/stretchr/testify/require"
9+
"github.com/stretchr/testify/suite"
810
)
911

1012
func buildMessage() Message {
@@ -279,50 +281,165 @@ func (e *TestClient) FromApp(msg Message, sessionID SessionID) (reject MessageRe
279281
return nil
280282
}
281283

282-
func TestSession_CheckToAdminCalled(t *testing.T) {
283-
app := new(TestClient)
284-
otherEnd := make(chan []byte)
285-
go func() {
286-
<-otherEnd
287-
}()
284+
type SessionSendTestSuite struct {
285+
suite.Suite
288286

289-
session := session{
287+
*TestClient
288+
session
289+
sendChannel chan []byte
290+
}
291+
292+
func (suite *SessionSendTestSuite) SetupTest() {
293+
suite.sendChannel = make(chan []byte, 1)
294+
suite.TestClient = new(TestClient)
295+
suite.session = session{
290296
store: new(memoryStore),
291-
application: app,
292-
messageOut: otherEnd,
297+
application: suite,
293298
log: nullLog{},
299+
messageOut: suite.sendChannel,
294300
}
295-
builder := buildMessage()
296-
builder.Header.SetField(tagMsgType, FIXString("A"))
297-
session.send(builder)
301+
}
302+
303+
func (suite *SessionSendTestSuite) SentMessage() (msg []byte) {
304+
select {
305+
case msg = <-suite.sendChannel:
306+
default:
307+
}
308+
return
309+
}
310+
311+
func TestSessionSendTestSuite(t *testing.T) {
312+
suite.Run(t, new(SessionSendTestSuite))
313+
}
314+
315+
func (suite *SessionSendTestSuite) SendNewOrderSingle() {
316+
msg := buildMessage()
317+
msg.Header.SetField(tagMsgType, FIXString("D"))
318+
require.Nil(suite.T(), suite.send(msg))
319+
}
320+
321+
func (suite *SessionSendTestSuite) SendHeartbeat() {
322+
msg := buildMessage()
323+
msg.Header.SetField(tagMsgType, FIXString("0"))
324+
require.Nil(suite.T(), suite.send(msg))
325+
}
326+
327+
func (suite *SessionSendTestSuite) SendLogon() {
328+
msg := buildMessage()
329+
msg.Header.SetField(tagMsgType, FIXString("A"))
330+
require.Nil(suite.T(), suite.send(msg))
331+
}
298332

299-
if app.adminCalled != 1 {
300-
t.Error("ToAdmin should have been called exactly once, instead was called", app.adminCalled, "times")
333+
func (suite *SessionSendTestSuite) MessagePersisted() {
334+
suite.Equal(2, suite.store.NextSenderMsgSeqNum(), "The next sender sequence number should be incremented")
335+
persistedMessages, err := suite.store.GetMessages(1, 1)
336+
suite.Nil(err)
337+
suite.Len(persistedMessages, 1, "The message should be persisted")
338+
}
339+
340+
func (suite *SessionSendTestSuite) TestSendAppMessageLoggedOn() {
341+
var tests = []struct {
342+
sessionState
343+
}{
344+
{inSession{}},
345+
{resendState{}},
346+
{pendingTimeout{inSession{}}},
347+
{pendingTimeout{resendState{}}},
301348
}
302-
if app.appCalled != 0 {
303-
t.Error("ToApp should not have been called, instead was called", app.appCalled, "times")
349+
350+
for _, test := range tests {
351+
suite.SetupTest()
352+
suite.session.sessionState = test.sessionState
353+
354+
suite.SendNewOrderSingle()
355+
356+
suite.Equal(1, suite.appCalled, "ToApp should be called")
357+
suite.MessagePersisted()
358+
suite.NotNil(suite.SentMessage(), "The message should have been sent")
304359
}
305360
}
306361

307-
func TestSession_CheckToAppCalled(t *testing.T) {
308-
app := new(TestClient)
309-
otherEnd := make(chan []byte)
310-
go func() {
311-
<-otherEnd
312-
}()
362+
func (suite *SessionSendTestSuite) TestSendAdminMessageLoggedOn() {
363+
var tests = []struct {
364+
sessionState
365+
}{
366+
{inSession{}},
367+
{resendState{}},
368+
{pendingTimeout{inSession{}}},
369+
{pendingTimeout{resendState{}}},
370+
}
313371

314-
session := session{
315-
store: new(memoryStore),
316-
application: app,
317-
messageOut: otherEnd,
318-
log: nullLog{}}
319-
builder := buildMessage()
320-
session.send(builder)
372+
for _, test := range tests {
373+
suite.SetupTest()
374+
suite.session.sessionState = test.sessionState
375+
376+
suite.SendHeartbeat()
321377

322-
if app.appCalled != 1 {
323-
t.Error("Toapp should have been called exactly once, instead was called", app.appCalled, "times")
378+
suite.Equal(1, suite.adminCalled, "ToAdmin should be called")
379+
suite.MessagePersisted()
380+
suite.NotNil(suite.SentMessage(), "The message should have been sent")
324381
}
325-
if app.adminCalled != 0 {
326-
t.Error("Toadmin should not have been called, instead was called", app.adminCalled, "times")
382+
}
383+
384+
func (suite *SessionSendTestSuite) TestSendAppMessageNotLoggedOn() {
385+
var tests = []struct {
386+
sessionState
387+
}{
388+
{logonState{}},
389+
{logoutState{}},
390+
{latentState{}},
391+
}
392+
393+
for _, test := range tests {
394+
suite.SetupTest()
395+
suite.session.sessionState = test.sessionState
396+
397+
suite.SendNewOrderSingle()
398+
399+
suite.Equal(1, suite.appCalled, "ToApp should be called even if the message is not sent")
400+
suite.MessagePersisted()
401+
suite.Nil(suite.SentMessage(), "The message should not be sent")
402+
}
403+
}
404+
405+
func (suite *SessionSendTestSuite) TestSendAdminMessageNotLoggedOnNotSent() {
406+
var tests = []struct {
407+
sessionState
408+
}{
409+
{logonState{}},
410+
{logoutState{}},
411+
{latentState{}},
412+
}
413+
414+
for _, test := range tests {
415+
suite.SetupTest()
416+
suite.session.sessionState = test.sessionState
417+
418+
suite.SendHeartbeat()
419+
420+
suite.Equal(1, suite.adminCalled, "ToAdmin should be called even if the message is not sent")
421+
suite.MessagePersisted()
422+
suite.Nil(suite.SentMessage(), "The message should not be sent")
423+
}
424+
}
425+
426+
func (suite *SessionSendTestSuite) TestSendAdminMessageNotLoggedOnIsSent() {
427+
var tests = []struct {
428+
sessionState
429+
}{
430+
{logonState{}},
431+
{logoutState{}},
432+
{latentState{}},
433+
}
434+
435+
for _, test := range tests {
436+
suite.SetupTest()
437+
suite.session.sessionState = test.sessionState
438+
439+
suite.SendLogon()
440+
441+
suite.Equal(1, suite.adminCalled, "ToAdmin should be called")
442+
suite.MessagePersisted()
443+
suite.NotNil(suite.SentMessage(), "The message should be sent")
327444
}
328445
}

0 commit comments

Comments
 (0)