Skip to content

Commit 6976a54

Browse files
committed
fix persist add store transaction
1 parent 7945e4b commit 6976a54

File tree

5 files changed

+76
-5
lines changed

5 files changed

+76
-5
lines changed

filestore.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,14 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
301301
return nil
302302
}
303303

304+
func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
305+
err := store.SaveMessage(seqNum, msg)
306+
if err != nil {
307+
return err
308+
}
309+
return store.IncrNextSenderMsgSeqNum()
310+
}
311+
304312
func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err error) {
305313
msgInfo, found := store.offsets[seqNum]
306314
if !found {

mongostore.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,15 @@ func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) {
248248
return
249249
}
250250

251+
func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
252+
// TODO add transaction
253+
err := store.SaveMessage(seqNum, msg)
254+
if err != nil {
255+
return err
256+
}
257+
return store.IncrNextSenderMsgSeqNum()
258+
}
259+
251260
func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, err error) {
252261
msgFilter := generateMessageFilter(&store.sessionID)
253262
//Marshal into database form

session.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -321,12 +321,10 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes
321321

322322
func (s *session) persist(seqNum int, msgBytes []byte) error {
323323
if !s.DisableMessagePersist {
324-
if err := s.store.SaveMessage(seqNum, msgBytes); err != nil {
325-
return err
326-
}
324+
return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes)
325+
} else {
326+
return s.store.IncrNextSenderMsgSeqNum()
327327
}
328-
329-
return s.store.IncrNextSenderMsgSeqNum()
330328
}
331329

332330
func (s *session) sendQueued() {

sqlstore.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,53 @@ func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
274274
return err
275275
}
276276

277+
func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
278+
s := store.sessionID
279+
280+
tx, err := store.db.Begin()
281+
if err != nil {
282+
return err
283+
}
284+
defer tx.Rollback()
285+
286+
_, err = tx.Exec(sqlString(`INSERT INTO messages (
287+
msgseqnum, message,
288+
beginstring, session_qualifier,
289+
sendercompid, sendersubid, senderlocid,
290+
targetcompid, targetsubid, targetlocid)
291+
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
292+
seqNum, string(msg),
293+
s.BeginString, s.Qualifier,
294+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
295+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
296+
if err != nil {
297+
return err
298+
}
299+
300+
if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil {
301+
return errors.Wrap(err, "cache incr next")
302+
}
303+
304+
next := store.cache.NextSenderMsgSeqNum()
305+
_, err = store.db.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ?
306+
WHERE beginstring=? AND session_qualifier=?
307+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
308+
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
309+
next, s.BeginString, s.Qualifier,
310+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
311+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
312+
if err != nil {
313+
return err
314+
}
315+
316+
err = store.cache.SetNextSenderMsgSeqNum(next)
317+
if err != nil {
318+
return err
319+
}
320+
321+
return tx.Commit()
322+
}
323+
277324
func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
278325
s := store.sessionID
279326
var msgs [][]byte

store.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type MessageStore interface {
2020
CreationTime() time.Time
2121

2222
SaveMessage(seqNum int, msg []byte) error
23+
SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error
2324
GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error)
2425

2526
Refresh() error
@@ -97,6 +98,14 @@ func (store *memoryStore) SaveMessage(seqNum int, msg []byte) error {
9798
return nil
9899
}
99100

101+
func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error {
102+
err := store.SaveMessage(seqNum, msg)
103+
if err != nil {
104+
return err
105+
}
106+
return store.IncrNextSenderMsgSeqNum()
107+
}
108+
100109
func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
101110
var msgs [][]byte
102111
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {

0 commit comments

Comments
 (0)