Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 00e8f3e

Browse files
committed
Add channel storage to LocalDriver to simplify PresenceChannel logic
1 parent 373b993 commit 00e8f3e

File tree

4 files changed

+95
-91
lines changed

4 files changed

+95
-91
lines changed

src/HttpApi/Controllers/FetchChannelsController.php

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,24 @@ public function __invoke(Request $request)
3232
});
3333
}
3434

35-
if (config('websockets.replication.enabled') === true) {
36-
// We want to get the channel user count all in one shot when
37-
// using a replication backend rather than doing individual queries.
38-
// To do so, we first collect the list of channel names.
39-
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
40-
return $channel->getChannelName();
41-
})->toArray();
35+
// We want to get the channel user count all in one shot when
36+
// using a replication backend rather than doing individual queries.
37+
// To do so, we first collect the list of channel names.
38+
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
39+
return $channel->getChannelName();
40+
})->toArray();
4241

43-
/** @var PromiseInterface $memberCounts */
44-
// We ask the replication backend to get us the member count per channel
45-
$memberCounts = app(ReplicationInterface::class)
46-
->channelMemberCounts($request->appId, $channelNames);
42+
/** @var PromiseInterface $memberCounts */
43+
// We ask the replication backend to get us the member count per channel
44+
$memberCounts = app(ReplicationInterface::class)
45+
->channelMemberCounts($request->appId, $channelNames);
4746

48-
// We return a promise since the backend runs async. We get $counts back
49-
// as a key-value array of channel names and their member count.
50-
return $memberCounts->then(function (array $counts) use ($channels, $attributes) {
51-
return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) {
52-
return $counts[$channel->getChannelName()];
53-
});
47+
// We return a promise since the backend runs async. We get $counts back
48+
// as a key-value array of channel names and their member count.
49+
return $memberCounts->then(function (array $counts) use ($channels, $attributes) {
50+
return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) {
51+
return $counts[$channel->getChannelName()];
5452
});
55-
}
56-
57-
return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) {
58-
return $channel->getUserCount();
5953
});
6054
}
6155

src/PubSub/Drivers/LocalClient.php

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@
1010

1111
class LocalClient implements ReplicationInterface
1212
{
13+
/**
14+
* Mapping of the presence JSON data for users in each channel
15+
*
16+
* @var string[][]
17+
*/
18+
protected $channelData = [];
19+
1320
/**
1421
* Boot the pub/sub provider (open connections, initial subscriptions, etc).
1522
*
@@ -31,6 +38,7 @@ public function boot(LoopInterface $loop) : ReplicationInterface
3138
*/
3239
public function publish(string $appId, string $channel, stdClass $payload) : bool
3340
{
41+
// Nothing to do, nobody to publish to
3442
return true;
3543
}
3644

@@ -69,6 +77,7 @@ public function unsubscribe(string $appId, string $channel) : bool
6977
*/
7078
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
7179
{
80+
$this->channelData["$appId:$channel"][$socketId] = $data;
7281
}
7382

7483
/**
@@ -81,6 +90,10 @@ public function joinChannel(string $appId, string $channel, string $socketId, st
8190
*/
8291
public function leaveChannel(string $appId, string $channel, string $socketId)
8392
{
93+
unset($this->channelData["$appId:$channel"][$socketId]);
94+
if (empty($this->channelData["$appId:$channel"])) {
95+
unset($this->channelData["$appId:$channel"]);
96+
}
8497
}
8598

8699
/**
@@ -92,7 +105,14 @@ public function leaveChannel(string $appId, string $channel, string $socketId)
92105
*/
93106
public function channelMembers(string $appId, string $channel) : PromiseInterface
94107
{
95-
return new FulfilledPromise(null);
108+
$members = $this->channelData["$appId:$channel"] ?? [];
109+
110+
// The data is expected as objects, so we need to JSON decode
111+
$members = array_map(function ($user) {
112+
return json_decode($user);
113+
}, $members);
114+
115+
return new FulfilledPromise($members);
96116
}
97117

98118
/**
@@ -104,6 +124,15 @@ public function channelMembers(string $appId, string $channel) : PromiseInterfac
104124
*/
105125
public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface
106126
{
107-
return new FulfilledPromise(null);
127+
$results = [];
128+
129+
// Count the number of users per channel
130+
foreach ($channelNames as $channel) {
131+
$results[$channel] = isset($this->channelData["$appId:$channel"])
132+
? count($this->channelData["$appId:$channel"])
133+
: 0;
134+
}
135+
136+
return new FulfilledPromise($results);
108137
}
109138
}

src/PubSub/Drivers/RedisClient.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ protected function onMessage(string $redisChannel, string $payload)
9797
// expect the channel name to not include the app ID.
9898
$payload->channel = Str::after($redisChannel, "$appId:");
9999

100-
/* @var $channelManager ChannelManager */
100+
/* @var ChannelManager $channelManager */
101101
$channelManager = app(ChannelManager::class);
102102

103103
// Load the Channel instance, if any

src/WebSockets/Channels/PresenceChannel.php

