Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit fadb3fc

Browse files
committed
Added redis connection counter.
1 parent e9b85bb commit fadb3fc

File tree

10 files changed

+252
-22
lines changed

10 files changed

+252
-22
lines changed

config/websockets.php

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,6 @@
4242

4343
'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class,
4444

45-
/*
46-
|--------------------------------------------------------------------------
47-
| Channel Manager
48-
|--------------------------------------------------------------------------
49-
|
50-
| When users subscribe or unsubscribe from specific channels,
51-
| the connections are stored to keep track of any interaction with the
52-
| WebSocket server.
53-
| You can however add your own implementation that will help the store
54-
| of the channels alongside their connections.
55-
|
56-
*/
57-
58-
'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
59-
6045
],
6146

6247
/*
@@ -191,6 +176,8 @@
191176

192177
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class,
193178

179+
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
180+
194181
],
195182

196183
/*
@@ -214,6 +201,8 @@
214201

215202
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class,
216203

204+
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class,
205+
217206
],
218207

219208
],

src/PubSub/Drivers/LocalClient.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,28 @@ public function unsubscribe($appId, string $channel): bool
6666
return true;
6767
}
6868

69+
/**
70+
* Subscribe to the app's pubsub keyspace.
71+
*
72+
* @param mixed $appId
73+
* @return bool
74+
*/
75+
public function subscribeToApp($appId): bool
76+
{
77+
return true;
78+
}
79+
80+
/**
81+
* Unsubscribe from the app's pubsub keyspace.
82+
*
83+
* @param mixed $appId
84+
* @return bool
85+
*/
86+
public function unsubscribeFromApp($appId): bool
87+
{
88+
return true;
89+
}
90+
6991
/**
7092
* Add a member to a channel. To be called when they have
7193
* subscribed to the channel.
@@ -137,4 +159,15 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa
137159

138160
return new FulfilledPromise($results);
139161
}
162+
163+
/**
164+
* Get the amount of unique connections.
165+
*
166+
* @param mixed $appId
167+
* @return null|int|\React\Promise\PromiseInterface
168+
*/
169+
public function appConnectionsCount($appId)
170+
{
171+
return null;
172+
}
140173
}

src/PubSub/Drivers/RedisClient.php

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
88
use Clue\React\Redis\Client;
99
use Clue\React\Redis\Factory;
10+
use Illuminate\Support\Facades\Cache;
1011
use Illuminate\Support\Str;
1112
use React\EventLoop\LoopInterface;
1213
use React\Promise\PromiseInterface;
@@ -42,6 +43,13 @@ class RedisClient extends LocalClient
4243
*/
4344
protected $subscribeClient;
4445

46+
/**
47+
* The Redis manager instance.
48+
*
49+
* @var \Illuminate\Redis\RedisManager
50+
*/
51+
protected $redis;
52+
4553
/**
4654
* Mapping of subscribed channels, where the key is the channel name,
4755
* and the value is the amount of connections which are subscribed to
@@ -60,6 +68,7 @@ class RedisClient extends LocalClient
6068
public function __construct()
6169
{
6270
$this->serverId = Str::uuid()->toString();
71+
$this->redis = Cache::getRedis();
6372
}
6473

6574
/**
@@ -175,6 +184,36 @@ public function unsubscribe($appId, string $channel): bool
175184
return true;
176185
}
177186

187+
/**
188+
* Subscribe to the app's pubsub keyspace.
189+
*
190+
* @param mixed $appId
191+
* @return bool
192+
*/
193+
public function subscribeToApp($appId): bool
194+
{
195+
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);
196+
197+
$this->redis->hincrby($this->getTopicName($appId), 'connections', 1);
198+
199+
return true;
200+
}
201+
202+
/**
203+
* Unsubscribe from the app's pubsub keyspace.
204+
*
205+
* @param mixed $appId
206+
* @return bool
207+
*/
208+
public function unsubscribeFromApp($appId): bool
209+
{
210+
$this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);
211+
212+
$this->redis->hincrby($this->getTopicName($appId), 'connections', -1);
213+
214+
return true;
215+
}
216+
178217
/**
179218
* Add a member to a channel. To be called when they have
180219
* subscribed to the channel.
@@ -258,6 +297,19 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa
258297
});
259298
}
260299

300+
/**
301+
* Get the amount of unique connections.
302+
*
303+
* @param mixed $appId
304+
* @return null|int|\React\Promise\PromiseInterface
305+
*/
306+
public function appConnectionsCount($appId)
307+
{
308+
// Use the in-built Redis manager to avoid async run.
309+
310+
return $this->redis->hget($this->getTopicName($appId), 'connections') ?: 0;
311+
}
312+
261313
/**
262314
* Handle a message received from Redis on a specific channel.
263315
*
@@ -377,13 +429,19 @@ public function getServerId()
377429
* app ID and channel name.
378430
*
379431
* @param mixed $appId
380-
* @param string $channel
432+
* @param string|null $channel
381433
* @return string
382434
*/
383-
protected function getTopicName($appId, string $channel): string
435+
protected function getTopicName($appId, string $channel = null): string
384436
{
385437
$prefix = config('database.redis.options.prefix', null);
386438

387-
return "{$prefix}{$appId}:{$channel}";
439+
$hash = "{$prefix}{$appId}";
440+
441+
if ($channel) {
442+
$hash .= ":{$channel}";
443+
}
444+
445+
return $hash;
388446
}
389447
}

