@@ -13,6 +13,11 @@ import (
1313 "time"
1414)
1515
16+ const (
17+ notificationQueueSize = 1024
18+ notificationQueueDrainTimeout = 5 * time .Second
19+ )
20+
1621type anyMessage struct {
1722 JSONRPC string `json:"jsonrpc"`
1823 ID * json.RawMessage `json:"id,omitempty"`
@@ -73,7 +78,7 @@ func NewConnection(handler MethodHandler, peerInput io.Writer, peerOutput io.Rea
7378 cancel : cancel ,
7479 inboundCtx : inboundCtx ,
7580 inboundCancel : inboundCancel ,
76- notificationQueue : make (chan * anyMessage , 1024 ),
81+ notificationQueue : make (chan * anyMessage , notificationQueueSize ),
7782 }
7883 go c .receive ()
7984 go c .processNotifications ()
@@ -160,7 +165,6 @@ func (c *Connection) receive() {
160165
161166 // Cancel inboundCtx after notifications finish, but ensure we don't leak forever if a
162167 // handler blocks waiting for cancellation.
163- const drainTimeout = 5 * time .Second
164168 go func () {
165169 done := make (chan struct {})
166170 go func () {
@@ -169,15 +173,16 @@ func (c *Connection) receive() {
169173 }()
170174 select {
171175 case <- done :
172- case <- time .After (drainTimeout ):
176+ case <- time .After (notificationQueueDrainTimeout ):
173177 }
174178 c .inboundCancel (cause )
175179 }()
176180
177181 c .loggerOrDefault ().Info ("peer connection closed" )
178182}
179183
180- // processNotifications processes notifications sequentially to maintain order
184+ // processNotifications processes notifications sequentially to maintain order.
185+ // It terminates when notificationQueue is closed (e.g. on disconnect in receive()).
181186func (c * Connection ) processNotifications () {
182187 for msg := range c .notificationQueue {
183188 c .handleInbound (msg )
0 commit comments