Skip to content

Commit 566cd03

Browse files
committed
新增消费者弹出、消费前后置事件
1 parent 7c9e2f6 commit 566cd03

9 files changed

+177
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
namespace Imi\Queue\Event\Listener;
3+
4+
use Imi\Queue\Event\Param\ConsumerAfterConsumeParam;
5+
6+
/**
7+
* 消费者消费消息后置事件
8+
*/
9+
interface IConsumerAfterConsumeListener
10+
{
11+
/**
12+
* 事件处理方法
13+
* @param ConsumerAfterConsumeParam $e
14+
* @return void
15+
*/
16+
public function handle(ConsumerAfterConsumeParam $e);
17+
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
namespace Imi\Queue\Event\Listener;
3+
4+
use Imi\Queue\Event\Param\ConsumerAfterPopParam;
5+
6+
/**
7+
* 消费者弹出消息后置事件
8+
*/
9+
interface IConsumerAfterPopListener
10+
{
11+
/**
12+
* 事件处理方法
13+
* @param ConsumerAfterPopParam $e
14+
* @return void
15+
*/
16+
public function handle(ConsumerAfterPopParam $e);
17+
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
namespace Imi\Queue\Event\Listener;
3+
4+
use Imi\Queue\Event\Param\ConsumerBeforeConsumeParam;
5+
6+
/**
7+
* 消费者消费消息前置事件
8+
*/
9+
interface IConsumerBeforeConsumeListener
10+
{
11+
/**
12+
* 事件处理方法
13+
* @param ConsumerBeforeConsumeParam $e
14+
* @return void
15+
*/
16+
public function handle(ConsumerBeforeConsumeParam $e);
17+
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
namespace Imi\Queue\Event\Listener;
3+
4+
use Imi\Queue\Event\Param\ConsumerBeforePopParam;
5+
6+
/**
7+
* 消费者弹出消息前置事件
8+
*/
9+
interface IConsumerBeforePopListener
10+
{
11+
/**
12+
* 事件处理方法
13+
* @param ConsumerBeforePopParam $e
14+
* @return void
15+
*/
16+
public function handle(ConsumerBeforePopParam $e);
17+
18+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
namespace Imi\Queue\Event\Param;
3+
4+
use Imi\Event\EventParam;
5+
6+
/**
7+
* 消费者消费消息后置事件参数
8+
*/
9+
class ConsumerAfterConsumeParam extends EventParam
10+
{
11+
/**
12+
* 队列对象
13+
*
14+
* @var \Imi\Queue\Driver\IQueueDriver
15+
*/
16+
public $queue;
17+
18+
/**
19+
* 消息
20+
*
21+
* @var \Imi\Queue\Contract\IMessage
22+
*/
23+
public $message;
24+
25+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
namespace Imi\Queue\Event\Param;
3+
4+
use Imi\Event\EventParam;
5+
6+
/**
7+
* 消费者弹出消息后置事件参数
8+
*/
9+
class ConsumerAfterPopParam extends EventParam
10+
{
11+
/**
12+
* 队列对象
13+
*
14+
* @var \Imi\Queue\Driver\IQueueDriver
15+
*/
16+
public $queue;
17+
18+
/**
19+
* 消息
20+
*
21+
* @var \Imi\Queue\Contract\IMessage
22+
*/
23+
public $message;
24+
25+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
namespace Imi\Queue\Event\Param;
3+
4+
use Imi\Event\EventParam;
5+
6+
/**
7+
* 消费者消费消息前置事件参数
8+
*/
9+
class ConsumerBeforeConsumeParam extends EventParam
10+
{
11+
/**
12+
* 队列对象
13+
*
14+
* @var \Imi\Queue\Driver\IQueueDriver
15+
*/
16+
public $queue;
17+
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
namespace Imi\Queue\Event\Param;
3+
4+
use Imi\Event\EventParam;
5+
6+
/**
7+
* 消费者弹出消息前置事件参数
8+
*/
9+
class ConsumerBeforePopParam extends EventParam
10+
{
11+
/**
12+
* 队列对象
13+
*
14+
* @var \Imi\Queue\Driver\IQueueDriver
15+
*/
16+
public $queue;
17+
18+
}

src/Service/BaseQueueConsumer.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
<?php
22
namespace Imi\Queue\Service;
33

4+
use Imi\Event\Event;
45
use Swoole\Coroutine;
56
use Imi\Aop\Annotation\Inject;
67
use Yurun\Swoole\CoPool\CoPool;
78
use Imi\Queue\Contract\IMessage;
89
use Imi\Queue\Driver\IQueueDriver;
910
use Yurun\Swoole\CoPool\Interfaces\ICoTask;
1011
use Yurun\Swoole\CoPool\Interfaces\ITaskParam;
12+
use Imi\Queue\Event\Param\ConsumerAfterPopParam;
13+
use Imi\Queue\Event\Param\ConsumerBeforePopParam;
14+
use Imi\Queue\Event\Param\ConsumerAfterConsumeParam;
15+
use Imi\Queue\Event\Param\ConsumerBeforeConsumeParam;
1116

1217
/**
1318
* 队列消费基类
@@ -64,14 +69,28 @@ public function start(?int $co = null)
6469
$task = function() use($config){
6570
$queue = $this->imiQueue->getQueue($this->name);
6671
do {
72+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_POP', [
73+
'queue' => $queue,
74+
], $this, ConsumerBeforePopParam::class);
6775
$message = $queue->pop();
76+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_POP', [
77+
'queue' => $queue,
78+
'message' => $message,
79+
], $this, ConsumerAfterPopParam::class);
6880
if(null === $message)
6981
{
7082
Coroutine::sleep($config->getTimespan());
7183
}
7284
else
7385
{
86+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
87+
'queue' => $queue,
88+
], $this, ConsumerBeforeConsumeParam::class);
7489
$this->consume($message, $queue);
90+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
91+
'queue' => $queue,
92+
'message' => $message,
93+
], $this, ConsumerAfterConsumeParam::class);
7594
}
7695
} while($this->working);
7796
};

0 commit comments

Comments
 (0)