@@ -14,16 +14,30 @@ import (
14
14
type wsWriter struct {
15
15
session * Session
16
16
17
- conn net.Conn
18
- closer chan interface {}
19
- incoming chan interface {}
20
- sendCloseQueue chan []byte
17
+ conn net.Conn
18
+ closer chan interface {}
19
+ incoming chan outgoingEvent
20
+ sendCloseQueue chan []byte
21
+ readyRecv chan bool
22
+ waitingForIdentify bool
23
+ queuedEvents []* outgoingEvent
21
24
22
25
writer * wsutil.Writer
23
26
24
27
sendRatelimiter chan bool
25
28
}
26
29
30
+ func newWSWriter (conn net.Conn , session * Session , closer chan interface {}) * wsWriter {
31
+ return & wsWriter {
32
+ conn : conn ,
33
+ session : session ,
34
+ closer : closer ,
35
+ incoming : make (chan outgoingEvent , 10 ),
36
+ sendCloseQueue : make (chan []byte ),
37
+ readyRecv : make (chan bool ),
38
+ }
39
+ }
40
+
27
41
func (w * wsWriter ) Run () {
28
42
w .sendRatelimiter = make (chan bool )
29
43
go w .runSendRatelimiter ()
@@ -41,20 +55,34 @@ func (w *wsWriter) Run() {
41
55
}
42
56
return
43
57
case msg := <- w .incoming :
44
- var err error
45
- switch t := msg .(type ) {
46
- case []byte :
47
- err = w .writeRaw (t )
48
- default :
49
- err = w .writeJson (t )
58
+ if w .waitingForIdentify {
59
+ w .queuedEvents = append (w .queuedEvents , & msg )
60
+ continue
61
+ }
62
+
63
+ if msg .Operation == GatewayOPIdentify {
64
+ w .waitingForIdentify = true
50
65
}
51
66
67
+ err := w .writeJson (msg )
52
68
if err != nil {
53
69
w .session .log (LogError , "Error writing to gateway: %s" , err .Error ())
54
70
return
55
71
}
56
72
case body := <- w .sendCloseQueue :
57
73
w .sendClose (body )
74
+
75
+ case <- w .readyRecv :
76
+ w .waitingForIdentify = false
77
+ for _ , msg := range w .queuedEvents {
78
+ err := w .writeJson (msg )
79
+ if err != nil {
80
+ w .session .log (LogError , "Error writing to gateway: %s" , err .Error ())
81
+ return
82
+ }
83
+ }
84
+
85
+ w .queuedEvents = nil
58
86
}
59
87
}
60
88
}
@@ -106,7 +134,7 @@ func (w *wsWriter) sendClose(body []byte) error {
106
134
107
135
}
108
136
109
- func (w * wsWriter ) Queue (data interface {} ) {
137
+ func (w * wsWriter ) Queue (data outgoingEvent ) {
110
138
select {
111
139
case <- time .After (time .Second * 10 ):
112
140
case <- w .closer :
@@ -177,10 +205,10 @@ func (wh *wsHeartBeater) SendBeat() {
177
205
wh .Lock ()
178
206
wh .lastSend = time .Now ()
179
207
wh .Unlock ()
180
-
208
+
181
209
seq := atomic .LoadInt64 (wh .sequence )
182
210
183
- wh .writer .Queue (& outgoingEvent {
211
+ wh .writer .Queue (outgoingEvent {
184
212
Operation : GatewayOPHeartbeat ,
185
213
Data : seq ,
186
214
})
0 commit comments