Skip to content

Commit 370b56b

Browse files
committed
优化队列消费者创建新协程消费消息
1 parent 119710f commit 370b56b

File tree

1 file changed

+12
-9
lines changed

1 file changed

+12
-9
lines changed

src/Service/BaseQueueConsumer.php

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Yurun\Swoole\CoPool\CoPool;
1818
use Yurun\Swoole\CoPool\Interfaces\ICoTask;
1919
use Yurun\Swoole\CoPool\Interfaces\ITaskParam;
20+
use function Yurun\Swoole\Coroutine\goWait;
2021

2122
/**
2223
* 队列消费基类.
@@ -81,15 +82,17 @@ public function start(?int $co = null): void
8182
}
8283
else
8384
{
84-
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
85-
'queue' => $queue,
86-
'message' => $message,
87-
], $this, ConsumerBeforeConsumeParam::class);
88-
$this->consume($message, $queue);
89-
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
90-
'queue' => $queue,
91-
'message' => $message,
92-
], $this, ConsumerAfterConsumeParam::class);
85+
goWait(function () use ($queue, $message) {
86+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
87+
'queue' => $queue,
88+
'message' => $message,
89+
], $this, ConsumerBeforeConsumeParam::class);
90+
$this->consume($message, $queue);
91+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
92+
'queue' => $queue,
93+
'message' => $message,
94+
], $this, ConsumerAfterConsumeParam::class);
95+
});
9396
}
9497
}
9598
catch (\Throwable $th)

0 commit comments

Comments
 (0)