src/PubSub/ReplicationInterface.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,22 @@ public function subscribe($appId, string $channel): bool;
4545
*/
4646
public function unsubscribe($appId, string $channel): bool;
4747

48+
/**
49+
* Subscribe to the app's pubsub keyspace.
50+
*
51+
* @param mixed $appId
52+
* @return bool
53+
*/
54+
public function subscribeToApp($appId): bool;
55+
56+
/**
57+
* Unsubscribe from the app's pubsub keyspace.
58+
*
59+
* @param mixed $appId
60+
* @return bool
61+
*/
62+
public function unsubscribeFromApp($appId): bool;
63+
4864
/**
4965
* Add a member to a channel. To be called when they have
5066
* subscribed to the channel.
@@ -85,4 +101,12 @@ public function channelMembers($appId, string $channel): PromiseInterface;
85101
* @return PromiseInterface
86102
*/
87103
public function channelMemberCounts($appId, array $channelNames): PromiseInterface;
104+
105+
/**
106+
* Get the amount of unique connections.
107+
*
108+
* @param mixed $appId
109+
* @return null|int|\React\Promise\PromiseInterface
110+
*/
111+
public function appConnectionsCount($appId);
88112
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers;
4+
5+
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
6+
7+
class RedisChannelManager extends ArrayChannelManager
8+
{
9+
/**
10+
* The replicator driver.
11+
*
12+
* @var \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface
13+
*/
14+
protected $replicator;
15+
16+
/**
17+
* Initialize the channel manager.
18+
*
19+
* @return void
20+
*/
21+
public function __construct()
22+
{
23+
$this->replicator = app(ReplicationInterface::class);
24+
}
25+
26+
/**
27+
* Get the connections count on the app.
28+
*
29+
* @param mixed $appId
30+
* @return int
31+
*/
32+
public function getConnectionCount($appId): int
33+
{
34+
return $this->replicator->appConnectionsCount($appId);
35+
}
36+
}

src/WebSockets/WebSocketHandler.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use BeyondCode\LaravelWebSockets\Apps\App;
66
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
77
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
8+
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
89
use BeyondCode\LaravelWebSockets\QueryParameters;
910
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
1011
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity;
@@ -26,6 +27,13 @@ class WebSocketHandler implements MessageComponentInterface
2627
*/
2728
protected $channelManager;
2829

30+
/**
31+
* The replicator client.
32+
*
33+
* @var ReplicationInterface
34+
*/
35+
protected $replicator;
36+
2937
/**
3038
* Initialize a new handler.
3139
*
@@ -35,6 +43,7 @@ class WebSocketHandler implements MessageComponentInterface
3543
public function __construct(ChannelManager $channelManager)
3644
{
3745
$this->channelManager = $channelManager;
46+
$this->replicator = app(ReplicationInterface::class);
3847
}
3948

4049
/**
@@ -83,6 +92,8 @@ public function onClose(ConnectionInterface $connection)
8392
]);
8493

8594
StatisticsLogger::disconnection($connection->app->id);
95+
96+
$this->replicator->unsubscribeFromApp($connection->app->id);
8697
}
8798

8899
/**
@@ -99,6 +110,8 @@ public function onError(ConnectionInterface $connection, Exception $exception)
99110
$exception->getPayload()
100111
));
101112
}
113+
114+
$this->replicator->unsubscribeFromApp($connection->app->id);
102115
}
103116

104117
/**
@@ -203,6 +216,8 @@ protected function establishConnection(ConnectionInterface $connection)
203216

204217
StatisticsLogger::connection($connection->app->id);
205218

219+
$this->replicator->subscribeToApp($connection->app->id);
220+
206221
return $this;
207222
}
208223
}

src/WebSocketsServiceProvider.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ public function register()
5959
});
6060

6161
$this->app->singleton(ChannelManager::class, function () {
62-
$channelManager = config('websockets.managers.channel', ArrayChannelManager::class);
62+
$replicationDriver = config('websockets.replication.driver', 'local');
6363

64-
return new $channelManager;
64+
$class = config("websockets.replication.{$replicationDriver}.channel_manager", ArrayChannelManager::class);
65+
66+
return new $class;
6567
});
6668

6769
$this->app->singleton(AppManager::class, function () {

tests/ConnectionTest.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity;
88
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\OriginNotAllowed;
99
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\UnknownAppKey;
10+
use Illuminate\Support\Facades\Cache;
1011

1112
class ConnectionTest extends TestCase
1213
{
@@ -31,6 +32,25 @@ public function known_app_keys_can_connect()
3132
/** @test */
3233
public function app_can_not_exceed_maximum_capacity()
3334
{
35+
$this->runOnlyOnLocalReplication();
36+
37+
$this->app['config']->set('websockets.apps.0.capacity', 2);
38+
39+
$this->getConnectedWebSocketConnection(['test-channel']);
40+
$this->getConnectedWebSocketConnection(['test-channel']);
41+
$this->expectException(ConnectionsOverCapacity::class);
42+
$this->getConnectedWebSocketConnection(['test-channel']);
43+
}
44+
45+
/** @test */
46+
public function app_can_not_exceed_maximum_capacity_on_redis_replication()
47+
{
48+
$this->runOnlyOnRedisReplication();
49+
50+
$redis = Cache::getRedis();
51+
52+
$redis->hdel('laravel_database_1234', 'connections');
53+
3454
$this->app['config']->set('websockets.apps.0.capacity', 2);
3555

3656
$this->getConnectedWebSocketConnection(['test-channel']);

0 commit comments

Comments
 (0)