Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 223a789

Browse files
committed
Added local removal for obsolete connections.
1 parent 1f6e714 commit 223a789

File tree

5 files changed

+209
-6
lines changed

5 files changed

+209
-6
lines changed

src/ChannelManagers/LocalChannelManager.php

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
88
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
99
use BeyondCode\LaravelWebSockets\Helpers;
10+
use Carbon\Carbon;
11+
use Illuminate\Cache\ArrayLock;
12+
use Illuminate\Cache\ArrayStore;
1013
use Illuminate\Support\Str;
1114
use Ratchet\ConnectionInterface;
1215
use React\EventLoop\LoopInterface;
@@ -43,6 +46,14 @@ class LocalChannelManager implements ChannelManager
4346
*/
4447
protected $acceptsNewConnections = true;
4548

49+
/**
50+
* The lock name to use on Array to avoid multiple
51+
* actions that might lead to multiple processings.
52+
*
53+
* @var string
54+
*/
55+
protected static $lockName = 'laravel-websockets:channel-manager:lock';
56+
4657
/**
4758
* Create a new channel manager instance.
4859
*
@@ -398,7 +409,9 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac
398409
*/
399410
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
400411
{
401-
return Helpers::createFulfilledPromise(true);
412+
$connection->lastPongedAt = Carbon::now();
413+
414+
return $this->updateConnectionInChannels($connection);
402415
}
403416

404417
/**
@@ -408,7 +421,43 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
408421
*/
409422
public function removeObsoleteConnections(): PromiseInterface
410423
{
411-
return Helpers::createFulfilledPromise(true);
424+
if (! $this->lock()->acquire()) {
425+
return Helpers::createFulfilledPromise(false);
426+
}
427+
428+
$this->getLocalConnections()->then(function ($connections) {
429+
foreach ($connections as $connection) {
430+
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
431+
432+
if ($differenceInSeconds > 120) {
433+
$this->unsubscribeFromAllChannels($connection);
434+
}
435+
}
436+
});
437+
438+
return Helpers::createFulfilledPromise(
439+
$this->lock()->release()
440+
);
441+
}
442+
443+
/**
444+
* Update the connection in all channels.
445+
*
446+
* @param ConnectionInterface $connection
447+
* @return PromiseInterface[bool]
448+
*/
449+
public function updateConnectionInChannels($connection): PromiseInterface
450+
{
451+
return $this->getLocalChannels($connection->app->id)
452+
->then(function ($channels) use ($connection) {
453+
foreach ($channels as $channel) {
454+
if ($channel->hasConnection($connection)) {
455+
$channel->saveConnection($connection);
456+
}
457+
}
458+
459+
return true;
460+
});
412461
}
413462

414463
/**
@@ -452,4 +501,14 @@ protected function getChannelClassName(string $channelName): string
452501

453502
return Channel::class;
454503
}
504+
505+
/**
506+
* Get a new ArrayLock instance to avoid race conditions.
507+
*
508+
* @return \Illuminate\Cache\CacheLock
509+
*/
510+
protected function lock()
511+
{
512+
return new ArrayLock(new ArrayStore, static::$lockName, 0);
513+
}
455514
}

src/ChannelManagers/RedisChannelManager.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class RedisChannelManager extends LocalChannelManager
5959
*
6060
* @var string
6161
*/
62-
protected static $redisLockName = 'laravel-websockets:channel-manager:lock';
62+
protected static $lockName = 'laravel-websockets:channel-manager:lock';
6363

6464
/**
6565
* Create a new channel manager instance.
@@ -768,7 +768,7 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix
768768
*/
769769
protected function lock()
770770
{
771-
return new RedisLock($this->redis, static::$redisLockName, 0);
771+
return new RedisLock($this->redis, static::$lockName, 0);
772772
}
773773

