Skip to content

Commit c6705a2

Browse files
committed
Add psubscribe and punsubscribe support
1 parent b8813b7 commit c6705a2

File tree

3 files changed

+70
-0
lines changed

3 files changed

+70
-0
lines changed

src/CommandInvoker.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,33 @@ public function receive(Connection $connection)
113113
continue;
114114
}
115115

116+
if ($type == 'psubscribe' && count($buffer) == 6) {
117+
$this->resultChannel->push($buffer);
118+
$buffer = null;
119+
continue;
120+
}
121+
122+
if ($type == 'punsubscribe' && count($buffer) == 6) {
123+
$this->resultChannel->push($buffer);
124+
$buffer = null;
125+
continue;
126+
}
127+
128+
if ($type == 'pmessage' && count($buffer) == 9) {
129+
$message = new Message();
130+
$message->pattern = $buffer[4];
131+
$message->channel = $buffer[6];
132+
$message->payload = $buffer[8];
133+
$timerID = Timer::after(30 * 1000, function () use ($message) {
134+
static::error(sprintf('Message channel (%s) is 30 seconds full, disconnected', $message->channel));
135+
$this->interrupt();
136+
});
137+
$this->messageChannel->push($message);
138+
Timer::clear($timerID);
139+
$buffer = null;
140+
continue;
141+
}
142+
116143
if ($type == 'pong' && count($buffer) == 5) {
117144
$this->pingChannel->push('pong');
118145
$buffer = null;

src/Message.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@
99
class Message
1010
{
1111

12+
/**
13+
* @var string
14+
*/
15+
public $pattern;
16+
1217
/**
1318
* @var string
1419
*/

src/Subscriber.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,44 @@ public function unsubscribe(string ...$channels)
125125
}
126126
}
127127

128+
/**
129+
* PSubscribe.
130+
* @throws \Swoole\Exception
131+
* @throws Throwable
132+
*/
133+
public function psubscribe(string ...$channels)
134+
{
135+
$channels = array_map(function ($channel) {
136+
return $this->prefix . $channel;
137+
}, $channels);
138+
$result = $this->commandInvoker->invoke(['psubscribe', ...$channels], count($channels));
139+
foreach ($result as $value) {
140+
if ($value === false) {
141+
$this->commandInvoker->interrupt();
142+
throw new SubscribeException('Psubscribe failed');
143+
}
144+
}
145+
}
146+
147+
/**
148+
* PUnsubscribe.
149+
* @throws \Swoole\Exception
150+
* @throws Throwable
151+
*/
152+
public function punsubscribe(string ...$channels)
153+
{
154+
$channels = array_map(function ($channel) {
155+
return $this->prefix . $channel;
156+
}, $channels);
157+
$result = $this->commandInvoker->invoke(['punsubscribe', ...$channels], count($channels));
158+
foreach ($result as $value) {
159+
if ($value === false) {
160+
$this->commandInvoker->interrupt();
161+
throw new UnsubscribeException('Punsubscribe failed');
162+
}
163+
}
164+
}
165+
128166
/**
129167
* Channel
130168
* @return \Swoole\Coroutine\Channel

0 commit comments

Comments
 (0)