Skip to content

Commit d03612a

Browse files
committed
Alter GetMessages return value.
Instead of returning a chan of messages, return a slice of messages. Simplifies error reporting (in preparation for file-based store impl). Refactor tests into a suite that can be reused by other impls (e.g. fileStore).
1 parent 092ebbc commit d03612a

File tree

3 files changed

+159
-142
lines changed

3 files changed

+159
-142
lines changed

in_session.go

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package quickfix
22

33
import (
4-
"github.com/quickfixgo/quickfix/enum"
54
"time"
5+
6+
"github.com/quickfixgo/quickfix/enum"
67
)
78

89
type inSession struct {
@@ -135,43 +136,34 @@ func (state inSession) handleResendRequest(session *session, msg Message) (nextS
135136
}
136137

137138
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int) {
138-
msgs := session.store.GetMessages(beginSeqNo, endSeqNo)
139+
msgs, err := session.store.GetMessages(beginSeqNo, endSeqNo)
140+
if err != nil {
141+
session.log.OnEventf("error retrieving messages from store: %s", err.Error())
142+
panic(err)
143+
}
139144

140145
seqNum := beginSeqNo
141146
nextSeqNum := seqNum
142-
143-
var msgBytes []byte
144-
var ok bool
145-
for {
146-
if msgBytes, ok = <-msgs; !ok {
147-
//gapfill for catch-up
148-
if seqNum != nextSeqNum {
149-
state.generateSequenceReset(session, seqNum, nextSeqNum)
150-
}
151-
152-
return
153-
}
154-
147+
for _, msgBytes := range msgs {
155148
msg, _ := parseMessage(msgBytes)
149+
msgType, _ := msg.Header.GetString(tagMsgType)
150+
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)
156151

157-
var msgType FIXString
158-
msg.Header.GetField(tagMsgType, &msgType)
159-
160-
var sentMessageSeqNum FIXInt
161-
msg.Header.GetField(tagMsgSeqNum, &sentMessageSeqNum)
162-
163-
if isAdminMessageType(string(msgType)) {
164-
nextSeqNum = int(sentMessageSeqNum) + 1
165-
} else {
166-
167-
if seqNum != int(sentMessageSeqNum) {
168-
state.generateSequenceReset(session, seqNum, int(sentMessageSeqNum))
169-
}
152+
if isAdminMessageType(msgType) {
153+
nextSeqNum = sentMessageSeqNum + 1
154+
continue
155+
}
170156

171-
session.resend(msg)
172-
seqNum = int(sentMessageSeqNum) + 1
173-
nextSeqNum = seqNum
157+
if seqNum != sentMessageSeqNum {
158+
state.generateSequenceReset(session, seqNum, sentMessageSeqNum)
174159
}
160+
session.resend(msg)
161+
seqNum = sentMessageSeqNum + 1
162+
nextSeqNum = seqNum
163+
}
164+
165+
if seqNum != nextSeqNum { // gapfill for catch-up
166+
state.generateSequenceReset(session, seqNum, nextSeqNum)
175167
}
176168
}
177169

store.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ type MessageStore interface {
1515

1616
CreationTime() time.Time
1717

18-
SaveMessage(seqNum int, msg []byte)
19-
GetMessages(beginSeqNum, endSeqNum int) chan []byte
18+
SaveMessage(seqNum int, msg []byte) error
19+
GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error)
2020

2121
Refresh() error
2222
Reset() error
@@ -84,27 +84,23 @@ func (store *memoryStore) Close() error {
8484
return nil
8585
}
8686

87-
func (store *memoryStore) SaveMessage(seqNum int, msg []byte) {
87+
func (store *memoryStore) SaveMessage(seqNum int, msg []byte) error {
8888
if store.messageMap == nil {
8989
store.messageMap = make(map[int][]byte)
9090
}
9191

9292
store.messageMap[seqNum] = msg
93+
return nil
9394
}
9495

95-
func (store memoryStore) GetMessages(beginSeqNum, endSeqNum int) chan []byte {
96-
msgs := make(chan []byte)
97-
98-
go func() {
99-
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
100-
if msg, ok := store.messageMap[seqNum]; ok {
101-
msgs <- msg
102-
}
96+
func (store memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
97+
var msgs [][]byte
98+
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
99+
if m, ok := store.messageMap[seqNum]; ok {
100+
msgs = append(msgs, m)
103101
}
104-
close(msgs)
105-
}()
106-
107-
return msgs
102+
}
103+
return msgs, nil
108104
}
109105

110106
type memoryStoreFactory struct{}

store_test.go

Lines changed: 125 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,142 @@
11
package quickfix
22

33
import (
4-
"bytes"
54
"testing"
65
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
"github.com/stretchr/testify/suite"
710
)
811

9-
func TestMemoryStore_IncMsgSeqNum(t *testing.T) {
10-
store, _ := NewMemoryStoreFactory().Create(SessionID{})
12+
// MessageStoreTestSuite is the suite of all tests that should be run against all MessageStore implementations
13+
type MessageStoreTestSuite struct {
14+
suite.Suite
15+
msgStore MessageStore
16+
}
1117

12-
var testCases = []struct {
13-
expectedNextSeqNum int
14-
}{
15-
{1}, {2}, {3},
16-
}
18+
// MemoryStoreTestSuite runs all tests in the MessageStoreTestSuite against the MemoryStore implementation
19+
type MemoryStoreTestSuite struct {
20+
MessageStoreTestSuite
21+
}
1722

18-
for _, tc := range testCases {
23+
func (suite *MemoryStoreTestSuite) SetupTest() {
24+
var err error
25+
suite.msgStore, err = NewMemoryStoreFactory().Create(SessionID{})
26+
require.Nil(suite.T(), err)
27+
}
1928

20-
if store.NextSenderMsgSeqNum() != tc.expectedNextSeqNum {
21-
t.Errorf("Did not get expected sender seq num %v, got %v", tc.expectedNextSeqNum, store.NextSenderMsgSeqNum())
22-
}
29+
func TestMemoryStoreTestSuite(t *testing.T) {
30+
suite.Run(t, new(MemoryStoreTestSuite))
31+
}
2332

24-
store.IncrNextSenderMsgSeqNum()
33+
func (suite *MessageStoreTestSuite) TestMessageStore_SetNextMsgSeqNum_Refresh_IncrNextMsgSeqNum() {
34+
t := suite.T()
2535

26-
if store.NextTargetMsgSeqNum() != tc.expectedNextSeqNum {
27-
t.Errorf("Did not get expected target seq num %v, got %v", tc.expectedNextSeqNum, store.NextTargetMsgSeqNum())
28-
}
36+
// Given a MessageStore with the following sender and target seqnums
37+
suite.msgStore.SetNextSenderMsgSeqNum(867)
38+
suite.msgStore.SetNextTargetMsgSeqNum(5309)
2939

30-
store.IncrNextTargetMsgSeqNum()
31-
}
40+
// When the store is refreshed from its backing store
41+
suite.msgStore.Refresh()
42+
43+
// Then the sender and target seqnums should still be
44+
assert.Equal(t, 867, suite.msgStore.NextSenderMsgSeqNum())
45+
assert.Equal(t, 5309, suite.msgStore.NextTargetMsgSeqNum())
46+
47+
// When the sender and target seqnums are incremented
48+
require.Nil(t, suite.msgStore.IncrNextSenderMsgSeqNum())
49+
require.Nil(t, suite.msgStore.IncrNextTargetMsgSeqNum())
50+
51+
// Then the sender and target seqnums should be
52+
assert.Equal(t, 868, suite.msgStore.NextSenderMsgSeqNum())
53+
assert.Equal(t, 5310, suite.msgStore.NextTargetMsgSeqNum())
54+
55+
// When the store is refreshed from its backing store
56+
suite.msgStore.Refresh()
57+
58+
// Then the sender and target seqnums should still be
59+
assert.Equal(t, 868, suite.msgStore.NextSenderMsgSeqNum())
60+
assert.Equal(t, 5310, suite.msgStore.NextTargetMsgSeqNum())
3261
}
3362

34-
func TestMemoryStore_SetMsgSeqNum(t *testing.T) {
35-
store, _ := NewMemoryStoreFactory().Create(SessionID{})
36-
store.SetNextSenderMsgSeqNum(50)
37-
store.SetNextTargetMsgSeqNum(30)
63+
func (suite *MessageStoreTestSuite) TestMessageStore_Reset() {
64+
t := suite.T()
3865

39-
if store.NextSenderMsgSeqNum() != 50 {
40-
t.Errorf("Did not get expected sender seq num %v, got %v", 50, store.NextSenderMsgSeqNum())
41-
}
66+
// Given a MessageStore with the following sender and target seqnums
67+
suite.msgStore.SetNextSenderMsgSeqNum(1234)
68+
suite.msgStore.SetNextTargetMsgSeqNum(5678)
69+
70+
// When the store is reset
71+
require.Nil(t, suite.msgStore.Reset())
72+
73+
// Then the sender and target seqnums should be
74+
assert.Equal(t, 1, suite.msgStore.NextSenderMsgSeqNum())
75+
assert.Equal(t, 1, suite.msgStore.NextTargetMsgSeqNum())
4276

43-
if store.NextTargetMsgSeqNum() != 30 {
44-
t.Errorf("Did not get expected target seq num %v, got %v", 30, store.NextTargetMsgSeqNum())
77+
// When the store is refreshed from its backing store
78+
suite.msgStore.Refresh()
79+
80+
// Then the sender and target seqnums should still be
81+
assert.Equal(t, 1, suite.msgStore.NextSenderMsgSeqNum())
82+
assert.Equal(t, 1, suite.msgStore.NextTargetMsgSeqNum())
83+
}
84+
85+
func (suite *MessageStoreTestSuite) TestMessageStore_SaveMessage_GetMessage() {
86+
t := suite.T()
87+
88+
// Given the following saved messages
89+
expectedMsgsBySeqNum := map[int]string{
90+
1: "In the frozen land of Nador",
91+
2: "they were forced to eat Robin's minstrels",
92+
3: "and there was much rejoicing",
93+
}
94+
for seqNum, msg := range expectedMsgsBySeqNum {
95+
suite.msgStore.SaveMessage(seqNum, []byte(msg))
4596
}
97+
98+
// When the messages are retrieved from the MessageStore
99+
actualMsgs, err := suite.msgStore.GetMessages(1, 3)
100+
require.Nil(t, err)
101+
102+
// Then the messages should be
103+
require.Len(t, actualMsgs, 3)
104+
assert.Equal(t, expectedMsgsBySeqNum[1], string(actualMsgs[0]))
105+
assert.Equal(t, expectedMsgsBySeqNum[2], string(actualMsgs[1]))
106+
assert.Equal(t, expectedMsgsBySeqNum[3], string(actualMsgs[2]))
107+
108+
// When the store is refreshed from its backing store
109+
suite.msgStore.Refresh()
110+
111+
// And the messages are retrieved from the MessageStore
112+
actualMsgs, err = suite.msgStore.GetMessages(1, 3)
113+
require.Nil(t, err)
114+
115+
// Then the messages should still be
116+
require.Len(t, actualMsgs, 3)
117+
assert.Equal(t, expectedMsgsBySeqNum[1], string(actualMsgs[0]))
118+
assert.Equal(t, expectedMsgsBySeqNum[2], string(actualMsgs[1]))
119+
assert.Equal(t, expectedMsgsBySeqNum[3], string(actualMsgs[2]))
46120
}
47121

48-
func TestMemoryStore_GetMessages(t *testing.T) {
49-
store, _ := NewMemoryStoreFactory().Create(SessionID{})
122+
func (suite *MessageStoreTestSuite) TestMessageStore_GetMessages_EmptyStore() {
123+
// When messages are retrieved from an empty store
124+
messages, err := suite.msgStore.GetMessages(1, 2)
125+
require.Nil(suite.T(), err)
50126

51-
messages := store.GetMessages(1, 2)
52-
msg, ok := <-messages
127+
// Then no messages should be returned
128+
require.Empty(suite.T(), messages, "Did not expect messages from empty store")
129+
}
53130

54-
if ok != false {
55-
t.Error("Did not expect messages from empty store", msg)
56-
}
131+
func (suite *MessageStoreTestSuite) TestMessageStore_GetMessages_VariousRanges() {
132+
t := suite.T()
57133

58-
store.SaveMessage(1, []byte("hello"))
59-
store.SaveMessage(2, []byte("cruel"))
60-
store.SaveMessage(3, []byte("world"))
134+
// Given the following saved messages
135+
suite.msgStore.SaveMessage(1, []byte("hello"))
136+
suite.msgStore.SaveMessage(2, []byte("cruel"))
137+
suite.msgStore.SaveMessage(3, []byte("world"))
61138

139+
// When the following requests are made to the store
62140
var testCases = []struct {
63141
beginSeqNo, endSeqNo int
64142
expectedBytes [][]byte
@@ -74,70 +152,21 @@ func TestMemoryStore_GetMessages(t *testing.T) {
74152
{beginSeqNo: 4, endSeqNo: 10, expectedBytes: [][]byte{}},
75153
}
76154

155+
// Then the returned messages should be
77156
for _, tc := range testCases {
78-
messages = store.GetMessages(tc.beginSeqNo, tc.endSeqNo)
79-
80-
expected := tc.expectedBytes
81-
for {
82-
msg, ok = <-messages
83-
84-
if len(expected) == 0 {
85-
if ok == true {
86-
t.Error("Did not expect additional messages", msg)
87-
}
88-
break
89-
}
90-
91-
if ok != true {
92-
t.Error("Did not get messages, expected ", expected[0])
93-
}
94-
95-
if !bytes.Equal(msg, expected[0]) {
96-
t.Error("Expected ", expected[0], " got ", msg)
97-
}
98-
99-
expected = expected[1:]
157+
actualMsgs, err := suite.msgStore.GetMessages(tc.beginSeqNo, tc.endSeqNo)
158+
require.Nil(t, err)
159+
require.Len(t, actualMsgs, len(tc.expectedBytes))
160+
for i, expectedMsg := range tc.expectedBytes {
161+
assert.Equal(t, string(expectedMsg), string(actualMsgs[i]))
100162
}
101163
}
102164
}
103165

104-
func TestMemoryStoreFactory_Create(t *testing.T) {
166+
func (suite *MessageStoreTestSuite) TestMemoryStoreFactory_CreationTime() {
105167
t0 := time.Now()
106-
store, _ := NewMemoryStoreFactory().Create(SessionID{})
168+
suite.msgStore.Reset()
107169
t1 := time.Now()
108-
109-
if store.CreationTime().Before(t0) {
110-
t.Errorf("Expected %v to be before %v", t0, store.CreationTime())
111-
}
112-
113-
if store.CreationTime().After(t1) {
114-
t.Errorf("Expected %v to be after %v", t1, store.CreationTime())
115-
}
116-
}
117-
118-
func TestMemoryStore_Reset(t *testing.T) {
119-
store, _ := NewMemoryStoreFactory().Create(SessionID{})
120-
store.SetNextSenderMsgSeqNum(50)
121-
store.SetNextTargetMsgSeqNum(30)
122-
123-
store.SaveMessage(1, []byte("hello"))
124-
store.SaveMessage(2, []byte("cruel"))
125-
store.SaveMessage(3, []byte("world"))
126-
127-
store.Reset()
128-
129-
messages := store.GetMessages(1, 3)
130-
msg, ok := <-messages
131-
132-
if ok {
133-
t.Error("Did not expect messages, got ", string(msg))
134-
}
135-
136-
if store.NextSenderMsgSeqNum() != 1 {
137-
t.Error("SenderMsgSeqNum should reset")
138-
}
139-
140-
if store.NextTargetMsgSeqNum() != 1 {
141-
t.Error("TargetMsgSeqNum should reset")
142-
}
170+
require.True(suite.T(), suite.msgStore.CreationTime().After(t0))
171+
require.True(suite.T(), suite.msgStore.CreationTime().Before(t1))
143172
}

0 commit comments

Comments
 (0)