Skip to content

Commit 928f1a8

Browse files
authored
Merge pull request #525 from taurusgroup/fix/persist-add-transaction-447
fix persist add store transaction fix #447
2 parents 04aa323 + dfc17b0 commit 928f1a8

File tree

5 files changed

+75
-3
lines changed

5 files changed

+75
-3
lines changed

filestore.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,14 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
301301
return nil
302302
}
303303

304+
func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
305+
err := store.SaveMessage(seqNum, msg)
306+
if err != nil {
307+
return err
308+
}
309+
return store.IncrNextSenderMsgSeqNum()
310+
}
311+
304312
func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err error) {
305313
msgInfo, found := store.offsets[seqNum]
306314
if !found {

mongostore.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,15 @@ func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) {
251251
return
252252
}
253253

254+
func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
255+
// TODO add transaction
256+
err := store.SaveMessage(seqNum, msg)
257+
if err != nil {
258+
return err
259+
}
260+
return store.IncrNextSenderMsgSeqNum()
261+
}
262+
254263
func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, err error) {
255264
msgFilter := generateMessageFilter(&store.sessionID)
256265
//Marshal into database form

session.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,7 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes
321321

322322
func (s *session) persist(seqNum int, msgBytes []byte) error {
323323
if !s.DisableMessagePersist {
324-
if err := s.store.SaveMessage(seqNum, msgBytes); err != nil {
325-
return err
326-
}
324+
return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes)
327325
}
328326

329327
return s.store.IncrNextSenderMsgSeqNum()

sqlstore.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,54 @@ func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
274274
return err
275275
}
276276

277+
func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
278+
s := store.sessionID
279+
280+
tx, err := store.db.Begin()
281+
if err != nil {
282+
return err
283+
}
284+
defer tx.Rollback()
285+
286+
_, err = tx.Exec(sqlString(`INSERT INTO messages (
287+
msgseqnum, message,
288+
beginstring, session_qualifier,
289+
sendercompid, sendersubid, senderlocid,
290+
targetcompid, targetsubid, targetlocid)
291+
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
292+
seqNum, string(msg),
293+
s.BeginString, s.Qualifier,
294+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
295+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
296+
if err != nil {
297+
return err
298+
}
299+
300+
next := store.cache.NextSenderMsgSeqNum() + 1
301+
_, err = tx.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ?
302+
WHERE beginstring=? AND session_qualifier=?
303+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
304+
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
305+
next, s.BeginString, s.Qualifier,
306+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
307+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
308+
if err != nil {
309+
return err
310+
}
311+
312+
err = tx.Commit()
313+
if err != nil {
314+
return err
315+
}
316+
317+
err = store.cache.SetNextSenderMsgSeqNum(next)
318+
if err != nil {
319+
return err
320+
}
321+
322+
return nil
323+
}
324+
277325
func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
278326
s := store.sessionID
279327
var msgs [][]byte

store.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type MessageStore interface {
2020
CreationTime() time.Time
2121

2222
SaveMessage(seqNum int, msg []byte) error
23+
SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error
2324
GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error)
2425

2526
Refresh() error
@@ -97,6 +98,14 @@ func (store *memoryStore) SaveMessage(seqNum int, msg []byte) error {
9798
return nil
9899
}
99100

101+
func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
102+
err := store.SaveMessage(seqNum, msg)
103+
if err != nil {
104+
return err
105+
}
106+
return store.IncrNextSenderMsgSeqNum()
107+
}
108+
100109
func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
101110
var msgs [][]byte
102111
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {

0 commit comments

Comments
 (0)