774774
/**

src/Channels/Channel.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b
100100
*/
101101
public function unsubscribe(ConnectionInterface $connection): bool
102102
{
103-
if (! isset($this->connections[$connection->socketId])) {
103+
if (! $this->hasConnection($connection)) {
104104
return false;
105105
}
106106

@@ -109,13 +109,24 @@ public function unsubscribe(ConnectionInterface $connection): bool
109109
return true;
110110
}
111111

112+
/**
113+
* Check if the given connection exists.
114+
*
115+
* @param \Ratchet\ConnectionInterface $connection
116+
* @return bool
117+
*/
118+
public function hasConnection(ConnectionInterface $connection): bool
119+
{
120+
return isset($this->connections[$connection->socketId]);
121+
}
122+
112123
/**
113124
* Store the connection to the subscribers list.
114125
*
115126
* @param \Ratchet\ConnectionInterface $connection
116127
* @return void
117128
*/
118-
protected function saveConnection(ConnectionInterface $connection)
129+
public function saveConnection(ConnectionInterface $connection)
119130
{
120131
$this->connections[$connection->socketId] = $connection;
121132
}

src/Server/WebSocketHandler.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public function onOpen(ConnectionInterface $connection)
5757

5858
$this->channelManager->subscribeToApp($connection->app->id);
5959

60+
$this->channelManager->connectionPonged($connection);
61+
6062
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_CONNECTED, [
6163
'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}",
6264
'socketId' => $connection->socketId,

tests/LocalPongRemovalTest.php

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\Test;
4+
5+
use Carbon\Carbon;
6+
7+
class LocalPongRemovalTest extends TestCase
8+
{
9+
public function test_not_ponged_connections_do_get_removed_on_local_for_public_channels()
10+
{
11+
$this->runOnlyOnLocalReplication();
12+
13+
$activeConnection = $this->newActiveConnection(['public-channel']);
14+
$obsoleteConnection = $this->newActiveConnection(['public-channel']);
15+
16+
// The active connection just pinged, it should not be closed.
17+
$activeConnection->lastPongedAt = Carbon::now();
18+
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
19+
20+
$this->channelManager->updateConnectionInChannels($activeConnection);
21+
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
22+
23+
$this->channelManager
24+
->getGlobalConnectionsCount('1234', 'public-channel')
25+
->then(function ($count) {
26+
$this->assertEquals(2, $count);
27+
});
28+
29+
$this->channelManager->removeObsoleteConnections();
30+
31+
$this->channelManager
32+
->getGlobalConnectionsCount('1234', 'public-channel')
33+
->then(function ($count) {
34+
$this->assertEquals(1, $count);
35+
});
36+
37+
$this->channelManager
38+
->getLocalConnections()
39+
->then(function ($connections) use ($activeConnection) {
40+
$connection = $connections[$activeConnection->socketId];
41+
42+
$this->assertEquals($activeConnection->socketId, $connection->socketId);
43+
});
44+
}
45+
46+
public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels()
47+
{
48+
$this->runOnlyOnLocalReplication();
49+
50+
$activeConnection = $this->newPrivateConnection('private-channel');
51+
$obsoleteConnection = $this->newPrivateConnection('private-channel');
52+
53+
// The active connection just pinged, it should not be closed.
54+
$activeConnection->lastPongedAt = Carbon::now();
55+
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
56+
57+
$this->channelManager->updateConnectionInChannels($activeConnection);
58+
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
59+
60+
$this->channelManager
61+
->getGlobalConnectionsCount('1234', 'private-channel')
62+
->then(function ($count) {
63+
$this->assertEquals(2, $count);
64+
});
65+
66+
$this->channelManager->removeObsoleteConnections();
67+
68+
$this->channelManager
69+
->getGlobalConnectionsCount('1234', 'private-channel')
70+
->then(function ($count) {
71+
$this->assertEquals(1, $count);
72+
});
73+
74+
$this->channelManager
75+
->getLocalConnections()
76+
->then(function ($connections) use ($activeConnection) {
77+
$connection = $connections[$activeConnection->socketId];
78+
79+
$this->assertEquals($activeConnection->socketId, $connection->socketId);
80+
});
81+
}
82+
83+
public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels()
84+
{
85+
$this->runOnlyOnLocalReplication();
86+
87+
$activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
88+
$obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
89+
90+
// The active connection just pinged, it should not be closed.
91+
$activeConnection->lastPongedAt = Carbon::now();
92+
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
93+
94+
$this->channelManager->updateConnectionInChannels($activeConnection);
95+
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
96+
97+
$this->channelManager
98+
->getGlobalConnectionsCount('1234', 'presence-channel')
99+
->then(function ($count) {
100+
$this->assertEquals(2, $count);
101+
});
102+
103+
$this->channelManager
104+
->getChannelMembers('1234', 'presence-channel')
105+
->then(function ($members) {
106+
$this->assertCount(2, $members);
107+
});
108+
109+
$this->channelManager->removeObsoleteConnections();
110+
111+
$this->channelManager
112+
->getGlobalConnectionsCount('1234', 'presence-channel')
113+
->then(function ($count) {
114+
$this->assertEquals(1, $count);
115+
});
116+
117+
$this->channelManager
118+
->getLocalConnections()
119+
->then(function ($connections) use ($activeConnection) {
120+
$connection = $connections[$activeConnection->socketId];
121+
122+
$this->assertEquals($activeConnection->socketId, $connection->socketId);
123+
});
124+
125+
$this->channelManager
126+
->getChannelMembers('1234', 'presence-channel')
127+
->then(function ($members) {
128+
$this->assertCount(1, $members);
129+
});
130+
}
131+
}

0 commit comments

Comments
 (0)