Skip to content

Commit 3287298

Browse files
committed
make fieldMap thread safe + remove recycling of msg addresses
1 parent 94d7217 commit 3287298

File tree

8 files changed

+50
-84
lines changed

8 files changed

+50
-84
lines changed

field_map.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package quickfix
33
import (
44
"bytes"
55
"sort"
6+
"sync"
67
"time"
78
)
89

@@ -39,6 +40,7 @@ func (t tagSort) Less(i, j int) bool { return t.compare(t.tags[i], t.tags[j]) }
3940
type FieldMap struct {
4041
tagLookup map[Tag]field
4142
tagSort
43+
rwLock *sync.RWMutex
4244
}
4345

4446
// ascending tags
@@ -49,12 +51,16 @@ func (m *FieldMap) init() {
4951
}
5052

5153
func (m *FieldMap) initWithOrdering(ordering tagOrder) {
54+
m.rwLock = &sync.RWMutex{}
5255
m.tagLookup = make(map[Tag]field)
5356
m.compare = ordering
5457
}
5558

5659
//Tags returns all of the Field Tags in this FieldMap
5760
func (m FieldMap) Tags() []Tag {
61+
m.rwLock.RLock()
62+
defer m.rwLock.RUnlock()
63+
5864
tags := make([]Tag, 0, len(m.tagLookup))
5965
for t := range m.tagLookup {
6066
tags = append(tags, t)
@@ -70,12 +76,17 @@ func (m FieldMap) Get(parser Field) MessageRejectError {
7076

7177
//Has returns true if the Tag is present in this FieldMap
7278
func (m FieldMap) Has(tag Tag) bool {
79+
m.rwLock.RLock()
80+
defer m.rwLock.RUnlock()
7381
_, ok := m.tagLookup[tag]
7482
return ok
7583
}
7684

7785
//GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
7886
func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError {
87+
m.rwLock.RLock()
88+
defer m.rwLock.RUnlock()
89+
7990
f, ok := m.tagLookup[tag]
8091
if !ok {
8192
return ConditionallyRequiredFieldMissing(tag)
@@ -90,6 +101,9 @@ func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError
90101

91102
//GetBytes is a zero-copy GetField wrapper for []bytes fields
92103
func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
104+
m.rwLock.RLock()
105+
defer m.rwLock.RUnlock()
106+
93107
f, ok := m.tagLookup[tag]
94108
if !ok {
95109
return nil, ConditionallyRequiredFieldMissing(tag)
@@ -124,6 +138,9 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) {
124138

125139
//GetTime is a GetField wrapper for utc timestamp fields
126140
func (m FieldMap) GetTime(tag Tag) (t time.Time, err MessageRejectError) {
141+
m.rwLock.RLock()
142+
defer m.rwLock.RUnlock()
143+
127144
bytes, err := m.GetBytes(tag)
128145
if err != nil {
129146
return
@@ -148,6 +165,9 @@ func (m FieldMap) GetString(tag Tag) (string, MessageRejectError) {
148165

149166
//GetGroup is a Get function specific to Group Fields.
150167
func (m FieldMap) GetGroup(parser FieldGroupReader) MessageRejectError {
168+
m.rwLock.RLock()
169+
defer m.rwLock.RUnlock()
170+
151171
f, ok := m.tagLookup[parser.Tag()]
152172
if !ok {
153173
return ConditionallyRequiredFieldMissing(parser.Tag())
@@ -193,6 +213,9 @@ func (m *FieldMap) SetString(tag Tag, value string) *FieldMap {
193213

194214
//Clear purges all fields from field map
195215
func (m *FieldMap) Clear() {
216+
m.rwLock.Lock()
217+
defer m.rwLock.Unlock()
218+
196219
m.tags = m.tags[0:0]
197220
for k := range m.tagLookup {
198221
delete(m.tagLookup, k)
@@ -201,6 +224,9 @@ func (m *FieldMap) Clear() {
201224

202225
//CopyInto overwrites the given FieldMap with this one
203226
func (m *FieldMap) CopyInto(to *FieldMap) {
227+
m.rwLock.RLock()
228+
defer m.rwLock.RUnlock()
229+
204230
to.tagLookup = make(map[Tag]field)
205231
for tag, f := range m.tagLookup {
206232
clone := make(field, 1)
@@ -213,6 +239,9 @@ func (m *FieldMap) CopyInto(to *FieldMap) {
213239
}
214240

215241
func (m *FieldMap) add(f field) {
242+
m.rwLock.Lock()
243+
defer m.rwLock.Unlock()
244+
216245
t := fieldTag(f)
217246
if _, ok := m.tagLookup[t]; !ok {
218247
m.tags = append(m.tags, t)
@@ -222,6 +251,9 @@ func (m *FieldMap) add(f field) {
222251
}
223252

224253
func (m *FieldMap) getOrCreate(tag Tag) field {
254+
m.rwLock.Lock()
255+
defer m.rwLock.Unlock()
256+
225257
if f, ok := m.tagLookup[tag]; ok {
226258
f = f[:1]
227259
return f
@@ -242,6 +274,9 @@ func (m *FieldMap) Set(field FieldWriter) *FieldMap {
242274

243275
//SetGroup is a setter specific to group fields
244276
func (m *FieldMap) SetGroup(field FieldGroupWriter) *FieldMap {
277+
m.rwLock.Lock()
278+
defer m.rwLock.Unlock()
279+
245280
_, ok := m.tagLookup[field.Tag()]
246281
if !ok {
247282
m.tags = append(m.tags, field.Tag())
@@ -256,6 +291,9 @@ func (m *FieldMap) sortedTags() []Tag {
256291
}
257292

258293
func (m FieldMap) write(buffer *bytes.Buffer) {
294+
m.rwLock.Lock()
295+
defer m.rwLock.Unlock()
296+
259297
for _, tag := range m.sortedTags() {
260298
if f, ok := m.tagLookup[tag]; ok {
261299
writeField(f, buffer)
@@ -264,6 +302,9 @@ func (m FieldMap) write(buffer *bytes.Buffer) {
264302
}
265303

266304
func (m FieldMap) total() int {
305+
m.rwLock.RLock()
306+
defer m.rwLock.RUnlock()
307+
267308
total := 0
268309
for _, fields := range m.tagLookup {
269310
for _, tv := range fields {
@@ -279,6 +320,9 @@ func (m FieldMap) total() int {
279320
}
280321

281322
func (m FieldMap) length() int {
323+
m.rwLock.RLock()
324+
defer m.rwLock.RUnlock()
325+
282326
length := 0
283327
for _, fields := range m.tagLookup {
284328
for _, tv := range fields {

internal/buffer_pool.go

Lines changed: 0 additions & 37 deletions
This file was deleted.

message_pool.go

Lines changed: 0 additions & 26 deletions
This file was deleted.

parser.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,12 @@ import (
55
"errors"
66
"io"
77
"time"
8-
9-
"github.com/quickfixgo/quickfix/internal"
108
)
119

1210
const (
1311
defaultBufSize = 4096
1412
)
1513

16-
var bufferPool internal.BufferPool
1714

1815
type parser struct {
1916
//buffer is a slice of bigBuffer
@@ -145,7 +142,7 @@ func (p *parser) ReadMessage() (msgBytes *bytes.Buffer, err error) {
145142
return
146143
}
147144

148-
msgBytes = bufferPool.Get()
145+
msgBytes = new(bytes.Buffer)
149146
msgBytes.Reset()
150147
msgBytes.Write(p.buffer[:index])
151148
p.buffer = p.buffer[index:]

repeating_group.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,13 @@ func (f RepeatingGroup) Write() []TagValue {
114114

115115
for _, group := range f.groups {
116116
tags := group.sortedTags()
117-
117+
group.rwLock.RLock()
118118
for _, tag := range tags {
119119
if fields, ok := group.tagLookup[tag]; ok {
120120
tvs = append(tvs, fields...)
121121
}
122122
}
123+
group.rwLock.RUnlock()
123124
}
124125

125126
return tvs
@@ -199,7 +200,9 @@ func (f *RepeatingGroup) Read(tv []TagValue) ([]TagValue, error) {
199200
f.groups = append(f.groups, group)
200201
}
201202

203+
group.rwLock.Lock()
202204
group.tagLookup[tvRange[0].tag] = tvRange
205+
group.rwLock.Unlock()
203206
}
204207

205208
if len(f.groups) != expectedGroupSize {

resend_state.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session
5454

5555
delete(s.messageStash, targetSeqNum)
5656

57-
//return stashed message to pool
58-
session.returnToPool(msg)
59-
6057
nextState = inSession{}.FixMsgIn(session, msg)
6158
if !nextState.IsLoggedOn() {
6259
return

session.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ type session struct {
4343
transportDataDictionary *datadictionary.DataDictionary
4444
appDataDictionary *datadictionary.DataDictionary
4545

46-
messagePool
4746
timestampPrecision TimestampPrecision
4847
}
4948

@@ -678,14 +677,6 @@ type fixIn struct {
678677
receiveTime time.Time
679678
}
680679

681-
func (s *session) returnToPool(msg *Message) {
682-
s.messagePool.Put(msg)
683-
if msg.rawMessage != nil {
684-
bufferPool.Put(msg.rawMessage)
685-
msg.rawMessage = nil
686-
}
687-
}
688-
689680
func (s *session) onDisconnect() {
690681
s.log.OnEvent("Disconnected")
691682
if s.ResetOnDisconnect {

session_state.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,14 @@ func (sm *stateMachine) Incoming(session *session, m fixIn) {
6868

6969
session.log.OnIncoming(m.bytes.Bytes())
7070

71-
msg := session.messagePool.Get()
71+
msg := NewMessage()
7272
if err := ParseMessageWithDataDictionary(msg, m.bytes, session.transportDataDictionary, session.appDataDictionary); err != nil {
7373
session.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), m.bytes)
7474
} else {
7575
msg.ReceiveTime = m.receiveTime
7676
sm.fixMsgIn(session, msg)
7777
}
7878

79-
if !msg.keepMessage {
80-
session.returnToPool(msg)
81-
}
8279
session.peerTimer.Reset(time.Duration(float64(1.2) * float64(session.HeartBtInt)))
8380
}
8481

0 commit comments

Comments
 (0)