Skip to content

Commit efd43e2

Browse files
authored
Merge pull request #485 from waheedoo/make_field_map_thread_safe
Solve concurrent map read and map write by making Fieldmap thread-safe & avoiding recycling of addresses
2 parents 94d7217 + 5207087 commit efd43e2

File tree

8 files changed

+51
-84
lines changed

8 files changed

+51
-84
lines changed

field_map.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package quickfix
33
import (
44
"bytes"
55
"sort"
6+
"sync"
67
"time"
78
)
89

@@ -39,6 +40,7 @@ func (t tagSort) Less(i, j int) bool { return t.compare(t.tags[i], t.tags[j]) }
3940
type FieldMap struct {
4041
tagLookup map[Tag]field
4142
tagSort
43+
rwLock *sync.RWMutex
4244
}
4345

4446
// ascending tags
@@ -49,12 +51,16 @@ func (m *FieldMap) init() {
4951
}
5052

5153
func (m *FieldMap) initWithOrdering(ordering tagOrder) {
54+
m.rwLock = &sync.RWMutex{}
5255
m.tagLookup = make(map[Tag]field)
5356
m.compare = ordering
5457
}
5558

5659
//Tags returns all of the Field Tags in this FieldMap
5760
func (m FieldMap) Tags() []Tag {
61+
m.rwLock.RLock()
62+
defer m.rwLock.RUnlock()
63+
5864
tags := make([]Tag, 0, len(m.tagLookup))
5965
for t := range m.tagLookup {
6066
tags = append(tags, t)
@@ -70,12 +76,18 @@ func (m FieldMap) Get(parser Field) MessageRejectError {
7076

7177
//Has returns true if the Tag is present in this FieldMap
7278
func (m FieldMap) Has(tag Tag) bool {
79+
m.rwLock.RLock()
80+
defer m.rwLock.RUnlock()
81+
7382
_, ok := m.tagLookup[tag]
7483
return ok
7584
}
7685

7786
//GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
7887
func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError {
88+
m.rwLock.RLock()
89+
defer m.rwLock.RUnlock()
90+
7991
f, ok := m.tagLookup[tag]
8092
if !ok {
8193
return ConditionallyRequiredFieldMissing(tag)
@@ -90,6 +102,9 @@ func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError
90102

91103
//GetBytes is a zero-copy GetField wrapper for []bytes fields
92104
func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
105+
m.rwLock.RLock()
106+
defer m.rwLock.RUnlock()
107+
93108
f, ok := m.tagLookup[tag]
94109
if !ok {
95110
return nil, ConditionallyRequiredFieldMissing(tag)
@@ -124,6 +139,9 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) {
124139

125140
//GetTime is a GetField wrapper for utc timestamp fields
126141
func (m FieldMap) GetTime(tag Tag) (t time.Time, err MessageRejectError) {
142+
m.rwLock.RLock()
143+
defer m.rwLock.RUnlock()
144+
127145
bytes, err := m.GetBytes(tag)
128146
if err != nil {
129147
return
@@ -148,6 +166,9 @@ func (m FieldMap) GetString(tag Tag) (string, MessageRejectError) {
148166

149167
//GetGroup is a Get function specific to Group Fields.
150168
func (m FieldMap) GetGroup(parser FieldGroupReader) MessageRejectError {
169+
m.rwLock.RLock()
170+
defer m.rwLock.RUnlock()
171+
151172
f, ok := m.tagLookup[parser.Tag()]
152173
if !ok {
153174
return ConditionallyRequiredFieldMissing(parser.Tag())
@@ -193,6 +214,9 @@ func (m *FieldMap) SetString(tag Tag, value string) *FieldMap {
193214

194215
//Clear purges all fields from field map
195216
func (m *FieldMap) Clear() {
217+
m.rwLock.Lock()
218+
defer m.rwLock.Unlock()
219+
196220
m.tags = m.tags[0:0]
197221
for k := range m.tagLookup {
198222
delete(m.tagLookup, k)
@@ -201,6 +225,9 @@ func (m *FieldMap) Clear() {
201225

202226
//CopyInto overwrites the given FieldMap with this one
203227
func (m *FieldMap) CopyInto(to *FieldMap) {
228+
m.rwLock.RLock()
229+
defer m.rwLock.RUnlock()
230+
204231
to.tagLookup = make(map[Tag]field)
205232
for tag, f := range m.tagLookup {
206233
clone := make(field, 1)
@@ -213,6 +240,9 @@ func (m *FieldMap) CopyInto(to *FieldMap) {
213240
}
214241

215242
func (m *FieldMap) add(f field) {
243+
m.rwLock.Lock()
244+
defer m.rwLock.Unlock()
245+
216246
t := fieldTag(f)
217247
if _, ok := m.tagLookup[t]; !ok {
218248
m.tags = append(m.tags, t)
@@ -222,6 +252,9 @@ func (m *FieldMap) add(f field) {
222252
}
223253

224254
func (m *FieldMap) getOrCreate(tag Tag) field {
255+
m.rwLock.Lock()
256+
defer m.rwLock.Unlock()
257+
225258
if f, ok := m.tagLookup[tag]; ok {
226259
f = f[:1]
227260
return f
@@ -242,6 +275,9 @@ func (m *FieldMap) Set(field FieldWriter) *FieldMap {
242275

243276
//SetGroup is a setter specific to group fields
244277
func (m *FieldMap) SetGroup(field FieldGroupWriter) *FieldMap {
278+
m.rwLock.Lock()
279+
defer m.rwLock.Unlock()
280+
245281
_, ok := m.tagLookup[field.Tag()]
246282
if !ok {
247283
m.tags = append(m.tags, field.Tag())
@@ -256,6 +292,9 @@ func (m *FieldMap) sortedTags() []Tag {
256292
}
257293

258294
func (m FieldMap) write(buffer *bytes.Buffer) {
295+
m.rwLock.Lock()
296+
defer m.rwLock.Unlock()
297+
259298
for _, tag := range m.sortedTags() {
260299
if f, ok := m.tagLookup[tag]; ok {
261300
writeField(f, buffer)
@@ -264,6 +303,9 @@ func (m FieldMap) write(buffer *bytes.Buffer) {
264303
}
265304

266305
func (m FieldMap) total() int {
306+
m.rwLock.RLock()
307+
defer m.rwLock.RUnlock()
308+
267309
total := 0
268310
for _, fields := range m.tagLookup {
269311
for _, tv := range fields {
@@ -279,6 +321,9 @@ func (m FieldMap) total() int {
279321
}
280322

281323
func (m FieldMap) length() int {
324+
m.rwLock.RLock()
325+
defer m.rwLock.RUnlock()
326+
282327
length := 0
283328
for _, fields := range m.tagLookup {
284329
for _, tv := range fields {

internal/buffer_pool.go

Lines changed: 0 additions & 37 deletions
This file was deleted.

message_pool.go

Lines changed: 0 additions & 26 deletions
This file was deleted.

parser.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,12 @@ import (
55
"errors"
66
"io"
77
"time"
8-
9-
"github.com/quickfixgo/quickfix/internal"
108
)
119

1210
const (
1311
defaultBufSize = 4096
1412
)
1513

16-
var bufferPool internal.BufferPool
1714

1815
type parser struct {
1916
//buffer is a slice of bigBuffer
@@ -145,7 +142,7 @@ func (p *parser) ReadMessage() (msgBytes *bytes.Buffer, err error) {
145142
return
146143
}
147144

148-
msgBytes = bufferPool.Get()
145+
msgBytes = new(bytes.Buffer)
149146
msgBytes.Reset()
150147
msgBytes.Write(p.buffer[:index])
151148
p.buffer = p.buffer[index:]

repeating_group.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,13 @@ func (f RepeatingGroup) Write() []TagValue {
114114

115115
for _, group := range f.groups {
116116
tags := group.sortedTags()
117-
117+
group.rwLock.RLock()
118118
for _, tag := range tags {
119119
if fields, ok := group.tagLookup[tag]; ok {
120120
tvs = append(tvs, fields...)
121121
}
122122
}
123+
group.rwLock.RUnlock()
123124
}
124125

125126
return tvs
@@ -199,7 +200,9 @@ func (f *RepeatingGroup) Read(tv []TagValue) ([]TagValue, error) {
199200
f.groups = append(f.groups, group)
200201
}
201202

203+
group.rwLock.Lock()
202204
group.tagLookup[tvRange[0].tag] = tvRange
205+
group.rwLock.Unlock()
203206
}
204207

205208
if len(f.groups) != expectedGroupSize {

resend_state.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session
5454

5555
delete(s.messageStash, targetSeqNum)
5656

57-
//return stashed message to pool
58-
session.returnToPool(msg)
59-
6057
nextState = inSession{}.FixMsgIn(session, msg)
6158
if !nextState.IsLoggedOn() {
6259
return

session.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ type session struct {
4343
transportDataDictionary *datadictionary.DataDictionary
4444
appDataDictionary *datadictionary.DataDictionary
4545

46-
messagePool
4746
timestampPrecision TimestampPrecision
4847
}
4948

@@ -678,14 +677,6 @@ type fixIn struct {
678677
receiveTime time.Time
679678
}
680679

681-
func (s *session) returnToPool(msg *Message) {
682-
s.messagePool.Put(msg)
683-
if msg.rawMessage != nil {
684-
bufferPool.Put(msg.rawMessage)
685-
msg.rawMessage = nil
686-
}
687-
}
688-
689680
func (s *session) onDisconnect() {
690681
s.log.OnEvent("Disconnected")
691682
if s.ResetOnDisconnect {

session_state.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,14 @@ func (sm *stateMachine) Incoming(session *session, m fixIn) {
6868

6969
session.log.OnIncoming(m.bytes.Bytes())
7070

71-
msg := session.messagePool.Get()
71+
msg := NewMessage()
7272
if err := ParseMessageWithDataDictionary(msg, m.bytes, session.transportDataDictionary, session.appDataDictionary); err != nil {
7373
session.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), m.bytes)
7474
} else {
7575
msg.ReceiveTime = m.receiveTime
7676
sm.fixMsgIn(session, msg)
7777
}
7878

79-
if !msg.keepMessage {
80-
session.returnToPool(msg)
81-
}
8279
session.peerTimer.Reset(time.Duration(float64(1.2) * float64(session.HeartBtInt)))
8380
}
8481

0 commit comments

Comments
 (0)