Skip to content

Commit 7c99096

Browse files
author
Елькин Сергей
committed
wrong task message fix
1 parent 438bb43 commit 7c99096

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

transports/AsyncRedisTransport.php

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class AsyncRedisTransport
1616
*/
1717
protected $connection;
1818

19-
function __construct(Array $connectionConfig)
19+
public function __construct(Array $connectionConfig)
2020
{
2121
$this->connection = \Yii::$app->{$connectionConfig['connection']};
2222
}
@@ -26,6 +26,11 @@ public static function getQueueKey($queueName, $progress = false)
2626
return "queue:$queueName" . ($progress ? ':progress' : null);
2727
}
2828

29+
public static function getChannelKey($queueName)
30+
{
31+
return "channel:$queueName";
32+
}
33+
2934
/**
3035
* @param string $text
3136
* @param string $queueName
@@ -34,7 +39,7 @@ public static function getQueueKey($queueName, $progress = false)
3439
public function send($text, $queueName)
3540
{
3641
$return = $this->connection->executeCommand('LPUSH', [self::getQueueKey($queueName), $text]);
37-
$this->connection->executeCommand('PUBLISH', [self::getQueueKey($queueName), 'new']);
42+
$this->connection->executeCommand('PUBLISH', [self::getChannelKey($queueName), 'new']);
3843
return $return;
3944
}
4045

@@ -70,7 +75,7 @@ public function waitAndReceive($queueName)
7075
$task = $this->receive($queueName);
7176
if (!$task) {
7277
// subscribe to queue events
73-
$this->connection->executeCommand('SUBSCRIBE', [self::getQueueKey($queueName)]);
78+
$this->connection->executeCommand('SUBSCRIBE', [self::getChannelKey($queueName)]);
7479
while (!$task) {
7580
// wait for message
7681
$response = $this->connection->parseResponse('');
@@ -80,12 +85,12 @@ public function waitAndReceive($queueName)
8085
}
8186

8287
// unsubscribe to release redis connection context
83-
$this->connection->executeCommand('UNSUBSCRIBE', [self::getQueueKey($queueName)]);
88+
$this->connection->executeCommand('UNSUBSCRIBE', [self::getChannelKey($queueName)]);
8489
$task = $this->receive($queueName);
8590

8691
// if someone else got our task - subscribe again and wait
8792
if (!$task) {
88-
$this->connection->executeCommand('SUBSCRIBE', [self::getQueueKey($queueName)]);
93+
$this->connection->executeCommand('SUBSCRIBE', [self::getChannelKey($queueName)]);
8994
}
9095
}
9196
}
@@ -121,7 +126,7 @@ public function purge($queueName)
121126
* @return bool
122127
* @throws Exception
123128
*/
124-
function disconnect()
129+
public function disconnect()
125130
{
126131
if ($this->connection) {
127132
$this->connection->close();

0 commit comments

Comments
 (0)