Skip to content

Commit dc162e8

Browse files
committed
fixed tests
1 parent 009f8c8 commit dc162e8

File tree

11 files changed

+292
-53
lines changed

11 files changed

+292
-53
lines changed

src/Builders/AbstractBuilder.php

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,28 @@ abstract class AbstractBuilder
2020

2121
public static bool $debug = false;
2222

23-
/** @var string redis配置 */
24-
protected string $connection = 'default';
25-
26-
/** @var float|null 消费间隔 1ms */
27-
protected ?float $timerInterval = 1;
23+
/**
24+
* @var string[]
25+
*/
26+
protected static array $builderList = [
27+
'queue' => QueueBuilder::class,
28+
'co-queue' => CoQueueBuilder::class,
29+
'group' => GroupBuilder::class,
30+
'co-group' => CoGroupBuilder::class,
31+
'adaptive' => AdaptiveBuilder::class
32+
];
2833

2934
/**
3035
* @var AbstractBuilder[]
3136
*/
3237
private static array $_builders = [];
3338

39+
/** @var string redis配置 */
40+
protected string $connection = 'default';
41+
42+
/** @var float|null 消费间隔 1ms */
43+
protected ?float $timerInterval = 1;
44+
3445
/**
3546
* @var int|null
3647
*/
@@ -81,6 +92,17 @@ public static function instance(): AbstractBuilder
8192
return self::$_builders[$class];
8293
}
8394

95+
/**
96+
* 通过mode获取builder类名
97+
*
98+
* @param string $mode
99+
* @return string|null
100+
*/
101+
public static function getBuilderClass(string $mode): ?string
102+
{
103+
return static::$builderList[$mode] ?? null;
104+
}
105+
84106
/**
85107
* @param string $class
86108
* @return void

src/Builders/CoGroupBuilder.php

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Workbunny\WebmanRqueue\Builders;
4+
5+
use Illuminate\Redis\Connections\Connection;
6+
use Psr\Log\LoggerInterface;
7+
use RedisException;
8+
use support\Log;
9+
use Workbunny\WebmanRqueue\Builders\Traits\MessageQueueMethod;
10+
use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException;
11+
use Workerman\Timer;
12+
use Workerman\Worker;
13+
use function Workbunny\WebmanRqueue\config;
14+
15+
abstract class CoGroupBuilder extends GroupBuilder
16+
{
17+
18+
/** @inheritDoc */
19+
public function onWorkerStart(Worker $worker): void
20+
{
21+
if ($this->getConnection()) {
22+
// del timer
23+
self::$_delTimer = Timer::add($this->getTimerInterval(), function() use ($worker) {
24+
// auto del
25+
$this->del();
26+
});
27+
// check pending
28+
if (($pendingTimeout = $this->configs['pending_timeout'] ?? 0) > 0) {
29+
$this->setPendingTimer(Timer::add($pendingTimeout / 1000, function () use ($worker, $pendingTimeout) {
30+
// 超时消息自动ack并requeue,消息自动移除
31+
$this->claim($worker, $pendingTimeout);
32+
}));
33+
}
34+
while (1) {
35+
try {
36+
// consume
37+
$this->consume($worker, false);
38+
} catch (WebmanRqueueException $exception) {
39+
// 错误日志
40+
Log::channel('plugin.workbunny.webman-rqueue.warning')?->warning('Consume exception. ', [
41+
'message' => $exception->getMessage(), 'code' => $exception->getCode(),
42+
'file' => $exception->getFile() . ':' . $exception->getLine(),
43+
'trace' => $exception->getTrace()
44+
]);
45+
// 兼容旧版
46+
$this->getLogger()?->warning('Consume exception. ', [
47+
'message' => $exception->getMessage(), 'code' => $exception->getCode()
48+
]);
49+
} finally {
50+
// 协程随机出让 5 - 10 ms
51+
$coInterval = $this->configs['co_interval'] ?? [];
52+
Timer::sleep(($coInterval ? rand($coInterval[0], $coInterval[1]) : rand(5, 10)) / 1000);
53+
}
54+
}
55+
}
56+
}
57+
58+
/** @inheritDoc */
59+
public static function classContent(string $namespace, string $className, bool $isDelay): string
60+
{
61+
$isDelay = $isDelay ? 'true' : 'false';
62+
$name = self::getName("$namespace\\$className");
63+
return <<<doc
64+
<?php declare(strict_types=1);
65+
66+
namespace $namespace;
67+
68+
use Workbunny\WebmanRqueue\Headers;
69+
use Workbunny\WebmanRqueue\Builders\CoGroupBuilder;
70+
use Illuminate\Redis\Connections\Connection;
71+
72+
class $className extends CoGroupBuilder
73+
{
74+
75+
/** @see QueueBuilder::\$configs */
76+
protected array \$configs = [
77+
// 默认由类名自动生成
78+
'queues' => [
79+
'$name'
80+
],
81+
// 默认由类名自动生成
82+
'group' => '$name',
83+
// 是否延迟
84+
'delayed' => $isDelay,
85+
// QOS
86+
'prefetch_count' => 0,
87+
// Queue size
88+
'queue_size' => 0,
89+
// 消息pending超时,毫秒
90+
'pending_timeout' => 0,
91+
// 协程随机出让时间间隔,毫秒
92+
'co_interval' => [5, 10]
93+
];
94+
95+
/** @var float|null 消费长轮询时长/消费间隔 100ms */
96+
protected ?float \$timerInterval = 100.0;
97+
98+
/** @var string redis配置 */
99+
protected string \$connection = 'default';
100+
101+
/** @inheritDoc */
102+
public function handler(string \$id, array \$value, Connection \$connection): bool
103+
{
104+
\$header = new Headers(\$value['_header']);
105+
\$body = \$value['_body'];
106+
// TODO 请重写消费逻辑
107+
echo "请重写 $className::handler\\n";
108+
return true;
109+
}
110+
}
111+
doc;
112+
}
113+
}

