Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit ec47925

Browse files
committed
Added soft closes for connections on SIGTERM/SIGINT
1 parent 86fbf76 commit ec47925

File tree

12 files changed

+278
-12
lines changed

12 files changed

+278
-12
lines changed

composer.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
"orchestra/database": "^4.0|^5.0|^6.0",
5757
"phpunit/phpunit": "^8.0|^9.0"
5858
},
59+
"suggest": {
60+
"ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown."
61+
},
5962
"autoload": {
6063
"psr-4": {
6164
"BeyondCode\\LaravelWebSockets\\": "src/"

src/ChannelManagers/LocalChannelManager.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager
2929
*/
3030
protected $users = [];
3131

32+
/**
33+
* Wether the current instance accepts new connections.
34+
*
35+
* @var bool
36+
*/
37+
protected $acceptsNewConnections = true;
38+
3239
/**
3340
* Create a new channel manager instance.
3441
*
@@ -71,6 +78,28 @@ public function findOrCreate($appId, string $channel)
7178
return $this->channels[$appId][$channel];
7279
}
7380

81+
/**
82+
* Get the local connections, regardless of the channel
83+
* they are connected to.
84+
*
85+
* @return \React\Promise\PromiseInterface
86+
*/
87+
public function getLocalConnections(): PromiseInterface
88+
{
89+
$connections = collect($this->channels)
90+
->map(function ($channelsWithConnections, $appId) {
91+
return collect($channelsWithConnections)->values();
92+
})
93+
->values()->collapse()
94+
->map(function ($channel) {
95+
return collect($channel->getConnections());
96+
})
97+
->values()->collapse()
98+
->toArray();
99+
100+
return new FulfilledPromise($connections);
101+
}
102+
74103
/**
75104
* Get all channels for a specific app
76105
* for the current instance.
@@ -313,6 +342,29 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
313342
return new FulfilledPromise($results);
314343
}
315344

345+
/**
346+
* Mark the current instance as unable to accept new connections.
347+
*
348+
* @return $this
349+
*/
350+
public function declineNewConnections()
351+
{
352+
$this->acceptsNewConnections = false;
353+
354+
return $this;
355+
}
356+
357+
/**
358+
* Check if the current server instance
359+
* accepts new connections.
360+
*
361+
* @return bool
362+
*/
363+
public function acceptsNewConnections(): bool
364+
{
365+
return $this->acceptsNewConnections;
366+
}
367+
316368
/**
317369
* Get the channel class by the channel name.
318370
*

src/ChannelManagers/RedisChannelManager.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,17 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
6767
$this->serverId = Str::uuid()->toString();
6868
}
6969

70+
/**
71+
* Get the local connections, regardless of the channel
72+
* they are connected to.
73+
*
74+
* @return \React\Promise\PromiseInterface
75+
*/
76+
public function getLocalConnections(): PromiseInterface
77+
{
78+
return parent::getLocalConnections();
79+
}
80+
7081
/**
7182
* Get all channels for a specific app
7283
* for the current instance.
@@ -108,9 +119,9 @@ public function unsubscribeFromAllChannels(ConnectionInterface $connection)
108119
$connection, $channel, new stdClass
109120
);
110121
}
122+
})->then(function () use ($connection) {
123+
parent::unsubscribeFromAllChannels($connection);
111124
});
112-
113-
parent::unsubscribeFromAllChannels($connection);
114125
}
115126

116127
/**

src/Console/Commands/StartServer.php

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class StartServer extends Command
2626
{--disable-statistics : Disable the statistics tracking.}
2727
{--statistics-interval= : The amount of seconds to tick between statistics saving.}
2828
{--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.}
29-
{--test : Prepare the server, but do not start it.}
29+
{--loop : Programatically inject the loop.}
3030
';
3131

3232
/**
@@ -79,6 +79,8 @@ public function handle()
7979

8080
$this->configureRoutes();
8181

82+
$this->configurePcntlSignal();
83+
8284
$this->startServer();
8385
}
8486

@@ -156,6 +158,31 @@ protected function configureRoutes()
156158
WebSocketRouter::routes();
157159
}
158160

161+
/**
162+
* Configure the PCNTL signals for soft shutdown.
163+
*
164+
* @return void
165+
*/
166+
protected function configurePcntlSignal()
167+
{
168+
// When the process receives a SIGTERM or a SIGINT
169+
// signal, it should mark the server as unavailable
170+
// to receive new connections, close the current connections,
171+
// then stopping the loop.
172+
173+
$this->loop->addSignal(SIGTERM, function () {
174+
$this->line('Closing existing connections...');
175+
176+
$this->triggerSoftShutdown();
177+
});
178+
179+
$this->loop->addSignal(SIGINT, function () {
180+
$this->line('Closing existing connections...');
181+
182+
$this->triggerSoftShutdown();
183+
});
184+
}
185+
159186
/**
160187
* Configure the HTTP logger class.
161188
*
@@ -209,14 +236,6 @@ protected function startServer()
209236

210237
$this->buildServer();
211238

212-
// For testing, just boot up the server, run it
213-
// but exit after the next tick.
214-
if ($this->option('test')) {
215-
$this->loop->futureTick(function () {
216-
$this->loop->stop();
217-
});
218-
}
219-
220239
$this->server->run();
221240
}
222241

@@ -231,6 +250,10 @@ protected function buildServer()
231250
$this->option('host'), $this->option('port')
232251
);
233252

253+
if ($loop = $this->option('loop')) {
254+
$this->loop = $loop;
255+
}
256+
234257
$this->server = $this->server
235258
->setLoop($this->loop)
236259
->withRoutes(WebSocketRouter::getRoutes())
@@ -249,4 +272,29 @@ protected function getLastRestart()
249272
'beyondcode:websockets:restart', 0
250273
);
251274
}
275+
276+
/**
277+
* Trigger a soft shutdown for the process.
278+
*
279+
* @return void
280+
*/
281+
protected function triggerSoftShutdown()
282+
{
283+
$channelManager = $this->laravel->make(ChannelManager::class);
284+
285+
// Close the new connections allowance on this server.
286+
$channelManager->declineNewConnections();
287+
288+
// Get all local connections and close them. They will
289+
// be automatically be unsubscribed from all channels.
290+
$channelManager->getLocalConnections()
291+
->then(function ($connections) use ($channelManager) {
292+
foreach ($connections as $connection) {
293+
$connection->close();
294+
}
295+
})
296+
->then(function () {
297+
$this->loop->stop();
298+
});
299+
}
252300
}

src/Contracts/ChannelManager.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ public function find($appId, string $channel);
3636
*/
3737
public function findOrCreate($appId, string $channel);
3838

39+
/**
40+
* Get the local connections, regardless of the channel
41+
* they are connected to.
42+
*
43+
* @return \React\Promise\PromiseInterface
44+
*/
45+
public function getLocalConnections(): PromiseInterface;
46+
3947
/**
4048
* Get all channels for a specific app
4149
* for the current instance.

src/Server/WebSocketHandler.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public function __construct(ChannelManager $channelManager)
3939
*/
4040
public function onOpen(ConnectionInterface $connection)
4141
{
42+
if (! $this->connectionCanBeMade($connection)) {
43+
return $connection->close();
44+
}
45+
4246
$this->verifyAppKey($connection)
4347
->verifyOrigin($connection)
4448
->limitConcurrentConnections($connection)
@@ -69,6 +73,10 @@ public function onOpen(ConnectionInterface $connection)
6973
*/
7074
public function onMessage(ConnectionInterface $connection, MessageInterface $message)
7175
{
76+
if (! isset($connection->app)) {
77+
return;
78+
}
79+
7280
Messages\PusherMessageFactory::createForMessage(
7381
$message, $connection, $this->channelManager
7482
)->respond();
@@ -113,6 +121,18 @@ public function onError(ConnectionInterface $connection, Exception $exception)
113121
}
114122
}
115123

124+
/**
125+
* Check if the connection can be made for the
126+
* current server instance.
127+
*
128+
* @param \Ratchet\ConnectionInterface $connection
129+
* @return bool
130+
*/
131+
protected function connectionCanBeMade(ConnectionInterface $connection): bool
132+
{
133+
return $this->channelManager->acceptsNewConnections();
134+
}
135+
116136
/**
117137
* Verify the app key validity.
118138
*

tests/Commands/StartServerTest.php

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,43 @@ class StartServerTest extends TestCase
88
{
99
public function test_does_not_fail_if_building_up()
1010
{
11-
$this->artisan('websockets:serve', ['--test' => true, '--debug' => true]);
11+
$this->loop->futureTick(function () {
12+
$this->loop->stop();
13+
});
14+
15+
$this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6001]);
16+
17+
$this->assertTrue(true);
18+
}
19+
20+
public function test_pcntl_sigint_signal()
21+
{
22+
$this->loop->futureTick(function () {
23+
$this->newActiveConnection(['public-channel']);
24+
$this->newActiveConnection(['public-channel']);
25+
26+
posix_kill(posix_getpid(), SIGINT);
27+
28+
$this->loop->stop();
29+
});
30+
31+
$this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6002]);
32+
33+
$this->assertTrue(true);
34+
}
35+
36+
public function test_pcntl_sigterm_signal()
37+
{
38+
$this->loop->futureTick(function () {
39+
$this->newActiveConnection(['public-channel']);
40+
$this->newActiveConnection(['public-channel']);
41+
42+
posix_kill(posix_getpid(), SIGTERM);
43+
44+
$this->loop->stop();
45+
});
46+
47+
$this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6003]);
1248

1349
$this->assertTrue(true);
1450
}

tests/ConnectionTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,22 @@ public function test_capacity_limit()
108108
->assertSentEvent('pusher:error', ['data' => ['message' => 'Over capacity', 'code' => 4100]])
109109
->assertClosed();
110110
}
111+
112+
public function test_close_all_new_connections_after_stating_the_server_does_not_accept_new_connections()
113+
{
114+
$allowedConnection = $this->newActiveConnection(['test-channel']);
115+
116+
$allowedConnection->assertSentEvent('pusher:connection_established')
117+
->assertSentEvent('pusher_internal:subscription_succeeded');
118+
119+
$this->channelManager->declineNewConnections();
120+
121+
$this->assertFalse(
122+
$this->channelManager->acceptsNewConnections()
123+
);
124+
125+
$this->newActiveConnection(['test-channel'])
126+
->assertNothingSent()
127+
->assertClosed();
128+
}
111129
}

tests/Mocks/Connection.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,18 @@ public function assertNotSentEvent(string $name)
9797
return $this;
9898
}
9999

100+
/**
101+
* Assert that no events occured within the connection.
102+
*
103+
* @return $this
104+
*/
105+
public function assertNothingSent()
106+
{
107+
PHPUnit::assertEquals([], $this->sentData);
108+
109+
return $this;
110+
}
111+
100112
/**
101113
* Assert the connection is closed.
102114
*

tests/PresenceChannelTest.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace BeyondCode\LaravelWebSockets\Test;
44

55
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
6+
use Ratchet\ConnectionInterface;
67

78
class PresenceChannelTest extends TestCase
89
{
@@ -185,4 +186,22 @@ public function test_statistics_get_collected_for_presenece_channels()
185186
], $statistic->toArray());
186187
});
187188
}
189+
190+
public function test_local_connections_for_private_channels()
191+
{
192+
$this->newPresenceConnection('presence-channel', ['user_id' => 1]);
193+
$this->newPresenceConnection('presence-channel-2', ['user_id' => 2]);
194+
195+
$this->channelManager
196+
->getLocalConnections()
197+
->then(function ($connections) {
198+
$this->assertCount(2, $connections);
199+
200+
foreach ($connections as $connection) {
201+
$this->assertInstanceOf(
202+
ConnectionInterface::class, $connection
203+
);
204+
}
205+
});
206+
}
188207
}

0 commit comments

Comments
 (0)