Skip to content

Commit ee21dfe

Browse files
authored
Merge pull request #358 from cbusbey/session_reset
Initiator ResetSeqNumFlag fix
2 parents 5bcdd16 + c24ccbb commit ee21dfe

File tree

5 files changed

+99
-40
lines changed

5 files changed

+99
-40
lines changed

logon_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (s logonState) FixMsgIn(session *session, msg *Message) (nextState sessionS
2727
session.log.OnEvent(err.Text)
2828
logout := session.buildLogout(err.Text)
2929

30-
if err := session.dropAndSendInReplyTo(logout, false, msg); err != nil {
30+
if err := session.dropAndSendInReplyTo(logout, msg); err != nil {
3131
session.logError(err)
3232
}
3333

mongostore.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package quickfix
22

33
import (
44
"fmt"
5+
"time"
6+
57
"github.com/globalsign/mgo"
68
"github.com/globalsign/mgo/bson"
7-
"time"
89

910
"github.com/quickfixgo/quickfix/config"
1011
)
@@ -18,7 +19,7 @@ type mongoStoreFactory struct {
1819
type mongoStore struct {
1920
sessionID SessionID
2021
cache *memoryStore
21-
mongoUrl string
22+
mongoURL string
2223
mongoDatabase string
2324
db *mgo.Session
2425
messagesCollection string
@@ -45,29 +46,29 @@ func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, e
4546
if !ok {
4647
return nil, fmt.Errorf("unknown session: %v", sessionID)
4748
}
48-
mongoConnectionUrl, err := sessionSettings.Setting(config.MongoStoreConnection)
49+
mongoConnectionURL, err := sessionSettings.Setting(config.MongoStoreConnection)
4950
if err != nil {
5051
return nil, err
5152
}
5253
mongoDatabase, err := sessionSettings.Setting(config.MongoStoreDatabase)
5354
if err != nil {
5455
return nil, err
5556
}
56-
return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase, f.messagesCollection, f.sessionsCollection)
57+
return newMongoStore(sessionID, mongoConnectionURL, mongoDatabase, f.messagesCollection, f.sessionsCollection)
5758
}
5859

59-
func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) {
60+
func newMongoStore(sessionID SessionID, mongoURL string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) {
6061
store = &mongoStore{
6162
sessionID: sessionID,
6263
cache: &memoryStore{},
63-
mongoUrl: mongoUrl,
64+
mongoURL: mongoURL,
6465
mongoDatabase: mongoDatabase,
6566
messagesCollection: messagesCollection,
6667
sessionsCollection: sessionsCollection,
6768
}
6869
store.cache.Reset()
6970

70-
if store.db, err = mgo.Dial(mongoUrl); err != nil {
71+
if store.db, err = mgo.Dial(mongoURL); err != nil {
7172
return
7273
}
7374
err = store.populateCache()
@@ -79,12 +80,12 @@ func generateMessageFilter(s *SessionID) (messageFilter *mongoQuickFixEntryData)
7980
messageFilter = &mongoQuickFixEntryData{
8081
BeginString: s.BeginString,
8182
SessionQualifier: s.Qualifier,
82-
SenderCompId: s.SenderCompID,
83-
SenderSubId: s.SenderSubID,
84-
SenderLocId: s.SenderLocationID,
85-
TargetCompId: s.TargetCompID,
86-
TargetSubId: s.TargetSubID,
87-
TargetLocId: s.TargetLocationID,
83+
SenderCompID: s.SenderCompID,
84+
SenderSubID: s.SenderSubID,
85+
SenderLocID: s.SenderLocationID,
86+
TargetCompID: s.TargetCompID,
87+
TargetSubID: s.TargetSubID,
88+
TargetLocID: s.TargetLocationID,
8889
}
8990
return
9091
}
@@ -100,12 +101,12 @@ type mongoQuickFixEntryData struct {
100101
//Indexed data
101102
BeginString string `bson:"begin_string"`
102103
SessionQualifier string `bson:"session_qualifier"`
103-
SenderCompId string `bson:"sender_comp_id"`
104-
SenderSubId string `bson:"sender_sub_id"`
105-
SenderLocId string `bson:"sender_loc_id"`
106-
TargetCompId string `bson:"target_comp_id"`
107-
TargetSubId string `bson:"target_sub_id"`
108-
TargetLocId string `bson:"target_loc_id"`
104+
SenderCompID string `bson:"sender_comp_id"`
105+
SenderSubID string `bson:"sender_sub_id"`
106+
SenderLocID string `bson:"sender_loc_id"`
107+
TargetCompID string `bson:"target_comp_id"`
108+
TargetSubID string `bson:"target_sub_id"`
109+
TargetLocID string `bson:"target_loc_id"`
109110
}
110111

111112
// Reset deletes the store records and sets the seqnums back to 1

session.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,20 @@ func (s *session) fillDefaultHeader(msg *Message, inReplyTo *Message) {
133133
}
134134
}
135135

136-
func (s *session) sendLogon(resetStore, setResetSeqNum bool) error {
137-
return s.sendLogonInReplyTo(resetStore, setResetSeqNum, nil)
136+
func (s *session) shouldSendReset() bool {
137+
if s.sessionID.BeginString < BeginStringFIX41 {
138+
return false
139+
}
140+
141+
return (s.ResetOnLogon || s.ResetOnDisconnect || s.ResetOnLogout) &&
142+
s.store.NextTargetMsgSeqNum() == 1 && s.store.NextSenderMsgSeqNum() == 1
143+
}
144+
145+
func (s *session) sendLogon() error {
146+
return s.sendLogonInReplyTo(s.shouldSendReset(), nil)
138147
}
139148

140-
func (s *session) sendLogonInReplyTo(resetStore, setResetSeqNum bool, inReplyTo *Message) error {
149+
func (s *session) sendLogonInReplyTo(setResetSeqNum bool, inReplyTo *Message) error {
141150
logon := NewMessage()
142151
logon.Header.SetField(tagMsgType, FIXString("A"))
143152
logon.Header.SetField(tagBeginString, FIXString(s.sessionID.BeginString))
@@ -154,7 +163,7 @@ func (s *session) sendLogonInReplyTo(resetStore, setResetSeqNum bool, inReplyTo
154163
logon.Body.SetField(tagDefaultApplVerID, FIXString(s.DefaultApplVerID))
155164
}
156165

157-
if err := s.dropAndSendInReplyTo(logon, resetStore, inReplyTo); err != nil {
166+
if err := s.dropAndSendInReplyTo(logon, inReplyTo); err != nil {
158167
return err
159168
}
160169

@@ -248,20 +257,14 @@ func (s *session) dropAndReset() error {
248257
return s.store.Reset()
249258
}
250259

251-
//dropAndSend will optionally reset the store, validate and persist the message, then drops the send queue and sends the message.
252-
func (s *session) dropAndSend(msg *Message, resetStore bool) error {
253-
return s.dropAndSendInReplyTo(msg, resetStore, nil)
260+
//dropAndSend will validate and persist the message, then drops the send queue and sends the message.
261+
func (s *session) dropAndSend(msg *Message) error {
262+
return s.dropAndSendInReplyTo(msg, nil)
254263
}
255-
func (s *session) dropAndSendInReplyTo(msg *Message, resetStore bool, inReplyTo *Message) error {
264+
func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error {
256265
s.sendMutex.Lock()
257266
defer s.sendMutex.Unlock()
258267

259-
if resetStore {
260-
if err := s.store.Reset(); err != nil {
261-
return err
262-
}
263-
}
264-
265268
msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
266269
if err != nil {
267270
return err
@@ -304,7 +307,6 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes
304307
seqNum = s.store.NextSenderMsgSeqNum()
305308
msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum))
306309
}
307-
308310
}
309311
} else {
310312
if err = s.application.ToApp(msg, s.sessionID); err != nil {
@@ -437,7 +439,7 @@ func (s *session) handleLogon(msg *Message) error {
437439
}
438440

439441
s.log.OnEvent("Responding to logon request")
440-
if err := s.sendLogonInReplyTo(resetStore, resetSeqNumFlag.Bool(), msg); err != nil {
442+
if err := s.sendLogonInReplyTo(resetSeqNumFlag.Bool(), msg); err != nil {
441443
return err
442444
}
443445
}

session_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (sm *stateMachine) Connect(session *session) {
3737
}
3838

3939
session.log.OnEvent("Sending logon request")
40-
if err := session.sendLogon(false, false); err != nil {
40+
if err := session.sendLogon(); err != nil {
4141
session.logError(err)
4242
return
4343
}

session_test.go

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,61 @@ func (s *SessionSuite) TestCheckTargetTooLow() {
228228
s.Nil(s.session.checkTargetTooLow(msg))
229229
}
230230

231+
func (s *SessionSuite) TestShouldSendReset() {
232+
var tests = []struct {
233+
BeginString string
234+
ResetOnLogon bool
235+
ResetOnDisconnect bool
236+
ResetOnLogout bool
237+
NextSenderMsgSeqNum int
238+
NextTargetMsgSeqNum int
239+
Expected bool
240+
}{
241+
{BeginStringFIX40, true, false, false, 1, 1, false}, //ResetSeqNumFlag not available < fix41
242+
243+
{BeginStringFIX41, true, false, false, 1, 1, true}, //session must be configured to reset on logon
244+
{BeginStringFIX42, true, false, false, 1, 1, true},
245+
{BeginStringFIX43, true, false, false, 1, 1, true},
246+
{BeginStringFIX44, true, false, false, 1, 1, true},
247+
{BeginStringFIXT11, true, false, false, 1, 1, true},
248+
249+
{BeginStringFIX41, false, true, false, 1, 1, true}, //or disconnect
250+
{BeginStringFIX42, false, true, false, 1, 1, true},
251+
{BeginStringFIX43, false, true, false, 1, 1, true},
252+
{BeginStringFIX44, false, true, false, 1, 1, true},
253+
{BeginStringFIXT11, false, true, false, 1, 1, true},
254+
255+
{BeginStringFIX41, false, false, true, 1, 1, true}, //or logout
256+
{BeginStringFIX42, false, false, true, 1, 1, true},
257+
{BeginStringFIX43, false, false, true, 1, 1, true},
258+
{BeginStringFIX44, false, false, true, 1, 1, true},
259+
{BeginStringFIXT11, false, false, true, 1, 1, true},
260+
261+
{BeginStringFIX41, true, true, false, 1, 1, true}, //or combo
262+
{BeginStringFIX42, false, true, true, 1, 1, true},
263+
{BeginStringFIX43, true, false, true, 1, 1, true},
264+
{BeginStringFIX44, true, true, true, 1, 1, true},
265+
266+
{BeginStringFIX41, false, false, false, 1, 1, false}, //or will not be set
267+
268+
{BeginStringFIX41, true, false, false, 1, 10, false}, //session seq numbers should be reset at the time of check
269+
{BeginStringFIX42, true, false, false, 2, 1, false},
270+
{BeginStringFIX43, true, false, false, 14, 100, false},
271+
}
272+
273+
for _, test := range tests {
274+
s.session.sessionID.BeginString = test.BeginString
275+
s.session.ResetOnLogon = test.ResetOnLogon
276+
s.session.ResetOnDisconnect = test.ResetOnDisconnect
277+
s.session.ResetOnLogout = test.ResetOnLogout
278+
279+
s.MockStore.SetNextSenderMsgSeqNum(test.NextSenderMsgSeqNum)
280+
s.MockStore.SetNextTargetMsgSeqNum(test.NextTargetMsgSeqNum)
281+
282+
s.Equal(s.shouldSendReset(), test.Expected)
283+
}
284+
}
285+
231286
func (s *SessionSuite) TestCheckSessionTimeNoStartTimeEndTime() {
232287
var tests = []struct {
233288
before, after sessionState
@@ -851,7 +906,7 @@ func (suite *SessionSendTestSuite) TestSendDisableMessagePersist() {
851906

852907
func (suite *SessionSendTestSuite) TestDropAndSendAdminMessage() {
853908
suite.MockApp.On("ToAdmin")
854-
suite.Require().Nil(suite.dropAndSend(suite.Heartbeat(), false))
909+
suite.Require().Nil(suite.dropAndSend(suite.Heartbeat()))
855910
suite.MockApp.AssertExpectations(suite.T())
856911

857912
suite.MessagePersisted(suite.MockApp.lastToAdmin)
@@ -868,7 +923,7 @@ func (suite *SessionSendTestSuite) TestDropAndSendDropsQueue() {
868923
suite.NoMessageSent()
869924

870925
suite.MockApp.On("ToAdmin")
871-
require.Nil(suite.T(), suite.dropAndSend(suite.Logon(), false))
926+
require.Nil(suite.T(), suite.dropAndSend(suite.Logon()))
872927
suite.MockApp.AssertExpectations(suite.T())
873928

874929
msg := suite.MockApp.lastToAdmin
@@ -889,7 +944,8 @@ func (suite *SessionSendTestSuite) TestDropAndSendDropsQueueWithReset() {
889944
suite.NoMessageSent()
890945

891946
suite.MockApp.On("ToAdmin")
892-
require.Nil(suite.T(), suite.dropAndSend(suite.Logon(), true))
947+
suite.MockStore.Reset()
948+
require.Nil(suite.T(), suite.dropAndSend(suite.Logon()))
893949
suite.MockApp.AssertExpectations(suite.T())
894950
msg := suite.MockApp.lastToAdmin
895951

0 commit comments

Comments
 (0)