Skip to content

Commit 8d97b41

Browse files
committed
fix goroutine leak
1 parent 2497d57 commit 8d97b41

File tree

4 files changed

+106
-2
lines changed

4 files changed

+106
-2
lines changed

session.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type session struct {
5353
stateTimer *internal.EventTimer
5454
peerTimer *internal.EventTimer
5555
sentReset bool
56+
stopCh chan struct{}
5657
stopOnce sync.Once
5758

5859
targetDefaultApplVerID string
@@ -98,6 +99,9 @@ type stopReq struct{}
9899
func (s *session) stop() {
99100
// Stop once.
100101
s.stopOnce.Do(func() {
102+
if s.stopCh != nil {
103+
close(s.stopCh)
104+
}
101105
s.admin <- stopReq{}
102106
})
103107
}
@@ -596,7 +600,13 @@ func (s *session) initiateLogoutInReplyTo(reason string, inReplyTo *Message) (er
596600
return
597601
}
598602
s.log.OnEvent("Inititated logout request")
599-
time.AfterFunc(s.LogoutTimeout, func() { s.sessionEvent <- internal.LogoutTimeout })
603+
time.AfterFunc(s.LogoutTimeout, func() {
604+
select {
605+
case <-s.stopCh:
606+
return
607+
case s.sessionEvent <- internal.LogoutTimeout:
608+
}
609+
})
600610
return
601611
}
602612

session_factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func (f sessionFactory) newSession(
9090
s = &session{
9191
sessionID: sessionID,
9292
stopOnce: sync.Once{},
93+
stopCh: make(chan struct{}),
9394
}
9495

9596
var validatorSettings = defaultValidatorSettings

session_leak_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package quickfix
2+
3+
import (
4+
"bytes"
5+
"runtime/pprof"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/quickfixgo/quickfix/internal"
11+
)
12+
13+
type testLog struct{}
14+
15+
func (testLog) OnIncoming([]byte) {}
16+
func (testLog) OnOutgoing([]byte) {}
17+
func (testLog) OnEvent(string) {}
18+
func (testLog) OnEventf(string, ...interface{}) {}
19+
20+
// testApp is a no-op Application for tests.
21+
type testApp struct{}
22+
23+
func (testApp) OnCreate(SessionID) {}
24+
func (testApp) OnLogon(SessionID) {}
25+
func (testApp) OnLogout(SessionID) {}
26+
func (testApp) ToAdmin(*Message, SessionID) {}
27+
func (testApp) FromAdmin(*Message, SessionID) MessageRejectError { return nil }
28+
func (testApp) ToApp(*Message, SessionID) error { return nil }
29+
func (testApp) FromApp(*Message, SessionID) MessageRejectError { return nil }
30+
31+
func newTimerOnlySession() *session {
32+
tr, _ := internal.NewUTCTimeRange(internal.NewTimeOfDay(0, 0, 0), internal.NewTimeOfDay(23, 59, 59), nil)
33+
s := &session{
34+
store: &memoryStore{},
35+
log: testLog{},
36+
sessionID: SessionID{BeginString: BeginStringFIX44, SenderCompID: "S", TargetCompID: "T"},
37+
messageOut: make(chan []byte, 2),
38+
messageIn: make(chan fixIn),
39+
sessionEvent: make(chan internal.Event),
40+
messageEvent: make(chan bool),
41+
application: testApp{},
42+
stopCh: make(chan struct{}),
43+
SessionSettings: internal.SessionSettings{
44+
SessionTime: tr,
45+
},
46+
}
47+
return s
48+
}
49+
50+
func countGoroutinesContaining(substr string) int {
51+
var buf bytes.Buffer
52+
_ = pprof.Lookup("goroutine").WriteTo(&buf, 2)
53+
return strings.Count(buf.String(), substr)
54+
}
55+
56+
func TestLogonTimeoutDoesNotLeakGoroutine(t *testing.T) {
57+
s := newTimerOnlySession()
58+
s.InitiateLogon = true
59+
s.LogonTimeout = 10 * time.Millisecond
60+
61+
baseline := countGoroutinesContaining("stateMachine).Connect.func1")
62+
63+
s.stateMachine.Connect(s)
64+
close(s.stopCh)
65+
time.Sleep(4 * s.LogonTimeout)
66+
67+
if got := countGoroutinesContaining("stateMachine).Connect.func1"); got > baseline {
68+
t.Fatalf("logon timeout goroutine leaked: baseline=%d current=%d", baseline, got)
69+
}
70+
}
71+
72+
func TestLogoutTimeoutDoesNotLeakGoroutine(t *testing.T) {
73+
s := newTimerOnlySession()
74+
s.LogoutTimeout = 10 * time.Millisecond
75+
baseline := countGoroutinesContaining("initiateLogoutInReplyTo")
76+
s.stateMachine.Connect(s)
77+
78+
if err := s.initiateLogout("bye"); err != nil {
79+
t.Fatalf("initiateLogout returned error: %v", err)
80+
}
81+
close(s.stopCh)
82+
time.Sleep(4 * s.LogoutTimeout)
83+
84+
if got := countGoroutinesContaining("initiateLogoutInReplyTo"); got > baseline {
85+
t.Fatalf("logout timeout goroutine leaked: baseline=%d current=%d", baseline, got)
86+
}
87+
}

session_state.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@ func (sm *stateMachine) Connect(session *session) {
6565

6666
sm.setState(session, logonState{})
6767
// Fire logon timeout event after the pre-configured delay period.
68-
time.AfterFunc(session.LogonTimeout, func() { session.sessionEvent <- internal.LogonTimeout })
68+
time.AfterFunc(session.LogonTimeout, func() {
69+
select {
70+
case <-session.stopCh:
71+
return
72+
case session.sessionEvent <- internal.LogonTimeout:
73+
}
74+
})
6975
}
7076

7177
func (sm *stateMachine) Stop(session *session) {

0 commit comments

Comments
 (0)