Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit bf049a3

Browse files
committed
Added easy extendable methods to change hash names for Redis
1 parent 139608f commit bf049a3

File tree

2 files changed

+112
-39
lines changed

2 files changed

+112
-39
lines changed

src/ChannelManagers/RedisChannelManager.php

Lines changed: 93 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class RedisChannelManager extends LocalChannelManager
5656
*/
5757
public function __construct(LoopInterface $loop, $factoryClass = null)
5858
{
59-
parent::construct($loop, $factoryClass);
59+
parent::__construct($loop, $factoryClass);
6060

6161
$this->loop = $loop;
6262

@@ -87,7 +87,7 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
8787
public function getGlobalChannels($appId): PromiseInterface
8888
{
8989
return $this->publishClient->smembers(
90-
$this->getRedisKey($appId, null, ['channels'])
90+
$this->getChannelsRedisHash($appId)
9191
);
9292
}
9393

@@ -214,7 +214,7 @@ public function unsubscribeFromApp($appId): PromiseInterface
214214
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
215215
{
216216
return $this->publishClient
217-
->hget($this->getRedisKey($appId, $channelName, ['stats']), 'connections')
217+
->hget($this->getStatsRedisHash($appId, $channelName), 'connections')
218218
->then(function ($count) {
219219
return is_null($count) ? 0 : (int) $count;
220220
});
@@ -237,7 +237,7 @@ public function broadcastAcrossServers($appId, ?string $socketId, string $channe
237237
$payload->serverId = $serverId ?: $this->getServerId();
238238

239239
return $this->publishClient
240-
->publish($this->getRedisKey($appId, $channel), json_encode($payload))
240+
->publish($this->getRedisTopicName($appId, $channel), json_encode($payload))
241241
->then(function () use ($appId, $socketId, $channel, $payload, $serverId) {
242242
return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
243243
});
@@ -293,7 +293,7 @@ public function userLeftPresenceChannel(ConnectionInterface $connection, stdClas
293293
public function getChannelMembers($appId, string $channel): PromiseInterface
294294
{
295295
return $this->publishClient
296-
->hgetall($this->getRedisKey($appId, $channel, ['users']))
296+
->hgetall($this->getUsersRedisHash($appId, $channel))
297297
->then(function ($list) {
298298
return collect(Helpers::redisListToArray($list))->map(function ($user) {
299299
return json_decode($user);
@@ -311,7 +311,7 @@ public function getChannelMembers($appId, string $channel): PromiseInterface
311311
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
312312
{
313313
return $this->publishClient->hget(
314-
$this->getRedisKey($connection->app->id, $channel, ['users']), $connection->socketId
314+
$this->getUsersRedisHash($connection->app->id, $channel), $connection->socketId
315315
);
316316
}
317317

@@ -328,7 +328,7 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
328328

329329
foreach ($channelNames as $channel) {
330330
$this->publishClient->hlen(
331-
$this->getRedisKey($appId, $channel, ['users'])
331+
$this->getUsersRedisHash($appId, $channel)
332332
);
333333
}
334334

@@ -349,7 +349,7 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
349349
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
350350
{
351351
return $this->publishClient->smembers(
352-
$this->getRedisKey($appId, $channelName, [$userId, 'userSockets'])
352+
$this->getUserSocketsRedisHash($appId, $channelName, $userId)
353353
);
354354
}
355355

@@ -498,7 +498,7 @@ public function getRedisClient()
498498
public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
499499
{
500500
return $this->publishClient->hincrby(
501-
$this->getRedisKey($appId, $channel, ['stats']), 'connections', $increment
501+
$this->getStatsRedisHash($appId, $channel), 'connections', $increment
502502
);
503503
}
504504

@@ -527,7 +527,7 @@ public function addConnectionToSet(ConnectionInterface $connection, $moment = nu
527527
$moment = $moment ? Carbon::parse($moment) : Carbon::now();
528528

529529
return $this->publishClient->zadd(
530-
$this->getRedisKey(null, null, ['sockets']),
530+
$this->getSocketsRedisHash(),
531531
$moment->format('U'), "{$connection->app->id}:{$connection->socketId}"
532532
);
533533
}
@@ -541,7 +541,7 @@ public function addConnectionToSet(ConnectionInterface $connection, $moment = nu
541541
public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
542542
{
543543
return $this->publishClient->zrem(
544-
$this->getRedisKey(null, null, ['sockets']),
544+
$this->getSocketsRedisHash(),
545545
"{$connection->app->id}:{$connection->socketId}"
546546
);
547547
}
@@ -563,7 +563,7 @@ public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $stric
563563
}
564564

565565
return $this->publishClient
566-
->zrangebyscore($this->getRedisKey(null, null, ['sockets']), $start, $stop)
566+
->zrangebyscore($this->getSocketsRedisHash(), $start, $stop)
567567
->then(function ($list) {
568568
return collect($list)->mapWithKeys(function ($appWithSocket) {
569569
[$appId, $socketId] = explode(':', $appWithSocket);
@@ -583,7 +583,7 @@ public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $stric
583583
public function addChannelToSet($appId, string $channel): PromiseInterface
584584
{
585585
return $this->publishClient->sadd(
586-
$this->getRedisKey($appId, null, ['channels']), $channel
586+
$this->getChannelsRedisHash($appId), $channel
587587
);
588588
}
589589

@@ -597,7 +597,7 @@ public function addChannelToSet($appId, string $channel): PromiseInterface
597597
public function removeChannelFromSet($appId, string $channel): PromiseInterface
598598
{
599599
return $this->publishClient->srem(
600-
$this->getRedisKey($appId, null, ['channels']), $channel
600+
$this->getChannelsRedisHash($appId), $channel
601601
);
602602
}
603603

@@ -613,7 +613,7 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface
613613
public function storeUserData($appId, string $channel = null, string $key, $data): PromiseInterface
614614
{
615615
return $this->publishClient->hset(
616-
$this->getRedisKey($appId, $channel, ['users']), $key, $data
616+
$this->getUsersRedisHash($appId, $channel), $key, $data
617617
);
618618
}
619619

@@ -628,7 +628,7 @@ public function storeUserData($appId, string $channel = null, string $key, $data
628628
public function removeUserData($appId, string $channel = null, string $key): PromiseInterface
629629
{
630630
return $this->publishClient->hdel(
631-
$this->getRedisKey($appId, $channel, ['users']), $key
631+
$this->getUsersRedisHash($appId, $channel), $key
632632
);
633633
}
634634

@@ -641,7 +641,7 @@ public function removeUserData($appId, string $channel = null, string $key): Pro
641641
*/
642642
public function subscribeToTopic($appId, string $channel = null): PromiseInterface
643643
{
644-
$topic = $this->getRedisKey($appId, $channel);
644+
$topic = $this->getRedisTopicName($appId, $channel);
645645

646646
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
647647
'serverId' => $this->getServerId(),
@@ -660,7 +660,7 @@ public function subscribeToTopic($appId, string $channel = null): PromiseInterfa
660660
*/
661661
public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface
662662
{
663-
$topic = $this->getRedisKey($appId, $channel);
663+
$topic = $this->getRedisTopicName($appId, $channel);
664664

665665
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
666666
'serverId' => $this->getServerId(),
@@ -682,7 +682,7 @@ public function unsubscribeFromTopic($appId, string $channel = null): PromiseInt
682682
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
683683
{
684684
return $this->publishClient->sadd(
685-
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId
685+
$this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId
686686
);
687687
}
688688

@@ -698,7 +698,7 @@ protected function addUserSocket($appId, string $channel, stdClass $user, string
698698
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
699699
{
700700
return $this->publishClient->srem(
701-
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId
701+
$this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId
702702
);
703703
}
704704

@@ -729,6 +729,79 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix
729729
return $hash;
730730
}
731731

732+
/**
733+
* Get the statistics Redis hash.
734+
*
735+
* @param string|int $appId
736+
* @param string|null $channel
737+
* @return string
738+
*/
739+
public function getStatsRedisHash($appId, string $channel = null): string
740+
{
741+
return $this->getRedisKey($appId, $channel, ['stats']);
742+
}
743+
744+
/**
745+
* Get the sockets Redis hash used to store all sockets ids.
746+
*
747+
* @return string
748+
*/
749+
public function getSocketsRedisHash(): string
750+
{
751+
return $this->getRedisKey(null, null, ['sockets']);
752+
}
753+
754+
/**
755+
* Get the channels Redis hash for a specific app id, used
756+
* to store existing channels.
757+
*
758+
* @param string|int $appId
759+
* @return string
760+
*/
761+
public function getChannelsRedisHash($appId): string
762+
{
763+
return $this->getRedisKey($appId, null, ['channels']);
764+
}
765+
766+
/**
767+
* Get the Redis hash for storing presence channels users.
768+
*
769+
* @param string|int $appId
770+
* @param string|null $channel
771+
* @return string
772+
*/
773+
public function getUsersRedisHash($appId, string $channel = null): string
774+
{
775+
return $this->getRedisKey($appId, $channel, ['users']);
776+
}
777+
778+
/**
779+
* Get the Redis hash for storing socket ids
780+
* for a specific presence channels user.
781+
*
782+
* @param string|int $appId
783+
* @param string|null $channel
784+
* @param string|int|null $userId
785+
* @return string
786+
*/
787+
public function getUserSocketsRedisHash($appId, string $channel = null, $userId = null): string
788+
{
789+
return $this->getRedisKey($appId, $channel, [$userId, 'userSockets']);
790+
}
791+
792+
/**
793+
* Get the Redis topic name for PubSub
794+
* used to transfer info between servers.
795+
*
796+
* @param string|int $appId
797+
* @param string|null $channel
798+
* @return string
799+
*/
800+
public function getRedisTopicName($appId, string $channel = null): string
801+
{
802+
return $this->getRedisKey($appId, $channel);
803+
}
804+
732805
/**
733806
* Get a new RedisLock instance to avoid race conditions.
734807
*

0 commit comments

Comments
 (0)