Skip to content

Commit c9556c8

Browse files
committed
Merge branch 'main' of github.com:hypervel/components
2 parents 750df03 + 7c4fff7 commit c9556c8

File tree

5 files changed

+261
-1
lines changed

5 files changed

+261
-1
lines changed

src/queue/publish/queue.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
| used by your application. An example configuration is provided for
3939
| each backend supported by Hypervel. You're also free to add more.
4040
|
41-
| Drivers: "sync", "defer". "database", "beanstalkd", "sqs", "redis", "null"
41+
| Drivers: "sync", "coroutine", "defer", "database", "beanstalkd", "sqs", "redis", "null"
4242
|
4343
*/
4444

@@ -47,6 +47,10 @@
4747
'driver' => 'sync',
4848
],
4949

50+
'coroutine' => [
51+
'driver' => 'coroutine',
52+
],
53+
5054
'defer' => [
5155
'driver' => 'defer',
5256
],
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Queue\Connectors;
6+
7+
use Hypervel\Queue\Contracts\Queue;
8+
use Hypervel\Queue\CoroutineQueue;
9+
10+
class CoroutineConnector implements ConnectorInterface
11+
{
12+
/**
13+
* Establish a queue connection.
14+
*/
15+
public function connect(array $config): Queue
16+
{
17+
return new CoroutineQueue($config['after_commit'] ?? false);
18+
}
19+
}

src/queue/src/CoroutineQueue.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Queue;
6+
7+
use Hyperf\Engine\Coroutine;
8+
use Hypervel\Database\TransactionManager;
9+
use Throwable;
10+
11+
class CoroutineQueue extends SyncQueue
12+
{
13+
/**
14+
* The exception callback that should be used for handling uncaught exceptions in defer.
15+
*
16+
* @var null|callable
17+
*/
18+
protected $exceptionCallback;
19+
20+
/**
21+
* Push a new job onto the queue.
22+
*/
23+
public function push(object|string $job, mixed $data = '', ?string $queue = null): mixed
24+
{
25+
if (
26+
$this->shouldDispatchAfterCommit($job)
27+
&& $this->container->has(TransactionManager::class)
28+
) {
29+
return $this->container->get(TransactionManager::class)
30+
->addCallback(
31+
fn () => $this->executeJob($job, $data, $queue)
32+
);
33+
}
34+
35+
$this->executeJob($job, $data, $queue);
36+
37+
return null;
38+
}
39+
40+
/**
41+
* Set the exception callback for the defer queue.
42+
*/
43+
public function setExceptionCallback(?callable $callback): static
44+
{
45+
$this->exceptionCallback = $callback;
46+
47+
return $this;
48+
}
49+
50+
/**
51+
* Defer a new job onto the queue.
52+
*/
53+
protected function executeJob(object|string $job, mixed $data = '', ?string $queue = null): int
54+
{
55+
Coroutine::create(function () use ($job, $data, $queue) {
56+
try {
57+
parent::executeJob($job, $data, $queue);
58+
} catch (Throwable $e) {
59+
if ($this->exceptionCallback) {
60+
($this->exceptionCallback)($e);
61+
}
62+
}
63+
});
64+
65+
return 0;
66+
}
67+
}

src/queue/src/QueueManager.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Hypervel\ObjectPool\Traits\HasPoolProxy;
1212
use Hypervel\Queue\Connectors\BeanstalkdConnector;
1313
use Hypervel\Queue\Connectors\ConnectorInterface;
14+
use Hypervel\Queue\Connectors\CoroutineConnector;
1415
use Hypervel\Queue\Connectors\DatabaseConnector;
1516
use Hypervel\Queue\Connectors\DeferConnector;
1617
use Hypervel\Queue\Connectors\NullConnector;
@@ -284,6 +285,7 @@ protected function registerConnectors(): void
284285
$this->registerBeanstalkdConnector();
285286
$this->registerSqsConnector();
286287
$this->registerDeferConnector();
288+
$this->registerCoroutineConnector();
287289
}
288290

