Skip to content

Commit a110dac

Browse files
committed
修复消息队列消费进程问题
1 parent dbcfe9a commit a110dac

File tree

1 file changed

+21
-20
lines changed

1 file changed

+21
-20
lines changed
Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
<?php
2+
23
namespace Imi\Queue\Process;
34

5+
use Imi\Aop\Annotation\Inject;
46
use Imi\App;
5-
use Imi\Log\Log;
7+
use Imi\Process\Annotation\Process;
8+
use Imi\Process\BaseProcess;
69
use Imi\Util\Imi;
710
use Swoole\Coroutine;
8-
use Imi\Process\BaseProcess;
9-
use Imi\Aop\Annotation\Inject;
10-
use Imi\Process\Annotation\Process;
11+
use Swoole\Event;
1112

1213
/**
13-
* 队列消费进程
14-
*
14+
* 队列消费进程.
15+
*
1516
* @Process(name="QueueConsumer", unique=true, co=false)
1617
*/
1718
class QueueConsumerProcess extends BaseProcess
@@ -24,7 +25,7 @@ class QueueConsumerProcess extends BaseProcess
2425
protected $imiQueue;
2526

2627
/**
27-
* 消费者列表
28+
* 消费者列表.
2829
*
2930
* @var \Imi\Queue\Service\BaseQueueConsumer[]
3031
*/
@@ -34,53 +35,53 @@ public function run(\Swoole\Process $process)
3435
{
3536
$imiQueue = $this->imiQueue;
3637
$processGroups = [];
37-
foreach($imiQueue->getList() as $name => $arrayConfig)
38+
foreach ($imiQueue->getList() as $name => $arrayConfig)
3839
{
3940
$config = $imiQueue->getQueueConfig($name);
40-
if(!$config->getAutoConsumer())
41+
if (!$config->getAutoConsumer())
4142
{
4243
continue;
4344
}
4445
$group = $config->getProcessGroup();
4546
$process = $config->getProcess();
46-
if(!isset($processGroups[$group]) || $process > $processGroups[$group]['process'])
47+
if (!isset($processGroups[$group]) || $process > $processGroups[$group]['process'])
4748
{
4849
$processGroups[$group]['process'] = $process;
4950
}
5051
$processGroups[$group]['configs'][] = $config;
5152
}
52-
foreach($processGroups as $group => $options)
53+
foreach ($processGroups as $group => $options)
5354
{
5455
$processPool = new \Imi\Process\Pool($options['process']);
5556
$configs = $options['configs'];
56-
$processPool->on('WorkerStart', function(\Imi\Process\Pool\WorkerEventParam $e) use($group, $configs){
57-
go(function() use($group, $configs){
57+
$processPool->on('WorkerStart', function (\Imi\Process\Pool\WorkerEventParam $e) use ($group, $configs) {
58+
go(function () use ($group, $configs) {
5859
\Swoole\Runtime::enableCoroutine(true);
5960
App::initWorker();
6061
Imi::setProcessName('process', [
61-
'processName' => 'QueueConsumer-' . $group,
62+
'processName' => 'QueueConsumer-' . $group,
6263
]);
6364
/** @var \Imi\Queue\Model\QueueConfig[] $configs */
64-
foreach($configs as $config)
65+
foreach ($configs as $config)
6566
{
66-
Coroutine::create(function() use($config){
67-
/** @var \Imi\Queue\Service\BaseQueueConsumer $queueConsumer */
67+
Coroutine::create(function () use ($config) {
68+
/* @var \Imi\Queue\Service\BaseQueueConsumer $queueConsumer */
6869
$this->consumers[] = $queueConsumer = App::getBean($config->getConsumer(), $config->getName());
6970
$queueConsumer->start();
7071
});
7172
}
7273
});
7374
});
7475
// 工作进程退出事件-可选
75-
$processPool->on('WorkerExit', function(\Imi\Process\Pool\WorkerEventParam $e){
76+
$processPool->on('WorkerExit', function (\Imi\Process\Pool\WorkerEventParam $e) {
7677
// 做一些释放操作
77-
foreach($this->consumers as $consumer)
78+
foreach ($this->consumers as $consumer)
7879
{
7980
$consumer->stop();
8081
}
8182
});
8283
$processPool->start();
8384
}
85+
Event::wait();
8486
}
85-
8687
}

0 commit comments

Comments
 (0)