src/Builders/CoQueueBuilder.php

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Workbunny\WebmanRqueue\Builders;
4+
5+
use Illuminate\Redis\Connections\Connection;
6+
use Psr\Log\LoggerInterface;
7+
use RedisException;
8+
use support\Log;
9+
use Workbunny\WebmanRqueue\Builders\Traits\MessageQueueMethod;
10+
use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException;
11+
use Workerman\Timer;
12+
use Workerman\Worker;
13+
14+
abstract class CoQueueBuilder extends QueueBuilder
15+
{
16+
17+
/** @inheritDoc */
18+
public function onWorkerStart(Worker $worker): void
19+
{
20+
// 初始化temp库
21+
$this->tempInit();
22+
if ($this->getConnection()) {
23+
// requeue timer
24+
$this->tempRequeueInit();
25+
// check pending
26+
if (($pendingTimeout = $this->configs['pending_timeout'] ?? 0) > 0) {
27+
$this->setPendingTimer(Timer::add($pendingTimeout / 1000, function () use ($worker, $pendingTimeout) {
28+
// 超时消息自动ack并requeue,消息自动移除
29+
$this->claim($worker, $pendingTimeout);
30+
}));
31+
}
32+
33+
while (1) {
34+
try {
35+
// consume
36+
$this->consume($worker);
37+
} catch (WebmanRqueueException $exception) {
38+
Log::channel('plugin.workbunny.webman-rqueue.warning')?->warning('Consume exception. ', [
39+
'message' => $exception->getMessage(), 'code' => $exception->getCode(),
40+
'file' => $exception->getFile() . ':' . $exception->getLine(),
41+
'trace' => $exception->getTrace()
42+
]);
43+
// 兼容旧版
44+
$this->getLogger()?->warning('Consume exception. ', [
45+
'message' => $exception->getMessage(), 'code' => $exception->getCode()
46+
]);
47+
} finally {
48+
// 协程随机出让 5 - 10 ms
49+
$coInterval = $this->configs['co_interval'] ?? [];
50+
Timer::sleep(($coInterval ? rand($coInterval[0], $coInterval[1]) : rand(5, 10)) / 1000);
51+
}
52+
}
53+
}
54+
}
55+
56+
/** @inheritDoc */
57+
public static function classContent(string $namespace, string $className, bool $isDelay): string
58+
{
59+
$isDelay = $isDelay ? 'true' : 'false';
60+
$name = self::getName("$namespace\\$className");
61+
return <<<doc
62+
<?php declare(strict_types=1);
63+
64+
namespace $namespace;
65+
66+
use Workbunny\WebmanRqueue\Headers;
67+
use Workbunny\WebmanRqueue\Builders\CoQueueBuilder;
68+
use Illuminate\Redis\Connections\Connection;
69+
70+
class $className extends CoQueueBuilder
71+
{
72+
73+
/** @see QueueBuilder::\$configs */
74+
protected array \$configs = [
75+
// 默认由类名自动生成
76+
'queues' => [
77+
'$name'
78+
],
79+
// 默认由类名自动生成
80+
'group' => '$name',
81+
// 是否延迟
82+
'delayed' => $isDelay,
83+
// QOS
84+
'prefetch_count' => 0,
85+
// Queue size
86+
'queue_size' => 0,
87+
// 消息pending超时,毫秒
88+
'pending_timeout' => 0,
89+
// 协程随机出让 5 - 10 ms
90+
'co_interval' => [5, 10]
91+
];
92+
93+
/** @var float|null 消费长轮询时长/消费间隔 100ms */
94+
protected ?float \$timerInterval = 100.0;
95+
96+
/** @var string redis配置 */
97+
protected string \$connection = 'default';
98+
99+
/** @inheritDoc */
100+
public function handler(string \$id, array \$value, Connection \$connection): bool
101+
{
102+
\$header = new Headers(\$value['_header']);
103+
\$body = \$value['_body'];
104+
// TODO 请重写消费逻辑
105+
echo "请重写 $className::handler\\n";
106+
return true;
107+
}
108+
}
109+
doc;
110+
}
111+
}

src/Builders/GroupBuilder.php

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ abstract class GroupBuilder extends AbstractBuilder
2929
protected array $configs = [];
3030