Lines changed: 48 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@
1010

1111
class PresenceChannel extends Channel
1212
{
13+
/**
14+
* Data for the users connected to this channel
15+
*
16+
* Note: If replication is enabled, this will only contain entries
17+
* for the users directly connected to this server instance. Requests
18+
* for data for all users in the channel should be routed through
19+
* ReplicationInterface.
20+
*
21+
* @var string[]
22+
*/
1323
protected $users = [];
1424

1525
/**
@@ -18,21 +28,9 @@ class PresenceChannel extends Channel
1828
*/
1929
public function getUsers(string $appId)
2030
{
21-
if (config('websockets.replication.enabled') === true) {
22-
// Get the members list from the replication backend
23-
return app(ReplicationInterface::class)
24-
->channelMembers($appId, $this->channelName);
25-
}
26-
27-
return $this->users;
28-
}
29-
30-
/**
31-
* @return array
32-
*/
33-
public function getUserCount()
34-
{
35-
return count($this->users);
31+
// Get the members list from the replication backend
32+
return app(ReplicationInterface::class)
33+
->channelMembers($appId, $this->channelName);
3634
}
3735

3836
/**
@@ -51,36 +49,27 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload)
5149
$channelData = json_decode($payload->channel_data);
5250
$this->users[$connection->socketId] = $channelData;
5351

54-
if (config('websockets.replication.enabled') === true) {
55-
// Add the connection as a member of the channel
56-
app(ReplicationInterface::class)
57-
->joinChannel(
58-
$connection->app->id,
59-
$this->channelName,
60-
$connection->socketId,
61-
json_encode($channelData)
62-
);
63-
64-
// We need to pull the channel data from the replication backend,
65-
// otherwise we won't be sending the full details of the channel
66-
app(ReplicationInterface::class)
67-
->channelMembers($connection->app->id, $this->channelName)
68-
->then(function ($users) use ($connection) {
69-
// Send the success event
70-
$connection->send(json_encode([
71-
'event' => 'pusher_internal:subscription_succeeded',
72-
'channel' => $this->channelName,
73-
'data' => json_encode($this->getChannelData($users)),
74-
]));
75-
});
76-
} else {
77-
// Send the success event
78-
$connection->send(json_encode([
79-
'event' => 'pusher_internal:subscription_succeeded',
80-
'channel' => $this->channelName,
81-
'data' => json_encode($this->getChannelData($this->users)),
82-
]));
83-
}
52+
// Add the connection as a member of the channel
53+
app(ReplicationInterface::class)
54+
->joinChannel(
55+
$connection->app->id,
56+
$this->channelName,
57+
$connection->socketId,
58+
json_encode($channelData)
59+
);
60+
61+
// We need to pull the channel data from the replication backend,
62+
// otherwise we won't be sending the full details of the channel
63+
app(ReplicationInterface::class)
64+
->channelMembers($connection->app->id, $this->channelName)
65+
->then(function ($users) use ($connection) {
66+
// Send the success event
67+
$connection->send(json_encode([
68+
'event' => 'pusher_internal:subscription_succeeded',
69+
'channel' => $this->channelName,
70+
'data' => json_encode($this->getChannelData($users)),
71+
]));
72+
});
8473

8574
$this->broadcastToOthers($connection, (object) [
8675
'event' => 'pusher_internal:member_added',
@@ -97,15 +86,13 @@ public function unsubscribe(ConnectionInterface $connection)
9786
return;
9887
}
9988

100-
if (config('websockets.replication.enabled') === true) {
101-
// Remove the connection as a member of the channel
102-
app(ReplicationInterface::class)
103-
->leaveChannel(
104-
$connection->app->id,
105-
$this->channelName,
106-
$connection->socketId
107-
);
108-
}
89+
// Remove the connection as a member of the channel
90+
app(ReplicationInterface::class)
91+
->leaveChannel(
92+
$connection->app->id,
93+
$this->channelName,
94+
$connection->socketId
95+
);
10996

11097
$this->broadcastToOthers($connection, (object) [
11198
'event' => 'pusher_internal:member_removed',
@@ -124,19 +111,13 @@ public function unsubscribe(ConnectionInterface $connection)
124111
*/
125112
public function toArray(string $appId = null)
126113
{
127-
if (config('websockets.replication.enabled') === true) {
128-
return app(ReplicationInterface::class)
129-
->channelMembers($appId, $this->channelName)
130-
->then(function ($users) {
131-
return array_merge(parent::toArray(), [
132-
'user_count' => count($users),
133-
]);
134-
});
135-
}
136-
137-
return array_merge(parent::toArray(), [
138-
'user_count' => count($this->users),
139-
]);
114+
return app(ReplicationInterface::class)
115+
->channelMembers($appId, $this->channelName)
116+
->then(function ($users) {
117+
return array_merge(parent::toArray(), [
118+
'user_count' => count($users),
119+
]);
120+
});
140121
}
141122

142123
protected function getChannelData(array $users): array

0 commit comments

Comments
 (0)