Skip to content

Commit 945c595

Browse files
committed
Add sql database impl of MessageStore
1 parent 635c648 commit 945c595

File tree

2 files changed

+320
-0
lines changed

2 files changed

+320
-0
lines changed

sqlstore.go

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
package quickfix
2+
3+
import (
4+
"database/sql"
5+
"time"
6+
7+
"github.com/quickfixgo/quickfix/config"
8+
)
9+
10+
type sqlStoreFactory struct {
11+
settings *SessionSettings
12+
}
13+
14+
type sqlStore struct {
15+
sessionID SessionID
16+
cache *memoryStore
17+
sqlDriver string
18+
sqlDataSourceName string
19+
db *sql.DB
20+
}
21+
22+
// NewSQLStoreFactory returns a sql-based implementation of MessageStoreFactory
23+
func NewSQLStoreFactory(settings *SessionSettings) MessageStoreFactory {
24+
return sqlStoreFactory{settings: settings}
25+
}
26+
27+
// Create creates a new SQLStore implementation of the MessageStore interface
28+
func (f sqlStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, err error) {
29+
sqlDriver, err := f.settings.Setting(config.SQLDriver)
30+
if err != nil {
31+
return nil, err
32+
}
33+
sqlDataSourceName, err := f.settings.Setting(config.SQLDataSourceName)
34+
if err != nil {
35+
return nil, err
36+
}
37+
return newSQLStore(sessionID, sqlDriver, sqlDataSourceName)
38+
}
39+
40+
func newSQLStore(sessionID SessionID, driver string, dataSourceName string) (store *sqlStore, err error) {
41+
store = &sqlStore{
42+
sessionID: sessionID,
43+
cache: &memoryStore{},
44+
sqlDriver: driver,
45+
sqlDataSourceName: dataSourceName,
46+
}
47+
48+
if store.db, err = sql.Open(store.sqlDriver, store.sqlDataSourceName); err != nil {
49+
return nil, err
50+
}
51+
if err = store.db.Ping(); err != nil { // ensure immediate connection
52+
return nil, err
53+
}
54+
if err = store.populateCache(); err != nil {
55+
return nil, err
56+
}
57+
58+
return store, nil
59+
}
60+
61+
// Reset deletes the store records and sets the seqnums back to 1
62+
func (store *sqlStore) Reset() error {
63+
s := store.sessionID
64+
_, err := store.db.Exec(`DELETE FROM messages
65+
WHERE beginstring=? AND session_qualifier=?
66+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
67+
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
68+
s.BeginString, s.Qualifier,
69+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
70+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
71+
if err != nil {
72+
return err
73+
}
74+
75+
if err = store.cache.Reset(); err != nil {
76+
return err
77+
}
78+
79+
_, err = store.db.Exec(`UPDATE sessions
80+
SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=?
81+
WHERE beginstring=? AND session_qualifier=?
82+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
83+
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
84+
store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(),
85+
s.BeginString, s.Qualifier,
86+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
87+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
88+
89+
return err
90+
}
91+
92+
// Refresh closes the store sqls and then reloads from them
93+
func (store *sqlStore) Refresh() error {
94+
if err := store.cache.Reset(); err != nil {
95+
return err
96+
}
97+
return store.populateCache()
98+
}
99+
100+
func (store *sqlStore) populateCache() (err error) {
101+
s := store.sessionID
102+
var creationTime time.Time
103+
var incomingSeqNum, outgoingSeqNum int
104+
row := store.db.QueryRow(`SELECT creation_time, incoming_seqnum, outgoing_seqnum
105+
FROM sessions
106+
WHERE beginstring=? AND session_qualifier=?
107+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
108+
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
109+
s.BeginString, s.Qualifier,
110+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
111+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
112+
113+
err = row.Scan(&creationTime, &incomingSeqNum, &outgoingSeqNum)
114+
115+
// session record found, load it
116+
if err == nil {
117+
store.cache.creationTime = creationTime
118+
store.cache.SetNextTargetMsgSeqNum(incomingSeqNum)
119+
store.cache.SetNextSenderMsgSeqNum(outgoingSeqNum)
120+
return nil
121+
}
122+
123+
// fatal error, give up
124+
if err != sql.ErrNoRows {
125+
return err
126+
}
127+
128+
// session record not found, create it
129+
_, err = store.db.Exec(`INSERT INTO sessions (
130+
creation_time, incoming_seqnum, outgoing_seqnum,
131+
beginstring, session_qualifier,
132+
sendercompid, sendersubid, senderlocid,
133+
targetcompid, targetsubid, targetlocid)
134+
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
135+
store.cache.creationTime,
136+
store.cache.NextTargetMsgSeqNum(),
137+
store.cache.NextSenderMsgSeqNum(),
138+
s.BeginString, s.Qualifier,
139+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
140+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
141+
142+
return err
143+
}
144+
145+
// NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent
146+
func (store *sqlStore) NextSenderMsgSeqNum() int {
147+
return store.cache.NextSenderMsgSeqNum()
148+
}
149+
150+
// NextTargetMsgSeqNum returns the next MsgSeqNum that should be received
151+
func (store *sqlStore) NextTargetMsgSeqNum() int {
152+
return store.cache.NextTargetMsgSeqNum()
153+
}
154+
155+
// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent
156+
func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
157+
s := store.sessionID
158+
_, err := store.db.Exec(`UPDATE sessions SET outgoing_seqnum = ?
159+
WHERE beginstring=? AND session_qualifier=?
160+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
161+
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
162+
next, s.BeginString, s.Qualifier,
163+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
164+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
165+
if err != nil {
166+
return err
167+
}
168+
return store.cache.SetNextSenderMsgSeqNum(next)
169+
}
170+
171+
// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received
172+
func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error {
173+
s := store.sessionID
174+
_, err := store.db.Exec(`UPDATE sessions SET incoming_seqnum = ?
175+
WHERE beginstring=? AND session_qualifier=?
176+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
177+
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
178+
next, s.BeginString, s.Qualifier,
179+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
180+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
181+
if err != nil {
182+
return err
183+
}
184+
return store.cache.SetNextTargetMsgSeqNum(next)
185+
}
186+
187+
// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent
188+
func (store *sqlStore) IncrNextSenderMsgSeqNum() error {
189+
store.cache.IncrNextSenderMsgSeqNum()
190+
return store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum())
191+
}
192+
193+
// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received
194+
func (store *sqlStore) IncrNextTargetMsgSeqNum() error {
195+
store.cache.IncrNextTargetMsgSeqNum()
196+
return store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum())
197+
}
198+
199+
// CreationTime returns the creation time of the store
200+
func (store *sqlStore) CreationTime() time.Time {
201+
return store.cache.CreationTime()
202+
}
203+
204+
func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
205+
s := store.sessionID
206+
207+
_, err := store.db.Exec(`INSERT INTO messages (
208+
msgseqnum, message,
209+
beginstring, session_qualifier,
210+
sendercompid, sendersubid, senderlocid,
211+
targetcompid, targetsubid, targetlocid)
212+
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
213+
seqNum, string(msg),
214+
s.BeginString, s.Qualifier,
215+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
216+
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
217+
218+
return err
219+
}
220+
221+
func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
222+
s := store.sessionID
223+
var msgs [][]byte
224+
rows, err := store.db.Query(`SELECT message FROM messages
225+
WHERE beginstring=? AND session_qualifier=?
226+
AND sendercompid=? AND sendersubid=? AND senderlocid=?
227+
AND targetcompid=? AND targetsubid=? AND targetlocid=?
228+
AND msgseqnum>=? AND msgseqnum<=?
229+
ORDER BY msgseqnum`,
230+
s.BeginString, s.Qualifier,
231+
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
232+
s.TargetCompID, s.TargetSubID, s.TargetLocationID,
233+
beginSeqNum, endSeqNum)
234+
if err != nil {
235+
return nil, err
236+
}
237+
defer rows.Close()
238+
239+
for rows.Next() {
240+
var message string
241+
if err := rows.Scan(&message); err != nil {
242+
return nil, err
243+
}
244+
msgs = append(msgs, []byte(message))
245+
}
246+
247+
if err := rows.Err(); err != nil {
248+
return nil, err
249+
}
250+
251+
return msgs, nil
252+
}
253+
254+
// Close closes the store's database connection
255+
func (store *sqlStore) Close() error {
256+
if store.db != nil {
257+
store.db.Close()
258+
store.db = nil
259+
}
260+
return nil
261+
}

sqlstore_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package quickfix
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"io/ioutil"
7+
"os"
8+
"path"
9+
"path/filepath"
10+
"testing"
11+
"time"
12+
13+
_ "github.com/mattn/go-sqlite3"
14+
"github.com/quickfixgo/quickfix/config"
15+
"github.com/stretchr/testify/require"
16+
"github.com/stretchr/testify/suite"
17+
)
18+
19+
// SqlStoreTestSuite runs all tests in the MessageStoreTestSuite against the SqlStore implementation
20+
type SQLStoreTestSuite struct {
21+
MessageStoreTestSuite
22+
sqlStoreRootPath string
23+
}
24+
25+
func (suite *SQLStoreTestSuite) SetupTest() {
26+
suite.sqlStoreRootPath = path.Join(os.TempDir(), fmt.Sprintf("SqlStoreTestSuite-%d", os.Getpid()))
27+
err := os.MkdirAll(suite.sqlStoreRootPath, os.ModePerm)
28+
require.Nil(suite.T(), err)
29+
sqlDriver := "sqlite3"
30+
sqlDsn := path.Join(suite.sqlStoreRootPath, fmt.Sprintf("%d.db", time.Now().UnixNano()))
31+
32+
// create tables
33+
db, err := sql.Open(sqlDriver, sqlDsn)
34+
require.Nil(suite.T(), err)
35+
ddlFnames, err := filepath.Glob(fmt.Sprintf("_sql/%s/*.sql", sqlDriver))
36+
require.Nil(suite.T(), err)
37+
for _, fname := range ddlFnames {
38+
sqlBytes, err := ioutil.ReadFile(fname)
39+
require.Nil(suite.T(), err)
40+
_, err = db.Exec(string(sqlBytes))
41+
require.Nil(suite.T(), err)
42+
}
43+
44+
// create store
45+
settings := NewSessionSettings()
46+
settings.Set(config.SQLDataSourceName, sqlDsn)
47+
settings.Set(config.SQLDriver, sqlDriver)
48+
suite.msgStore, err = NewSQLStoreFactory(settings).Create(SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"})
49+
require.Nil(suite.T(), err)
50+
}
51+
52+
func (suite *SQLStoreTestSuite) TearDownTest() {
53+
suite.msgStore.Close()
54+
os.RemoveAll(suite.sqlStoreRootPath)
55+
}
56+
57+
func TestSqlStoreTestSuite(t *testing.T) {
58+
suite.Run(t, new(SQLStoreTestSuite))
59+
}

0 commit comments

Comments
 (0)