@@ -26,20 +26,23 @@ type Broker struct {
26
26
clients map [chan internal.Command ]string
27
27
ids map [string ]chan internal.Command
28
28
conf map [string ]context.Context
29
- subscriptions map [string ][]string
29
+ subscriptions map [string ][]chan bool
30
30
validateAuth Validator
31
+
32
+ pubsub internal.PubSuber
31
33
}
32
34
33
- func NewBroker (v Validator ) * Broker {
35
+ func NewBroker (v Validator , pubsub internal. PubSuber ) * Broker {
34
36
b := & Broker {
35
37
Broadcast : make (chan internal.Command , 1 ),
36
38
newConnections : make (chan ConnectionData ),
37
39
closingConnections : make (chan chan internal.Command ),
38
40
clients : make (map [chan internal.Command ]string ),
39
41
ids : make (map [string ]chan internal.Command ),
40
42
conf : make (map [string ]context.Context ),
41
- subscriptions : make (map [string ][]string ),
43
+ subscriptions : make (map [string ][]chan bool ),
42
44
validateAuth : v ,
45
+ pubsub : pubsub ,
43
46
}
44
47
45
48
go b .start ()
@@ -85,6 +88,13 @@ func (b *Broker) unsub(c chan internal.Command) {
85
88
fmt .Println ("cannot find connection id" )
86
89
}
87
90
91
+ subs , ok := b .subscriptions [id ]
92
+ if ok {
93
+ for _ , ch := range subs {
94
+ ch <- true
95
+ }
96
+ }
97
+
88
98
delete (b .ids , id )
89
99
}
90
100
@@ -140,12 +150,15 @@ func (b *Broker) Accept(w http.ResponseWriter, r *http.Request) {
140
150
}
141
151
142
152
func (b * Broker ) getTargets (msg internal.Command ) (sockets []chan internal.Command , payload internal.Command ) {
153
+ var sender chan internal.Command
154
+
143
155
if msg .SID != internal .SystemID {
144
- sender , ok := b .ids [msg .SID ]
156
+ s , ok := b .ids [msg .SID ]
145
157
if ! ok {
146
158
fmt .Println ("cannot find sender socket" , msg .SID )
147
159
return
148
160
}
161
+ sender = s
149
162
sockets = append (sockets , sender )
150
163
}
151
164
@@ -167,15 +180,27 @@ func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Comma
167
180
168
181
payload = internal.Command {Type : internal .MsgTypeToken , Data : msg .Data }
169
182
case internal .MsgTypeJoin :
170
- members , ok := b .subscriptions [msg .Data ]
183
+ subs , ok := b .subscriptions [msg .SID ]
171
184
if ! ok {
172
- members = make ([]string , 0 )
185
+ subs = make ([]chan bool , 0 )
173
186
}
174
187
175
- members = append (members , msg .SID )
176
- b .subscriptions [msg .Data ] = members
188
+ closesub := make (chan bool )
189
+
190
+ subs = append (subs , closesub )
191
+ b .subscriptions [msg .SID ] = subs
192
+
193
+ go b .pubsub .Subscribe (sender , msg .Token , msg .Data , closesub )
177
194
178
195
payload = internal.Command {Type : internal .MsgTypeJoined , Data : msg .Data }
196
+ case internal .MsgTypePresence :
197
+ v , err := b .pubsub .Get (msg .Data )
198
+ if err != nil {
199
+ //TODO: Make sure it's because the channel key does not exists
200
+ v = "0"
201
+ }
202
+
203
+ payload = internal.Command {Type : internal .MsgTypePresence , Data : v }
179
204
case internal .MsgTypeChanIn :
180
205
if len (msg .Channel ) == 0 {
181
206
payload = internal.Command {Type : internal .MsgTypeError , Data : "no channel was specified" }
@@ -188,7 +213,8 @@ func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Comma
188
213
return
189
214
}
190
215
191
- go b .Publish (msg , msg .Channel )
216
+ go b .pubsub .Publish (msg )
217
+ //go b.Publish(msg, msg.Channel)
192
218
193
219
payload = internal.Command {Type : internal .MsgTypeOk }
194
220
default :
@@ -198,24 +224,3 @@ func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Comma
198
224
199
225
return
200
226
}
201
-
202
- // Publish sends a message to all socket in that channel
203
- func (b * Broker ) Publish (msg internal.Command , channel string ) {
204
- if msg .Type == internal .MsgTypeChanIn {
205
- msg .Type = internal .MsgTypeChanOut
206
- }
207
-
208
- members , ok := b .subscriptions [channel ]
209
- if ! ok {
210
- return
211
- }
212
-
213
- for _ , sid := range members {
214
- c , ok := b .ids [sid ]
215
- if ! ok {
216
- continue
217
- }
218
-
219
- c <- msg
220
- }
221
- }
0 commit comments