Skip to content

Commit 42e876b

Browse files
authored
Merge pull request #100 from troutowicz/per-message-deadline
Support per-message write deadlines
2 parents 3ea5efd + 5c8d8f2 commit 42e876b

File tree

4 files changed

+211
-4
lines changed

4 files changed

+211
-4
lines changed

envelope.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package melody
22

3+
import "time"
4+
35
type envelope struct {
4-
t int
5-
msg []byte
6-
filter filterFunc
6+
t int
7+
msg []byte
8+
filter filterFunc
9+
writeWait time.Duration // Optional per-message deadline (0 = use Config.WriteWait)
710
}

melody.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package melody
22

33
import (
44
"net/http"
5+
"time"
56

67
"github.com/gorilla/websocket"
78
)
@@ -298,3 +299,73 @@ func (m *Melody) IsClosed() bool {
298299
func FormatCloseMessage(closeCode int, text string) []byte {
299300
return websocket.FormatCloseMessage(closeCode, text)
300301
}
302+
303+
// BroadcastWithDeadline broadcasts a text message with a custom write deadline.
304+
// If deadline is 0, uses Config.WriteWait.
305+
func (m *Melody) BroadcastWithDeadline(msg []byte, deadline time.Duration) error {
306+
if m.hub.closed() {
307+
return ErrClosed
308+
}
309+
310+
message := envelope{
311+
t: websocket.TextMessage,
312+
msg: msg,
313+
writeWait: deadline,
314+
}
315+
m.hub.broadcast(message)
316+
317+
return nil
318+
}
319+
320+
// BroadcastFilterWithDeadline broadcasts a text message to filtered sessions with a custom write deadline.
321+
// If deadline is 0, uses Config.WriteWait.
322+
func (m *Melody) BroadcastFilterWithDeadline(msg []byte, deadline time.Duration, fn func(*Session) bool) error {
323+
if m.hub.closed() {
324+
return ErrClosed
325+
}
326+
327+
message := envelope{
328+
t: websocket.TextMessage,
329+
msg: msg,
330+
filter: fn,
331+
writeWait: deadline,
332+
}
333+
m.hub.broadcast(message)
334+
335+
return nil
336+
}
337+
338+
// BroadcastBinaryWithDeadline broadcasts a binary message with a custom write deadline.
339+
// If deadline is 0, uses Config.WriteWait.
340+
func (m *Melody) BroadcastBinaryWithDeadline(msg []byte, deadline time.Duration) error {
341+
if m.hub.closed() {
342+
return ErrClosed
343+
}
344+
345+
message := envelope{
346+
t: websocket.BinaryMessage,
347+
msg: msg,
348+
writeWait: deadline,
349+
}
350+
m.hub.broadcast(message)
351+
352+
return nil
353+
}
354+
355+
// BroadcastBinaryFilterWithDeadline broadcasts a binary message to filtered sessions with a custom write deadline.
356+
// If deadline is 0, uses Config.WriteWait.
357+
func (m *Melody) BroadcastBinaryFilterWithDeadline(msg []byte, deadline time.Duration, fn func(*Session) bool) error {
358+
if m.hub.closed() {
359+
return ErrClosed
360+
}
361+
362+
message := envelope{
363+
t: websocket.BinaryMessage,
364+
msg: msg,
365+
filter: fn,
366+
writeWait: deadline,
367+
}
368+
m.hub.broadcast(message)
369+
370+
return nil
371+
}

melody_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,3 +847,98 @@ func TestConcurrentMessageHandling(t *testing.T) {
847847
}
848848
})
849849
}
850+
851+
func TestPerMessageDeadline(t *testing.T) {
852+
done := make(chan bool)
853+
854+
ws := NewTestServer()
855+
ws.m.Config.WriteWait = 5 * time.Second
856+
857+
ws.m.HandleConnect(func(s *Session) {
858+
// New API with custom deadline
859+
err := s.WriteWithDeadline([]byte("test"), 30*time.Second)
860+
assert.Nil(t, err)
861+
close(done)
862+
})
863+
864+
server := httptest.NewServer(ws)
865+
defer server.Close()
866+
867+
conn := MustNewDialer(server.URL)
868+
defer conn.Close()
869+
870+
_, msg, err := conn.ReadMessage()
871+
assert.Nil(t, err)
872+
assert.Equal(t, "test", string(msg))
873+
874+
<-done
875+
}
876+
877+
func TestZeroDeadlineUsesConfig(t *testing.T) {
878+
done := make(chan bool)
879+
880+
ws := NewTestServer()
881+
ws.m.Config.WriteWait = 5 * time.Second
882+
883+
ws.m.HandleConnect(func(s *Session) {
884+
// Explicitly pass 0, should use Config.WriteWait
885+
err := s.WriteWithDeadline([]byte("test"), 0)
886+
assert.Nil(t, err)
887+
close(done)
888+
})
889+
890+
server := httptest.NewServer(ws)
891+
defer server.Close()
892+
893+
conn := MustNewDialer(server.URL)
894+
defer conn.Close()
895+
896+
_, msg, err := conn.ReadMessage()
897+
assert.Nil(t, err)
898+
assert.Equal(t, "test", string(msg))
899+
900+
<-done
901+
}
902+
903+
func TestSizeBasedDeadline(t *testing.T) {
904+
received := make(chan bool, 2)
905+
906+
ws := NewTestServer()
907+
ws.m.Config.WriteWait = 10 * time.Second
908+
909+
server := httptest.NewServer(ws)
910+
defer server.Close()
911+
912+
conn := MustNewDialer(server.URL)
913+
defer conn.Close()
914+
915+
// Small message - short deadline
916+
smallMsg := []byte("ping")
917+
deadline := 2 * time.Second
918+
err := ws.m.BroadcastWithDeadline(smallMsg, deadline)
919+
assert.Nil(t, err)
920+
921+
// Large message - long deadline
922+
largeMsg := make([]byte, 8192)
923+
for i := range largeMsg {
924+
largeMsg[i] = 'x'
925+
}
926+
longDeadline := 30 * time.Second
927+
err = ws.m.BroadcastWithDeadline(largeMsg, longDeadline)
928+
assert.Nil(t, err)
929+
930+
// Read first message
931+
_, msg1, err := conn.ReadMessage()
932+
assert.Nil(t, err)
933+
assert.Equal(t, smallMsg, msg1)
934+
received <- true
935+
936+
// Read second message
937+
_, msg2, err := conn.ReadMessage()
938+
assert.Nil(t, err)
939+
assert.Equal(t, largeMsg, msg2)
940+
received <- true
941+
942+
<-received
943+
<-received
944+
}

