Skip to content

Commit decdb69

Browse files
author
Jannis Pohlmann
committed
connections: Fix crash when sending data over a closed connection
The problem was a race condition between removing the subscriptions of the connection from the subscription manager when a connection is closed and writing sends to the already-closed outgoing channel. This commit resolves this by guarding writes to the outgoing channel with a combination of a "closed" flag and a mutex that synchronizes access to the closed flag and channel close operation. This fixes #22.
1 parent c8b63e0 commit decdb69

File tree

1 file changed

+32
-9
lines changed

1 file changed

+32
-9
lines changed

connections.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7+
"sync"
78
"time"
89

910
"github.com/google/uuid"
@@ -121,12 +122,14 @@ type Connection interface {
121122
*/
122123

123124
type connection struct {
124-
id string
125-
ws *websocket.Conn
126-
config ConnectionConfig
127-
logger *log.Entry
128-
outgoing chan OperationMessage
129-
user interface{}
125+
id string
126+
ws *websocket.Conn
127+
config ConnectionConfig
128+
logger *log.Entry
129+
outgoing chan OperationMessage
130+
user interface{}
131+
closeMutex *sync.Mutex
132+
closed bool
130133
}
131134

132135
func operationMessageForType(messageType string) OperationMessage {
@@ -144,6 +147,8 @@ func NewConnection(ws *websocket.Conn, config ConnectionConfig) Connection {
144147
conn.ws = ws
145148
conn.config = config
146149
conn.logger = NewLogger("connection/" + conn.id)
150+
conn.closed = false
151+
conn.closeMutex = &sync.Mutex{}
147152

148153
conn.outgoing = make(chan OperationMessage)
149154

@@ -167,25 +172,43 @@ func (conn *connection) SendData(opID string, data *DataMessagePayload) {
167172
msg := operationMessageForType(gqlData)
168173
msg.ID = opID
169174
msg.Payload = data
170-
conn.outgoing <- msg
175+
conn.closeMutex.Lock()
176+
if !conn.closed {
177+
conn.outgoing <- msg
178+
}
179+
conn.closeMutex.Unlock()
171180
}
172181

173182
func (conn *connection) SendError(err error) {
174183
msg := operationMessageForType(gqlError)
175184
msg.Payload = err.Error()
176-
conn.outgoing <- msg
185+
conn.closeMutex.Lock()
186+
if !conn.closed {
187+
conn.outgoing <- msg
188+
}
189+
conn.closeMutex.Unlock()
177190
}
178191

179192
func (conn *connection) sendOperationErrors(opID string, errs []error) {
193+
if conn.closed {
194+
return
195+
}
180196
msg := operationMessageForType(gqlError)
181197
msg.ID = opID
182198
msg.Payload = errs
183-
conn.outgoing <- msg
199+
conn.closeMutex.Lock()
200+
if !conn.closed {
201+
conn.outgoing <- msg
202+
}
203+
conn.closeMutex.Unlock()
184204
}
185205

186206
func (conn *connection) close() {
187207
// Close the write loop by closing the outgoing messages channels
208+
conn.closeMutex.Lock()
209+
conn.closed = true
188210
close(conn.outgoing)
211+
conn.closeMutex.Unlock()
189212

190213
// Notify event handlers
191214
if conn.config.EventHandlers.Close != nil {

0 commit comments

Comments
 (0)