12
12
use React \Promise \PromiseInterface ;
13
13
use stdClass ;
14
14
15
- class RedisClient implements ReplicationInterface
15
+ class RedisClient extends LocalClient
16
16
{
17
17
/**
18
18
* The running loop.
@@ -90,49 +90,29 @@ public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInte
90
90
}
91
91
92
92
/**
93
- * Handle a message received from Redis on a specific channel .
93
+ * Publish a message to a channel on behalf of a websocket user .
94
94
*
95
- * @param string $redisChannel
96
- * @param string $payload
97
- * @return void
95
+ * @param string $appId
96
+ * @param string $channel
97
+ * @param stdClass $payload
98
+ * @return bool
98
99
*/
99
- protected function onMessage (string $ redisChannel , string $ payload )
100
+ public function publish (string $ appId , string $ channel , stdClass $ payload ): bool
100
101
{
101
- $ payload = json_decode ($ payload );
102
-
103
- // Ignore messages sent by ourselves.
104
- if (isset ($ payload ->serverId ) && $ this ->serverId === $ payload ->serverId ) {
105
- return ;
106
- }
107
-
108
- // Pull out the app ID. See RedisPusherBroadcaster
109
- $ appId = $ payload ->appId ;
110
-
111
- // We need to put the channel name in the payload.
112
- // We strip the app ID from the channel name, websocket clients
113
- // expect the channel name to not include the app ID.
114
- $ payload ->channel = Str::after ($ redisChannel , "{$ appId }: " );
115
-
116
- $ channelManager = app (ChannelManager::class);
117
-
118
- // Load the Channel instance to sync.
119
- $ channel = $ channelManager ->find ($ appId , $ payload ->channel );
102
+ $ payload ->appId = $ appId ;
103
+ $ payload ->serverId = $ this ->getServerId ();
120
104
121
- // If no channel is found, none of our connections want to
122
- // receive this message, so we ignore it.
123
- if (! $ channel ) {
124
- return ;
125
- }
105
+ $ payload = json_encode ($ payload );
126
106
127
- $ socket = $ payload -> socket ?? null ;
107
+ $ this -> publishClient -> __call ( ' publish ' , [ " $ appId : $ channel " , $ payload ]) ;
128
108
129
- // Remove fields intended for internal use from the payload.
130
- unset($ payload ->socket );
131
- unset($ payload ->serverId );
132
- unset($ payload ->appId );
109
+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED , [
110
+ 'channel ' => $ channel ,
111
+ 'serverId ' => $ this ->getServerId (),
112
+ 'payload ' => $ payload ,
113
+ ]);
133
114
134
- // Push the message out to connected websocket clients.
135
- $ channel ->broadcastToEveryoneExcept ($ payload , $ socket , $ appId , false );
115
+ return true ;
136
116
}
137
117
138
118
/**
@@ -153,7 +133,10 @@ public function subscribe(string $appId, string $channel): bool
153
133
$ this ->subscribedChannels ["$ appId: $ channel " ]++;
154
134
}
155
135
156
- DashboardLogger::replicatorSubscribed ($ appId , $ channel , $ this ->serverId );
136
+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED , [
137
+ 'channel ' => $ channel ,
138
+ 'serverId ' => $ this ->getServerId (),
139
+ ]);
157
140
158
141
return true ;
159
142
}
@@ -181,25 +164,10 @@ public function unsubscribe(string $appId, string $channel): bool
181
164
unset($ this ->subscribedChannels ["$ appId: $ channel " ]);
182
165
}
183
166
184
- DashboardLogger::replicatorUnsubscribed ($ appId , $ channel , $ this ->serverId );
185
-
186
- return true ;
187
- }
188
-
189
- /**
190
- * Publish a message to a channel on behalf of a websocket user.
191
- *
192
- * @param string $appId
193
- * @param string $channel
194
- * @param stdClass $payload
195
- * @return bool
196
- */
197
- public function publish (string $ appId , string $ channel , stdClass $ payload ): bool
198
- {
199
- $ payload ->appId = $ appId ;
200
- $ payload ->serverId = $ this ->serverId ;
201
-
202
- $ this ->publishClient ->__call ('publish ' , ["$ appId: $ channel " , json_encode ($ payload )]);
167
+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED , [
168
+ 'channel ' => $ channel ,
169
+ 'serverId ' => $ this ->getServerId (),
170
+ ]);
203
171
204
172
return true ;
205
173
}
@@ -217,6 +185,13 @@ public function publish(string $appId, string $channel, stdClass $payload): bool
217
185
public function joinChannel (string $ appId , string $ channel , string $ socketId , string $ data )
218
186
{
219
187
$ this ->publishClient ->__call ('hset ' , ["$ appId: $ channel " , $ socketId , $ data ]);
188
+
189
+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL , [
190
+ 'channel ' => $ channel ,
191
+ 'serverId ' => $ this ->getServerId (),
192
+ 'socketId ' => $ socketId ,
193
+ 'data ' => $ data ,
194
+ ]);
220
195
}
221
196
222
197
/**
@@ -231,6 +206,12 @@ public function joinChannel(string $appId, string $channel, string $socketId, st
231
206
public function leaveChannel (string $ appId , string $ channel , string $ socketId )
232
207
{
233
208
$ this ->publishClient ->__call ('hdel ' , ["$ appId: $ channel " , $ socketId ]);
209
+
210
+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL , [
211
+ 'channel ' => $ channel ,
212
+ 'serverId ' => $ this ->getServerId (),
213
+ 'socketId ' => $ socketId ,
214
+ ]);
234
215
}
235
216
236
217
/**
@@ -272,6 +253,62 @@ public function channelMemberCounts(string $appId, array $channelNames): Promise
272
253
});
273
254
}
274
255
256
+ /**
257
+ * Handle a message received from Redis on a specific channel.
258
+ *
259
+ * @param string $redisChannel
260
+ * @param string $payload
261
+ * @return void
262
+ */
263
+ protected function onMessage (string $ redisChannel , string $ payload )
264
+ {
265
+ $ payload = json_decode ($ payload );
266
+
267
+ // Ignore messages sent by ourselves.
268
+ if (isset ($ payload ->serverId ) && $ this ->getServerId () === $ payload ->serverId ) {
269
+ return ;
270
+ }
271
+
272
+ // Pull out the app ID. See RedisPusherBroadcaster
273
+ $ appId = $ payload ->appId ;
274
+
275
+ // We need to put the channel name in the payload.
276
+ // We strip the app ID from the channel name, websocket clients
277
+ // expect the channel name to not include the app ID.
278
+ $ payload ->channel = Str::after ($ redisChannel , "{$ appId }: " );
279
+
280
+ $ channelManager = app (ChannelManager::class);
281
+
282
+ // Load the Channel instance to sync.
283
+ $ channel = $ channelManager ->find ($ appId , $ payload ->channel );
284
+
285
+ // If no channel is found, none of our connections want to
286
+ // receive this message, so we ignore it.
287
+ if (! $ channel ) {
288
+ return ;
289
+ }
290
+
291
+ $ socket = $ payload ->socket ?? null ;
292
+ $ serverId = $ payload ->serverId ?? null ;
293
+
294
+ // Remove fields intended for internal use from the payload.
295
+ unset($ payload ->socket );
296
+ unset($ payload ->serverId );
297
+ unset($ payload ->appId );
298
+
299
+ // Push the message out to connected websocket clients.
300
+ $ channel ->broadcastToEveryoneExcept ($ payload , $ socket , $ appId , false );
301
+
302
+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED , [
303
+ 'channel ' => $ channel ->getChannelName (),
304
+ 'redisChannel ' => $ redisChannel ,
305
+ 'serverId ' => $ this ->getServer (),
306
+ 'incomingServerId ' => $ serverId ,
307
+ 'incomingSocketId ' => $ socket ,
308
+ 'payload ' => $ payload ,
309
+ ]);
310
+ }
311
+
275
312
/**
276
313
* Build the Redis connection URL from Laravel database config.
277
314
*
0 commit comments