session.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ func (s *Session) writeRaw(message envelope) error {
3939
return ErrWriteClosed
4040
}
4141

42-
s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
42+
// Use per-message deadline if specified, otherwise use global config
43+
deadline := message.writeWait
44+
if deadline == 0 {
45+
deadline = s.melody.Config.WriteWait
46+
}
47+
48+
s.conn.SetWriteDeadline(time.Now().Add(deadline))
4349
err := s.conn.WriteMessage(message.t, message.msg)
4450

4551
if err != nil {
@@ -259,3 +265,35 @@ func (s *Session) RemoteAddr() net.Addr {
259265
func (s *Session) WebsocketConnection() *websocket.Conn {
260266
return s.conn
261267
}
268+
269+
// WriteWithDeadline writes a text message to the session with a custom write deadline.
270+
// If deadline is 0, uses Config.WriteWait.
271+
func (s *Session) WriteWithDeadline(msg []byte, deadline time.Duration) error {
272+
if s.closed() {
273+
return ErrSessionClosed
274+
}
275+
276+
s.writeMessage(envelope{
277+
t: websocket.TextMessage,
278+
msg: msg,
279+
writeWait: deadline,
280+
})
281+
282+
return nil
283+
}
284+
285+
// WriteBinaryWithDeadline writes a binary message to the session with a custom write deadline.
286+
// If deadline is 0, uses Config.WriteWait.
287+
func (s *Session) WriteBinaryWithDeadline(msg []byte, deadline time.Duration) error {
288+
if s.closed() {
289+
return ErrSessionClosed
290+
}
291+
292+
s.writeMessage(envelope{
293+
t: websocket.BinaryMessage,
294+
msg: msg,
295+
writeWait: deadline,
296+
})
297+
298+
return nil
299+
}

0 commit comments

Comments
 (0)