3131
/** @var int|null 自动移除定时器 */
32-
private static ?int $_delTimer = null;
32+
protected static ?int $_delTimer = null;
3333

3434
public function __construct(?LoggerInterface $logger = null)
3535
{
@@ -46,21 +46,21 @@ public function __construct(?LoggerInterface $logger = null)
4646
/** @inheritDoc */
4747
public function onWorkerStart(Worker $worker): void
4848
{
49-
if($this->getConnection()){
49+
if ($this->getConnection()) {
50+
// del timer
51+
self::$_delTimer = Timer::add($this->getTimerInterval(), function() use ($worker) {
52+
// auto del
53+
$this->del();
54+
});
55+
// check pending
56+
if (($pendingTimeout = $this->configs['pending_timeout'] ?? 0) > 0) {
57+
$this->setPendingTimer(Timer::add($pendingTimeout / 1000, function () use ($worker, $pendingTimeout) {
58+
// 超时消息自动ack并requeue,消息自动移除
59+
$this->claim($worker, $pendingTimeout);
60+
}));
61+
}
5062
// consume timer
5163
self::setMainTimer(Timer::add($this->getTimerInterval() / 1000, function () use ($worker) {
52-
// del timer
53-
self::$_delTimer = Timer::add($this->getTimerInterval(), function() use ($worker) {
54-
// auto del
55-
$this->del();
56-
});
57-
// check pending
58-
if (($pendingTimeout = $this->configs['pending_timeout'] ?? 0) > 0) {
59-
$this->setPendingTimer(Timer::add($pendingTimeout / 1000, function () use ($worker, $pendingTimeout) {
60-
// 超时消息自动ack并requeue,消息自动移除
61-
$this->claim($worker, $pendingTimeout);
62-
}));
63-
}
6464
try {
6565
// consume
6666
$this->consume($worker, false);
@@ -75,7 +75,6 @@ public function onWorkerStart(Worker $worker): void
7575
$this->getLogger()?->warning('Consume exception. ', [
7676
'message' => $exception->getMessage(), 'code' => $exception->getCode()
7777
]);
78-
7978
}
8079
}));
8180
}
@@ -91,7 +90,7 @@ public function onWorkerStop(Worker $worker): void
9190
echo $e->getMessage() . PHP_EOL;
9291
}
9392
}
94-
if(self::getMainTimer()) {
93+
if (self::getMainTimer()) {
9594
Timer::del(self::getMainTimer());
9695
}
9796
if ($this->getPendingTimer()) {

src/Builders/Traits/MessageQueueMethod.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public function consume(Worker $worker, bool $del = true): bool
404404
$queueStreams[$queueName] = '>';
405405
}
406406
// group read
407-
if($res = $client->xReadGroup(
407+
if ($res = $client->xReadGroup(
408408
$groupName, $consumerName, $queueStreams, $builderConfig->getPrefetchCount(),
409409
$this->getTimerInterval() >= 1.0 ? (int)$this->getTimerInterval() : null
410410
)) {

src/Commands/AbstractCommand.php

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Symfony\Component\Console\Command\Command;
66
use Symfony\Component\Console\Output\OutputInterface;
7+
use Workbunny\WebmanRqueue\Builders\AbstractBuilder;
78
use Workbunny\WebmanRqueue\Builders\AdaptiveBuilder;
89
use Workbunny\WebmanRqueue\Builders\GroupBuilder;
910
use Workbunny\WebmanRqueue\Builders\QueueBuilder;
@@ -14,22 +15,13 @@ abstract class AbstractCommand extends Command
1415
public static string $baseProcessPath = 'process/workbunny/rqueue';
1516
public static string $baseNamespace = 'process\workbunny\rqueue';
1617

17-
/**
18-
* @var string[]
19-
*/
20-
protected array $builderList = [
21-
'queue' => QueueBuilder::class,
22-
'group' => GroupBuilder::class,
23-
'adaptive' => AdaptiveBuilder::class
24-
];
25-
2618
/**
2719
* @param string $name
2820
* @return string|null
2921
*/
3022
protected function getBuilder(string $name): ?string
3123
{
32-
return $this->builderList[$name] ?? null;
24+
return AbstractBuilder::getBuilderClass($name);
3325
}
3426

3527
protected function info(OutputInterface $output, string $message): void

tests/GroupBuilderTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Redis;
66
use RedisException;
7+
use Webman\Config;
78
use Workbunny\Tests\Builders\TestGroupBuilder;
89
use Workbunny\Tests\Builders\TestGroupBuilderDelayed;
910
use Workbunny\WebmanRqueue\Builders\AbstractBuilder;
@@ -18,7 +19,7 @@ final class GroupBuilderTest extends BaseTestCase
1819
protected function setUp(): void
1920
{
2021
AbstractBuilder::$debug = true;
21-
require_once __DIR__ . '/functions.php';
22+
Config::load(__DIR__ . '/config', ['route']);
2223
$this->_builder = new TestGroupBuilder();
2324
$this->_builderDelayed = new TestGroupBuilderDelayed();
2425
parent::setUp();

0 commit comments

Comments
 (0)