Skip to content

Commit 331bea9

Browse files
committed
修复消费进程使用同步客户端的问题
1 parent 7b15ae9 commit 331bea9

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

src/Process/QueueConsumerProcess.php

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,22 @@ public function run(\Swoole\Process $process)
5454
$processPool = new \Imi\Process\Pool($options['process']);
5555
$configs = $options['configs'];
5656
$processPool->on('WorkerStart', function(\Imi\Process\Pool\WorkerEventParam $e) use($group, $configs){
57-
Imi::setProcessName('process', [
58-
'processName' => 'QueueConsumer-' . $group,
59-
]);
60-
/** @var \Imi\Queue\Model\QueueConfig[] $configs */
61-
foreach($configs as $config)
62-
{
63-
Coroutine::create(function() use($config){
64-
/** @var \Imi\Queue\Service\BaseQueueConsumer $queueConsumer */
65-
$this->consumers[] = $queueConsumer = App::getBean($config->getConsumer(), $config->getName());
66-
$queueConsumer->start();
67-
});
68-
}
57+
go(function() use($group, $configs){
58+
\Swoole\Runtime::enableCoroutine(true);
59+
App::initWorker();
60+
Imi::setProcessName('process', [
61+
'processName' => 'QueueConsumer-' . $group,
62+
]);
63+
/** @var \Imi\Queue\Model\QueueConfig[] $configs */
64+
foreach($configs as $config)
65+
{
66+
Coroutine::create(function() use($config){
67+
/** @var \Imi\Queue\Service\BaseQueueConsumer $queueConsumer */
68+
$this->consumers[] = $queueConsumer = App::getBean($config->getConsumer(), $config->getName());
69+
$queueConsumer->start();
70+
});
71+
}
72+
});
6973
});
7074
// 工作进程退出事件-可选
7175
$processPool->on('WorkerExit', function(\Imi\Process\Pool\WorkerEventParam $e){

0 commit comments

Comments
 (0)