Skip to content

Commit 6a854a4

Browse files
committed
imi-queue 消费进程兼容 workerman
1 parent 370b56b commit 6a854a4

12 files changed

+423
-214
lines changed

example/Listener/WorkerStartListener.php

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
use Imi\Event\EventParam;
1010
use Imi\Event\IEventListener;
1111
use Imi\Queue\Model\Message;
12-
use Swoole\Coroutine;
12+
use Imi\Timer\Timer;
1313

1414
/**
1515
* @Listener("IMI.MAIN_SERVER.WORKER.START.APP")
16+
* @Listener("IMI.WORKERMAN.SERVER.WORKER_START")
1617
*/
1718
class WorkerStartListener implements IEventListener
1819
{
@@ -29,24 +30,16 @@ class WorkerStartListener implements IEventListener
2930
public function handle(EventParam $e): void
3031
{
3132
// 每 1 秒投递进 test1 队列
32-
Coroutine::create(function () {
33-
while (true)
34-
{
35-
$message = new Message();
36-
$message->setMessage((string) time());
37-
$this->imiQueue->getQueue('test1')->push($message);
38-
sleep(1);
39-
}
33+
Timer::tick(1000, function () {
34+
$message = new Message();
35+
$message->setMessage((string) time());
36+
$this->imiQueue->getQueue('test1')->push($message);
4037
});
4138
// 每 3 秒投递进 test2 队列
42-
Coroutine::create(function () {
43-
while (true)
44-
{
45-
$message = new Message();
46-
$message->setMessage((string) time());
47-
$this->imiQueue->getQueue('test2')->push($message);
48-
sleep(3);
49-
}
39+
Timer::tick(3000, function () {
40+
$message = new Message();
41+
$message->setMessage((string) time());
42+
$this->imiQueue->getQueue('test2')->push($message);
5043
});
5144
}
5245
}
File renamed without changes.

example/bin/imi-workerman

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/usr/bin/env php
2+
<?php
3+
require_once dirname(__DIR__, 2) . '/vendor/' . 'autoload.php';
4+
require_once dirname(__DIR__, 5) . '/vendor/' . 'autoload.php';
5+
6+
\Imi\App::setDebug(true);
7+
8+
require dirname(\Imi\Util\Imi::getNamespacePath('Imi\Workerman')) . '/bin/imi-workerman';

example/config/config.php

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
declare(strict_types=1);
44

5-
use Imi\Swoole\Server\Type;
5+
use Imi\Util\Imi;
66

77
return [
88
// 项目根命名空间
@@ -21,23 +21,36 @@
2121

2222
// 组件命名空间
2323
'components' => [
24-
'Queue' => 'Imi\Queue',
25-
'Swoole' => 'Imi\Swoole',
24+
'Queue' => 'Imi\Queue',
25+
'Swoole' => 'Imi\Swoole',
26+
'Workerman' => 'Imi\Workerman',
2627
],
2728

2829
// 主服务器配置
2930
'mainServer' => [
3031
'namespace' => 'QueueApp\HttpServer',
31-
'type' => Type::HTTP,
32+
'type' => \Imi\Swoole\Server\Type::HTTP,
3233
'host' => '127.0.0.1',
3334
'port' => 8080,
3435
'configs' => [
3536
'worker_num' => 1,
3637
],
3738
],
3839

40+
// Workerman 服务器配置
41+
'workermanServer' => [
42+
'http' => [
43+
'namespace' => 'QueueApp\HttpServer',
44+
'type' => \Imi\Workerman\Server\Type::HTTP,
45+
'host' => '127.0.0.1',
46+
'port' => 8080,
47+
'configs' => [
48+
],
49+
],
50+
],
51+
3952
// 连接池配置
40-
'pools' => [
53+
'pools' => Imi::checkAppType('swoole') ? [
4154
'redis' => [
4255
'pool' => [
4356
// 协程池类名
@@ -76,7 +89,7 @@
7689
],
7790
// uri资源配置,以分号;分隔多个,参数使用query参数格式,特殊字符需要转码
7891
],
79-
],
92+
] : [],
8093

8194
// 数据库配置
8295
'db' => [
@@ -88,6 +101,18 @@
88101
'redis' => [
89102
// 数默认连接池名
90103
'defaultPool' => 'redis',
104+
'connections' => [
105+
'redis' => [
106+
'host' => imiGetEnv('REDIS_SERVER_HOST', '127.0.0.1'),
107+
'port' => 6379,
108+
// 是否自动序列化变量
109+
'serialize' => false,
110+
// 密码
111+
'password' => null,
112+
// 第几个库
113+
'db' => 0,
114+
],
115+
],
91116
],
92117

93118
// 锁
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Imi\Queue\Partial;
6+
7+
use Imi\App;
8+
use Imi\Bean\Annotation\Partial;
9+
use Imi\Event\Event;
10+
use Swoole\Coroutine;
11+
use Yurun\Swoole\CoPool\CoPool;
12+
use Yurun\Swoole\CoPool\Interfaces\ICoTask;
13+
use Yurun\Swoole\CoPool\Interfaces\ITaskParam;
14+
use function Yurun\Swoole\Coroutine\goWait;
15+
16+
if (\Imi\Util\Imi::checkAppType('swoole'))
17+
{
18+
/**
19+
* @Partial(\Imi\Queue\Service\BaseQueueConsumer::class)
20+
*
21+
* @property \Imi\Queue\Service\QueueService $imiQueue
22+
*/
23+
trait SwooleBaseQueueConsumerPartial
24+
{
25+
/**
26+
* 协程工作池.
27+
*/
28+
private ?CoPool $coPool;
29+
30+
/**
31+
* 开始消费循环.
32+
*/
33+
public function start(?int $co = null): void
34+
{
35+
$this->working = true;
36+
$config = $this->imiQueue->getQueueConfig($this->name);
37+
if (null === $co)
38+
{
39+
$co = $config->getCo();
40+
}
41+
$task = function () use ($config) {
42+
$queue = $this->imiQueue->getQueue($this->name);
43+
do
44+
{
45+
try
46+
{
47+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_POP', [
48+
'queue' => $queue,
49+
], $this, ConsumerBeforePopParam::class);
50+
$message = $queue->pop();
51+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_POP', [
52+
'queue' => $queue,
53+
'message' => $message,
54+
], $this, ConsumerAfterPopParam::class);
55+
if (null === $message)
56+
{
57+
Coroutine::sleep($config->getTimespan());
58+
}
59+
else
60+
{
61+
goWait(function () use ($queue, $message) {
62+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
63+
'queue' => $queue,
64+
'message' => $message,
65+
], $this, ConsumerBeforeConsumeParam::class);
66+
$this->consume($message, $queue);
67+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
68+
'queue' => $queue,
69+
'message' => $message,
70+
], $this, ConsumerAfterConsumeParam::class);
71+
});
72+
}
73+
}
74+
catch (\Throwable $th)
75+
{
76+
App::getBean('ErrorLog')->onException($th);
77+
}
78+
} while ($this->working);
79+
};
80+
if ($co > 0)
81+
{
82+
// @phpstan-ignore-next-line
83+
$this->coPool = $pool = new CoPool($co, $co, new class() implements ICoTask {
84+
/**
85+
* 执行任务
86+
*
87+
* @return mixed
88+
*/
89+
public function run(ITaskParam $param)
90+
{
91+
($param->getData()['task'])();
92+
}
93+
});
94+
$pool->run();
95+
for ($i = 0; $i < $co; ++$i)
96+
{
97+
$pool->addTaskAsync([
98+
'task' => $task,
99+
]);
100+
}
101+
$pool->wait();
102+
}
103+
else
104+
{
105+
($task)();
106+
}
107+
}
108+
109+
/**
110+
* 停止消费.
111+
*/
112+
public function stop(): void
113+
{
114+
$this->working = false;
115+
if ($this->coPool)
116+
{
117+
$this->coPool->stop();
118+
}
119+
}
120+
}
121+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Imi\Queue\Partial;
6+
7+
use Imi\App;
8+
use Imi\Bean\Annotation\Partial;
9+
use Imi\Event\Event;
10+
11+
if (\Imi\Util\Imi::checkAppType('workerman'))
12+
{
13+
/**
14+
* @Partial(Imi\Queue\Service\BaseQueueConsumer::class)
15+
*
16+
* @property \Imi\Queue\Service\QueueService $imiQueue
17+
*/
18+
trait WorkermanBaseQueueConsumerPartial
19+
{
20+
/**
21+
* 开始消费循环.
22+
*/
23+
public function start(?int $co = null): void
24+
{
25+
$this->working = true;
26+
$config = $this->imiQueue->getQueueConfig($this->name);
27+
if (null === $co)
28+
{
29+
$co = $config->getCo();
30+
}
31+
$queue = $this->imiQueue->getQueue($this->name);
32+
do
33+
{
34+
try
35+
{
36+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_POP', [
37+
'queue' => $queue,
38+
], $this, ConsumerBeforePopParam::class);
39+
$message = $queue->pop();
40+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_POP', [
41+
'queue' => $queue,
42+
'message' => $message,
43+
], $this, ConsumerAfterPopParam::class);
44+
if (null === $message)
45+
{
46+
usleep((int) ($config->getTimespan() * 1000000));
47+
}
48+
else
49+
{
50+
Event::trigger('IMI.QUEUE.CONSUMER.BEFORE_CONSUME', [
51+
'queue' => $queue,
52+
'message' => $message,
53+
], $this, ConsumerBeforeConsumeParam::class);
54+
$this->consume($message, $queue);
55+
Event::trigger('IMI.QUEUE.CONSUMER.AFTER_CONSUME', [
56+
'queue' => $queue,
57+
'message' => $message,
58+
], $this, ConsumerAfterConsumeParam::class);
59+
}
60+
}
61+
catch (\Throwable $th)
62+
{
63+
App::getBean('ErrorLog')->onException($th);
64+
}
65+
} while ($this->working);
66+
}
67+
68+
/**
69+
* 停止消费.
70+
*/
71+
public function stop(): void
72+
{
73+
$this->working = false;
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)