Skip to content

Commit 6cd9aed

Browse files
committed
feat(websocket): increase send buffer size
1 parent dc4d092 commit 6cd9aed

File tree

2 files changed

+46
-26
lines changed

2 files changed

+46
-26
lines changed

api/sockets/handler.go

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/gorilla/context"
1111
"github.com/gorilla/websocket"
12-
"github.com/semaphoreui/semaphore/util"
1312
log "github.com/sirupsen/logrus"
1413
)
1514

@@ -33,6 +32,10 @@ const (
3332

3433
// Maximum message size allowed from peer.
3534
maxMessageSize = 512
35+
36+
// Maximum size of the connection.send channel.
37+
// When the channel is full, the hub closes it (see method hub.run).
38+
connectionChannelSize = 2048
3639
)
3740

3841
type connection struct {
@@ -41,18 +44,23 @@ type connection struct {
4144
userID int
4245
}
4346

44-
func (c *connection) logError(err error, msg string) {
47+
func (c *connection) log(level log.Level, err error, msg string) {
4548
log.WithError(err).WithFields(log.Fields{
4649
"context": "websocket",
4750
"user_id": c.userID,
48-
}).Error(msg)
51+
}).Log(level, msg)
4952
}
5053

51-
func (c *connection) debugError(err error, msg string) {
52-
log.WithError(err).WithFields(log.Fields{
53-
"context": "websocket",
54-
"user_id": c.userID,
55-
}).Debug(msg)
54+
func (c *connection) logError(err error, msg string) {
55+
c.log(log.ErrorLevel, err, msg)
56+
}
57+
58+
func (c *connection) logWarn(err error, msg string) {
59+
c.log(log.DebugLevel, err, msg)
60+
}
61+
62+
func (c *connection) logDebug(err error, msg string) {
63+
c.log(log.DebugLevel, err, msg)
5664
}
5765

5866
// readPump pumps messages from the websocket connection to the hub.
@@ -65,12 +73,16 @@ func (c *connection) readPump() {
6573
c.ws.SetReadLimit(maxMessageSize)
6674

6775
if err := c.ws.SetReadDeadline(tz.Now().Add(pongWait)); err != nil {
68-
c.logError(err, "Cannot set read deadline")
76+
c.logWarn(err, "Cannot set read deadline")
6977
}
7078

7179
c.ws.SetPongHandler(func(string) error {
72-
err2 := c.ws.SetReadDeadline(tz.Now().Add(pongWait))
73-
util.LogErrorF(err2, log.Fields{"error": "Cannot set read deadline"})
80+
deadline := tz.Now().Add(pongWait)
81+
82+
if err := c.ws.SetReadDeadline(deadline); err != nil {
83+
c.logWarn(err, "Cannot set read deadline")
84+
}
85+
7486
return nil
7587
})
7688

@@ -80,7 +92,7 @@ func (c *connection) readPump() {
8092

8193
if err != nil {
8294
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
83-
c.logError(err, "Cannot read message from websocket")
95+
c.logDebug(err, "Cannot read message from websocket")
8496
}
8597
break
8698
}
@@ -90,8 +102,10 @@ func (c *connection) readPump() {
90102
// write writes a message with the given message type and payload.
91103
func (c *connection) write(mt int, payload []byte) error {
92104

93-
if err := c.ws.SetWriteDeadline(tz.Now().Add(writeWait)); err != nil {
94-
c.logError(err, "Cannot set write deadline")
105+
deadline := tz.Now().Add(writeWait)
106+
107+
if err := c.ws.SetWriteDeadline(deadline); err != nil {
108+
c.logWarn(err, "Cannot set write deadline")
95109
}
96110

97111
return c.ws.WriteMessage(mt, payload)
@@ -109,19 +123,23 @@ func (c *connection) writePump() {
109123
for {
110124
select {
111125
case message, ok := <-c.send:
126+
112127
if !ok {
113128
if err := c.write(websocket.CloseMessage, []byte{}); err != nil {
114-
c.debugError(err, "Cannot send close message")
129+
c.logDebug(err, "Cannot send close message")
115130
}
116131
return
117132
}
118133

119134
if err := c.write(websocket.TextMessage, message); err != nil {
120-
c.debugError(err, "Cannot send message")
135+
c.logDebug(err, "Cannot send message")
136+
return
121137
}
138+
122139
case <-ticker.C:
140+
123141
if err := c.write(websocket.PingMessage, []byte{}); err != nil {
124-
c.debugError(err, "Cannot send ping message")
142+
c.logDebug(err, "Cannot send ping message")
125143
return
126144
}
127145
}
@@ -138,13 +156,16 @@ func Handler(w http.ResponseWriter, r *http.Request) {
138156
user := usr.(*db.User)
139157
ws, err := upgrader.Upgrade(w, r, nil)
140158
if err != nil {
141-
log.Error(err)
159+
log.WithError(err).WithFields(log.Fields{
160+
"context": "websocket",
161+
"user_id": user.ID,
162+
}).Error("Cannot upgrade connection")
142163
w.WriteHeader(http.StatusInternalServerError)
143164
return
144165
}
145166

146167
c := &connection{
147-
send: make(chan []byte, 256),
168+
send: make(chan []byte, connectionChannelSize),
148169
ws: ws,
149170
userID: user.ID,
150171
}

api/sockets/pool.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package sockets
33
// hub maintains the set of active connections and broadcasts messages to the
44
// connections.
55
type hub struct {
6-
// Registered connections.
6+
// Registered websocket connections.
77
connections map[*connection]bool
88

99
// Inbound messages from the connections.
@@ -28,7 +28,6 @@ var h = hub{
2828
connections: make(map[*connection]bool),
2929
}
3030

31-
//nolint: gocyclo
3231
func (h *hub) run() {
3332
for {
3433
select {
@@ -40,16 +39,16 @@ func (h *hub) run() {
4039
close(c.send)
4140
}
4241
case m := <-h.broadcast:
43-
for c := range h.connections {
44-
if m.userID > 0 && m.userID != c.userID {
42+
for conn := range h.connections {
43+
if m.userID > 0 && m.userID != conn.userID {
4544
continue
4645
}
4746

4847
select {
49-
case c.send <- m.msg:
48+
case conn.send <- m.msg:
5049
default:
51-
close(c.send)
52-
delete(h.connections, c)
50+
close(conn.send)
51+
delete(h.connections, conn)
5352
}
5453
}
5554
}

0 commit comments

Comments
 (0)