Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit d7c30f3

Browse files
cleanup & refactor of pubsub code
1 parent ed55034 commit d7c30f3

File tree

10 files changed

+170
-47
lines changed

10 files changed

+170
-47
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
"require-dev": {
4343
"mockery/mockery": "^1.2",
4444
"orchestra/testbench": "3.7.* || 3.8.* || ^4.0",
45-
"phpunit/phpunit": "^7.0 || ^8.0"
45+
"phpunit/phpunit": "^7.0 || ^8.0",
46+
"predis/predis": "^1.1"
4647
},
4748
"autoload": {
4849
"psr-4": {

phpunit.xml.dist

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,6 @@
2727
</logging>
2828
<php>
2929
<env name="DB_CONNECTION" value="testing"/>
30+
<env name="REDIS_HOST" value="redis"/>
3031
</php>
3132
</phpunit>

src/Console/StartWebSocketServer.php

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
use BeyondCode\LaravelWebSockets\Statistics\DnsResolver;
1313
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
1414
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
15-
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient;
1615
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
1716
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
1817
use BeyondCode\LaravelWebSockets\Server\WebSocketServerFactory;
@@ -117,7 +116,6 @@ protected function registerEchoRoutes()
117116
protected function registerCustomRoutes()
118117
{
119118
WebSocketsRouter::customRoutes();
120-
121119
return $this;
122120
}
123121

@@ -140,15 +138,7 @@ protected function startWebSocketServer()
140138

141139
protected function configurePubSubReplication()
142140
{
143-
if (config('websockets.replication.enabled') !== true) {
144-
return $this;
145-
}
146-
147-
if (config('websockets.replication.driver') === 'redis') {
148-
$this->laravel->singleton(ReplicationInterface::class, function () {
149-
return (new RedisClient())->boot($this->loop);
150-
});
151-
}
141+
app(ReplicationInterface::class)->boot($this->loop);
152142

153143
return $this;
154144
}

src/PubSub/Redis/RedisPusherBroadcaster.php renamed to src/PubSub/Broadcasters/RedisPusherBroadcaster.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
3+
namespace BeyondCode\LaravelWebSockets\PubSub\Broadcasters;
44

55
use Pusher\Pusher;
66
use Illuminate\Support\Arr;

src/PubSub/Drivers/EmptyClient.php

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
4+
5+
use stdClass;
6+
use React\EventLoop\LoopInterface;
7+
use React\Promise\FulfilledPromise;
8+
use React\Promise\PromiseInterface;
9+
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
10+
11+
class EmptyClient implements ReplicationInterface
12+
{
13+
14+
/**
15+
* Boot the pub/sub provider (open connections, initial subscriptions, etc).
16+
*
17+
* @param LoopInterface $loop
18+
* @return self
19+
*/
20+
public function boot(LoopInterface $loop) : ReplicationInterface
21+
{
22+
return $this;
23+
}
24+
25+
/**
26+
* Publish a payload on a specific channel, for a specific app.
27+
*
28+
* @param string $appId
29+
* @param string $channel
30+
* @param stdClass $payload
31+
* @return bool
32+
*/
33+
public function publish(string $appId, string $channel, stdClass $payload) : bool
34+
{
35+
return true;
36+
}
37+
38+
/**
39+
* Subscribe to receive messages for a channel.
40+
*
41+
* @param string $appId
42+
* @param string $channel
43+
* @return bool
44+
*/
45+
public function subscribe(string $appId, string $channel) : bool
46+
{
47+
return true;
48+
}
49+
50+
/**
51+
* Unsubscribe from a channel.
52+
*
53+
* @param string $appId
54+
* @param string $channel
55+
* @return bool
56+
*/
57+
public function unsubscribe(string $appId, string $channel) : bool
58+
{
59+
return true;
60+
}
61+
62+
/**
63+
* Add a member to a channel. To be called when they have
64+
* subscribed to the channel.
65+
*
66+
* @param string $appId
67+
* @param string $channel
68+
* @param string $socketId
69+
* @param string $data
70+
*/
71+
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
72+
{
73+
74+
}
75+
76+
/**
77+
* Remove a member from the channel. To be called when they have
78+
* unsubscribed from the channel.
79+
*
80+
* @param string $appId
81+
* @param string $channel
82+
* @param string $socketId
83+
*/
84+
public function leaveChannel(string $appId, string $channel, string $socketId)
85+
{
86+
87+
}
88+
89+
/**
90+
* Retrieve the full information about the members in a presence channel.
91+
*
92+
* @param string $appId
93+
* @param string $channel
94+
* @return PromiseInterface
95+
*/
96+
public function channelMembers(string $appId, string $channel) : PromiseInterface
97+
{
98+
return new FulfilledPromise(null);
99+
}
100+
101+
/**
102+
* Get the amount of users subscribed for each presence channel.
103+
*
104+
* @param string $appId
105+
* @param array $channelNames
106+
* @return PromiseInterface
107+
*/
108+
public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface
109+
{
110+
return new FulfilledPromise(null);
111+
}
112+
}

src/PubSub/Redis/RedisClient.php renamed to src/PubSub/Drivers/RedisClient.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
3+
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
44

55
use stdClass;
66
use Illuminate\Support\Str;
@@ -129,6 +129,7 @@ protected function onMessage(string $redisChannel, string $payload)
129129
*/
130130
public function subscribe(string $appId, string $channel): bool
131131
{
132+
$this->publishClient->__call('hset', ["$appId:$channel", 541561516, "qsgdqgsd"]);
132133
if (! isset($this->subscribedChannels["$appId:$channel"])) {
133134
// We're not subscribed to the channel yet, subscribe and set the count to 1
134135
$this->subscribeClient->__call('subscribe', ["$appId:$channel"]);

src/WebSockets/Channels/Channel.php

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ class Channel
1414
/** @var string */
1515
protected $channelName;
1616

17+
/**
18+
* @var ReplicationInterface
19+
*/
20+
protected $pubSub;
21+
1722
/** @var \Ratchet\ConnectionInterface[] */
1823
protected $subscribedConnections = [];
1924

2025
public function __construct(string $channelName)
2126
{
2227
$this->channelName = $channelName;
28+
$this->pubSub = app(ReplicationInterface::class);
2329
}
2430

2531
public function getChannelName(): string
@@ -48,7 +54,7 @@ protected function verifySignature(ConnectionInterface $connection, stdClass $pa
4854
$signature .= ":{$payload->channel_data}";
4955
}
5056

51-
if (! hash_equals(
57+
if (!hash_equals(
5258
hash_hmac('sha256', $signature, $connection->app->secret),
5359
Str::after($payload->auth, ':'))
5460
) {
@@ -63,11 +69,8 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload)
6369
{
6470
$this->saveConnection($connection);
6571

66-
if (config('websockets.replication.enabled') === true) {
67-
// Subscribe for broadcasted messages from the pub/sub backend
68-
app(ReplicationInterface::class)
69-
->subscribe($connection->app->id, $this->channelName);
70-
}
72+
// Subscribe to broadcasted messages from the pub/sub backend
73+
$this->pubSub->subscribe($connection->app->id, $this->channelName);
7174

7275
$connection->send(json_encode([
7376
'event' => 'pusher_internal:subscription_succeeded',
@@ -79,13 +82,10 @@ public function unsubscribe(ConnectionInterface $connection)
7982
{
8083
unset($this->subscribedConnections[$connection->socketId]);
8184

82-
if (config('websockets.replication.enabled') === true) {
83-
// Unsubscribe from the pub/sub backend
84-
app(ReplicationInterface::class)
85-
->unsubscribe($connection->app->id, $this->channelName);
86-
}
85+
// Unsubscribe from the pub/sub backend
86+
$this->pubSub->unsubscribe($connection->app->id, $this->channelName);
8787

88-
if (! $this->hasConnections()) {
88+
if (!$this->hasConnections()) {
8989
DashboardLogger::vacated($connection, $this->channelName);
9090
}
9191
}
@@ -96,7 +96,7 @@ protected function saveConnection(ConnectionInterface $connection)
9696

9797
$this->subscribedConnections[$connection->socketId] = $connection;
9898

99-
if (! $hadConnectionsPreviously) {
99+
if (!$hadConnectionsPreviously) {
100100
DashboardLogger::occupied($connection, $this->channelName);
101101
}
102102

@@ -112,11 +112,8 @@ public function broadcast($payload)
112112

113113
public function broadcastToOthers(ConnectionInterface $connection, $payload)
114114
{
115-
if (config('websockets.replication.enabled') === true) {
116-
// Also broadcast via the other websocket servers
117-
app(ReplicationInterface::class)
118-
->publish($connection->app->id, $this->channelName, $payload);
119-
}
115+
// Also broadcast via the other websocket servers
116+
$this->pubSub->publish($connection->app->id, $this->channelName, $payload);
120117

121118
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
122119
}

src/WebSocketsServiceProvider.php

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
namespace BeyondCode\LaravelWebSockets;
44

5+
use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster;
6+
use BeyondCode\LaravelWebSockets\PubSub\Drivers\EmptyClient;
7+
use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient;
8+
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
59
use Pusher\Pusher;
610
use Psr\Log\LoggerInterface;
711
use Illuminate\Support\Facades\Gate;
@@ -11,7 +15,6 @@
1115
use BeyondCode\LaravelWebSockets\Server\Router;
1216
use BeyondCode\LaravelWebSockets\Apps\AppProvider;
1317
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
14-
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisPusherBroadcaster;
1518
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage;
1619
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard;
1720
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard;
@@ -23,30 +26,47 @@
2326

2427
class WebSocketsServiceProvider extends ServiceProvider
2528
{
26-
public function boot(BroadcastManager $broadcastManager)
29+
public function boot()
2730
{
2831
$this->publishes([
29-
__DIR__.'/../config/websockets.php' => base_path('config/websockets.php'),
32+
__DIR__ . '/../config/websockets.php' => base_path('config/websockets.php'),
3033
], 'config');
3134

32-
if (! class_exists('CreateWebSocketsStatisticsEntries')) {
35+
if (!class_exists('CreateWebSocketsStatisticsEntries')) {
3336
$this->publishes([
34-
__DIR__.'/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/'.date('Y_m_d_His', time()).'_create_websockets_statistics_entries_table.php'),
37+
__DIR__ . '/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/' . date('Y_m_d_His', time()) . '_create_websockets_statistics_entries_table.php'),
3538
], 'migrations');
3639
}
3740

3841
$this
3942
->registerRoutes()
4043
->registerDashboardGate();
4144

42-
$this->loadViewsFrom(__DIR__.'/../resources/views/', 'websockets');
45+
$this->loadViewsFrom(__DIR__ . '/../resources/views/', 'websockets');
4346

4447
$this->commands([
4548
Console\StartWebSocketServer::class,
4649
Console\CleanStatistics::class,
4750
]);
4851

49-
$broadcastManager->extend('redis-pusher', function ($app, array $config) {
52+
$this->configurePubSub();
53+
54+
}
55+
56+
protected function configurePubSub()
57+
{
58+
if (config('websockets.replication.enabled') !== true || config('websockets.replication.driver') !== 'redis') {
59+
$this->app->singleton(ReplicationInterface::class, function () {
60+
return (new EmptyClient());
61+
});
62+
return;
63+
}
64+
65+
$this->app->singleton(ReplicationInterface::class, function () {
66+
return (new RedisClient())->boot($this->loop);
67+
});
68+
69+
app(BroadcastManager::class)->extend('redis-pusher', function ($app, array $config) {
5070
$pusher = new Pusher(
5171
$config['key'], $config['secret'],
5272
$config['app_id'], $config['options'] ?? []
@@ -67,7 +87,7 @@ public function boot(BroadcastManager $broadcastManager)
6787

6888
public function register()
6989
{
70-
$this->mergeConfigFrom(__DIR__.'/../config/websockets.php', 'websockets');
90+
$this->mergeConfigFrom(__DIR__ . '/../config/websockets.php', 'websockets');
7191

7292
$this->app->singleton('websockets.router', function () {
7393
return new Router();
@@ -88,7 +108,7 @@ protected function registerRoutes()
88108
Route::prefix(config('websockets.path'))->group(function () {
89109
Route::middleware(config('websockets.middleware', [AuthorizeDashboard::class]))->group(function () {
90110
Route::get('/', ShowDashboard::class);
91-
Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']);
111+
Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']);
92112
Route::post('auth', AuthenticateDashboard::class);
93113
Route::post('event', SendMessage::class);
94114
});

src/PubSub/Fake/FakeReplication.php renamed to tests/Mocks/FakeReplicationClient.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
<?php
22

3-
namespace BeyondCode\LaravelWebSockets\PubSub\Fake;
3+
namespace BeyondCode\LaravelWebSockets\Tests\Mocks;
44

55
use stdClass;
66
use React\EventLoop\LoopInterface;
77
use React\Promise\FulfilledPromise;
88
use React\Promise\PromiseInterface;
99
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
1010

11-
class FakeReplication implements ReplicationInterface
11+
class FakeReplicationClient implements ReplicationInterface
1212
{
1313
protected $channels = [];
1414

0 commit comments

Comments
 (0)