Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit b584d0c

Browse files
snellingiofrancislavoie
authored andcommitted
Update with pub sub replication and redis driver
1 parent 8e422cb commit b584d0c

File tree

7 files changed

+174
-3
lines changed

7 files changed

+174
-3
lines changed

PubSub/PubSubInterface.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\PubSub;
4+
5+
use React\EventLoop\LoopInterface;
6+
7+
interface PubSubInterface
8+
{
9+
public function publish(string $appId, array $payload): bool;
10+
11+
public function subscribe(LoopInterface $loop): PubSubInterface;
12+
}

PubSub/Redis/RedisClient.php

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
4+
5+
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
6+
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
7+
use Clue\React\Block;
8+
use Clue\React\Redis\Client;
9+
use Clue\React\Redis\Factory;
10+
use Illuminate\Support\Str;
11+
use React\EventLoop\LoopInterface;
12+
use React\Promise\PromiseInterface;
13+
14+
class RedisClient implements PubSubInterface
15+
{
16+
17+
const REDIS_KEY = ':websockets:replication:';
18+
protected $apps;
19+
protected $loop;
20+
protected $serverId;
21+
protected $publishClient;
22+
protected $subscribeClient;
23+
24+
public function __construct()
25+
{
26+
$this->apps = collect(config('websockets.apps'));
27+
$this->serverId = Str::uuid()->toString();
28+
}
29+
30+
public function publish(string $appId, array $payload): bool
31+
{
32+
$payload['appId'] = $appId;
33+
$payload['serverId'] = $this->serverId;
34+
$this->publishClient->publish(self::REDIS_KEY, json_encode($payload));
35+
return true;
36+
}
37+
38+
public function subscribe(LoopInterface $loop): PubSubInterface
39+
{
40+
$this->loop = $loop;
41+
[$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop);
42+
return $this->publishClient;
43+
}
44+
45+
protected function publishConnection(): PromiseInterface
46+
{
47+
$connectionUri = $this->getConnectionUri();
48+
$factory = new Factory($this->loop);
49+
return $factory->createClient($connectionUri)->then(
50+
function (Client $client) {
51+
$this->publishClient = $client;
52+
return $this;
53+
}
54+
);
55+
}
56+
57+
58+
protected function subscribeConnection(): PromiseInterface
59+
{
60+
$connectionUri = $this->getConnectionUri();
61+
$factory = new Factory($this->loop);
62+
return $factory->createClient($connectionUri)->then(
63+
function (Client $client) {
64+
$this->subscribeClient = $client;
65+
$this->onConnected();
66+
return $this;
67+
}
68+
);
69+
}
70+
71+
protected function getConnectionUri()
72+
{
73+
$name = config('websockets.replication.connection') ?? 'default';
74+
$config = config('database.redis.' . $name);
75+
$host = $config['host'];
76+
$port = $config['port'] ? (':' . $config['port']) : ':6379';
77+
78+
$query = [];
79+
if ($config['password']) {
80+
$query['password'] = $config['password'];
81+
}
82+
if ($config['database']) {
83+
$query['database'] = $config['database'];
84+
}
85+
$query = http_build_query($query);
86+
87+
return "redis://$host$port" . ($query ? '?' . $query : '');
88+
}
89+
90+
protected function onConnected()
91+
{
92+
$this->subscribeClient->subscribe(self::REDIS_KEY);
93+
$this->subscribeClient->on('message', function ($channel, $payload) {
94+
$this->onMessage($channel, $payload);
95+
});
96+
}
97+
98+
protected function onMessage($channel, $payload)
99+
{
100+
$payload = json_decode($payload);
101+
102+
if ($this->serverId === $payload->serverId) {
103+
return false;
104+
}
105+
106+
/* @var $channelManager ChannelManager */
107+
$channelManager = app(ChannelManager::class);
108+
$channelSearch = $channelManager->find($payload->appId, $payload->channel);
109+
110+
if ($channelSearch === null) {
111+
return false;
112+
}
113+
114+
$channel->broadcast($payload);
115+
return true;
116+
}
117+
118+
}

composer.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
"php": "^7.1",
2626
"ext-json": "*",
2727
"cboden/ratchet": "^0.4.1",
28+
"clue/block-react": "^1.3",
2829
"clue/buzz-react": "^2.5",
30+
"clue/redis-react": "^2.2",
2931
"facade/ignition-contracts": "^1.0",
3032
"guzzlehttp/psr7": "^1.5",
3133
"illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0",

config/websockets.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,20 @@
124124
'passphrase' => env('LARAVEL_WEBSOCKETS_SSL_PASSPHRASE', null),
125125
],
126126

127+
/*
128+
* You can enable replication to publish and subscribe to messages across the driver
129+
*/
130+
'replication' => [
131+
'enabled' => false,
132+
133+
'driver' => 'redis',
134+
135+
'redis' => [
136+
'connection' => 'default',
137+
],
138+
],
139+
140+
127141
/*
128142
* Channel Manager
129143
* This class handles how channel persistence is handled.

src/Console/StartWebSocketServer.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
use React\EventLoop\Factory as LoopFactory;
1111
use React\Dns\Resolver\Factory as DnsFactory;
1212
use BeyondCode\LaravelWebSockets\Statistics\DnsResolver;
13+
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
14+
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient;
1315
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
1416
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
1517
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
@@ -45,6 +47,7 @@ public function handle()
4547
->configureConnectionLogger()
4648
->registerEchoRoutes()
4749
->registerCustomRoutes()
50+
->configurePubSubReplication()
4851
->startWebSocketServer();
4952
}
5053

@@ -135,6 +138,23 @@ protected function startWebSocketServer()
135138
->run();
136139
}
137140

141+
protected function configurePubSubReplication()
142+
{
143+
if (config('websockets.replication.enabled') !== true) {
144+
return $this;
145+
}
146+
147+
if (config('websockets.replication.driver') === 'redis') {
148+
$connection = (new RedisClient())->subscribe($this->loop);
149+
}
150+
151+
app()->singleton(PubSubInterface::class, function () use ($connection) {
152+
return $connection;
153+
});
154+
155+
return $this;
156+
}
157+
138158
protected function getDnsResolver(): ResolverInterface
139159
{
140160
if (! config('websockets.statistics.perform_dns_lookup')) {

src/HttpApi/Controllers/TriggerEventController.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public function __invoke(Request $request)
1919
'channel' => $channelName,
2020
'event' => $request->json()->get('name'),
2121
'data' => $request->json()->get('data'),
22-
], $request->json()->get('socket_id'));
22+
], $request->json()->get('socket_id'), $request->appId);
2323

2424
DashboardLogger::apiMessage(
2525
$request->appId,

src/WebSockets/Channels/Channel.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use stdClass;
66
use Illuminate\Support\Str;
77
use Ratchet\ConnectionInterface;
8+
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
89
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
910
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
1011

@@ -88,11 +89,15 @@ public function broadcast($payload)
8889

8990
public function broadcastToOthers(ConnectionInterface $connection, $payload)
9091
{
91-
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
92+
$this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id);
9293
}
9394

94-
public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
95+
public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null)
9596
{
97+
if (config('websockets.replication.enabled') === true) {
98+
app()->get(PubSubInterface::class)->publish($appId, $payload);
99+
}
100+
96101
if (is_null($socketId)) {
97102
return $this->broadcast($payload);
98103
}

0 commit comments

Comments
 (0)