289291
/**
@@ -359,4 +361,14 @@ protected function registerDeferConnector(): void
359361
return new DeferConnector();
360362
});
361363
}
364+
365+
/**
366+
* Register the Coroutine queue connector.
367+
*/
368+
protected function registerCoroutineConnector(): void
369+
{
370+
$this->addConnector('coroutine', function () {
371+
return new CoroutineConnector();
372+
});
373+
}
362374
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Tests\Queue;
6+
7+
use Exception;
8+
use Hyperf\Di\Container;
9+
use Hyperf\Di\Definition\DefinitionSource;
10+
use Hypervel\Database\TransactionManager;
11+
use Hypervel\Queue\Contracts\QueueableEntity;
12+
use Hypervel\Queue\Contracts\ShouldQueueAfterCommit;
13+
use Hypervel\Queue\CoroutineQueue;
14+
use Hypervel\Queue\InteractsWithQueue;
15+
use Hypervel\Queue\Jobs\SyncJob;
16+
use Mockery as m;
17+
use PHPUnit\Framework\TestCase;
18+
use Psr\EventDispatcher\EventDispatcherInterface;
19+
20+
use function Hyperf\Coroutine\run;
21+
22+
/**
23+
* @internal
24+
* @coversNothing
25+
*/
26+
class QueueCoroutineQueueTest extends TestCase
27+
{
28+
public function testPushShouldCoroutine()
29+
{
30+
unset($_SERVER['__coroutine.test']);
31+
32+
$coroutine = new CoroutineQueue();
33+
$coroutine->setConnectionName('coroutine');
34+
$container = $this->getContainer();
35+
$coroutine->setContainer($container);
36+
$coroutine->setConnectionName('coroutine');
37+
38+
run(fn () => $coroutine->push(CoroutineQueueTestHandler::class, ['foo' => 'bar']));
39+
40+
$this->assertInstanceOf(SyncJob::class, $_SERVER['__coroutine.test'][0]);
41+
$this->assertEquals(['foo' => 'bar'], $_SERVER['__coroutine.test'][1]);
42+
}
43+
44+
public function testFailedJobGetsHandledWhenAnExceptionIsThrown()
45+
{
46+
unset($_SERVER['__coroutine.failed']);
47+
48+
$result = null;
49+
50+
$coroutine = new CoroutineQueue();
51+
$coroutine->setExceptionCallback(function ($exception) use (&$result) {
52+
$result = $exception;
53+
});
54+
$coroutine->setConnectionName('coroutine');
55+
$container = $this->getContainer();
56+
$events = m::mock(EventDispatcherInterface::class);
57+
$events->shouldReceive('dispatch')->times(3);
58+
$container->set(EventDispatcherInterface::class, $events);
59+
$coroutine->setContainer($container);
60+
61+
run(function () use ($coroutine) {
62+
$coroutine->push(FailingCoroutineQueueTestHandler::class, ['foo' => 'bar']);
63+
});
64+
65+
$this->assertInstanceOf(Exception::class, $result);
66+
$this->assertTrue($_SERVER['__coroutine.failed']);
67+
}
68+
69+
public function testItAddsATransactionCallbackForAfterCommitJobs()
70+
{
71+
$coroutine = new CoroutineQueue();
72+
$container = $this->getContainer();
73+
$transactionManager = m::mock(TransactionManager::class);
74+
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);
75+
$container->set(TransactionManager::class, $transactionManager);
76+
77+
$coroutine->setContainer($container);
78+
run(fn () => $coroutine->push(new CoroutineQueueAfterCommitJob()));
79+
}
80+
81+
public function testItAddsATransactionCallbackForInterfaceBasedAfterCommitJobs()
82+
{
83+
$coroutine = new CoroutineQueue();
84+
$container = $this->getContainer();
85+
$transactionManager = m::mock(TransactionManager::class);
86+
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);
87+
$container->set(TransactionManager::class, $transactionManager);
88+
89+
$coroutine->setContainer($container);
90+
run(fn () => $coroutine->push(new CoroutineQueueAfterCommitInterfaceJob()));
91+
}
92+
93+
protected function getContainer(): Container
94+
{
95+
return new Container(
96+
new DefinitionSource([])
97+
);
98+
}
99+
}
100+
101+
class CoroutineQueueTestEntity implements QueueableEntity
102+
{
103+
public function getQueueableId(): mixed
104+
{
105+
return 1;
106+
}
107+
108+
public function getQueueableConnection(): ?string
109+
{
110+
return null;
111+
}
112+
113+
public function getQueueableRelations(): array
114+
{
115+
return [];
116+
}
117+
}
118+
119+
class CoroutineQueueTestHandler
120+
{
121+
public function fire($job, $data)
122+
{
123+
$_SERVER['__coroutine.test'] = func_get_args();
124+
}
125+
}
126+
127+
class FailingCoroutineQueueTestHandler
128+
{
129+
public function fire($job, $data)
130+
{
131+
throw new Exception();
132+
}
133+
134+
public function failed()
135+
{
136+
$_SERVER['__coroutine.failed'] = true;
137+
}
138+
}
139+
140+
class CoroutineQueueAfterCommitJob
141+
{
142+
use InteractsWithQueue;
143+
144+
public $afterCommit = true;
145+
146+
public function handle()
147+
{
148+
}
149+
}
150+
151+
class CoroutineQueueAfterCommitInterfaceJob implements ShouldQueueAfterCommit
152+
{
153+
use InteractsWithQueue;
154+
155+
public function handle()
156+
{
157+
}
158+
}

0 commit comments

Comments
 (0)