Skip to content

Commit 50b81bf

Browse files
committed
优化队列消费者创建新协程消费消息
1 parent dfa5504 commit 50b81bf

File tree

1 file changed

+12
-9
lines changed

1 file changed

+12
-9
lines changed

src/Service/BaseQueueConsumer.php

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Yurun\Swoole\CoPool\CoPool;
1616
use Yurun\Swoole\CoPool\Interfaces\ICoTask;
1717
use Yurun\Swoole\CoPool\Interfaces\ITaskParam;
18+
use function Yurun\Swoole\Coroutine\goWait;
1819

1920
/**
2021
* 队列消费基类.
@@ -89,15 +90,17 @@ public function start(?int $co = null)
8990
}
9091
else
9192
{
92-
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
93-
'queue' => $queue,
94-
'message' => $message,
95-
], $this, ConsumerBeforeConsumeParam::class);
96-
$this->consume($message, $queue);
97-
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
98-
'queue' => $queue,
99-
'message' => $message,
100-
], $this, ConsumerAfterConsumeParam::class);
93+
goWait(function () use ($queue, $message) {
94+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
95+
'queue' => $queue,
96+
'message' => $message,
97+
], $this, ConsumerBeforeConsumeParam::class);
98+
$this->consume($message, $queue);
99+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
100+
'queue' => $queue,
101+
'message' => $message,
102+
], $this, ConsumerAfterConsumeParam::class);
103+
});
101104
}
102105
}
103106
catch (\Throwable $th)

0 commit comments

Comments
 (0)