Skip to content

Commit 7579d01

Browse files
committed
Add an option specify number of tasks to process for AsyncTaskCommand. Docker container for local tests.
1 parent 06c4ed1 commit 7579d01

File tree

10 files changed

+272
-112
lines changed

10 files changed

+272
-112
lines changed

AsyncComponent.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,12 @@ public function purge($queueName)
6565
{
6666
return $this->transport->purge($queueName);
6767
}
68+
69+
/**
70+
* @return Transport
71+
*/
72+
public function getTransport()
73+
{
74+
return $this->transport;
75+
}
6876
}

Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM php:cli
2+
3+
RUN apt-get update && apt-get install -y redis-server \
4+
rabbitmq-server librabbitmq-dev
5+
RUN php -r "readfile('https://getcomposer.org/installer');" | php
6+
RUN docker-php-ext-install pcntl shmop mbstring
7+
RUN pecl install amqp && echo "extension=amqp.so" >> /usr/local/etc/php/conf.d/amqp.ini
8+
9+
ADD . /var/code
10+
11+
CMD /var/code/tests/docker-test.sh

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,7 @@ For more code examples look into tests:
134134
~~~
135135
vendor/bin/codecept run
136136
~~~
137+
Or in Docker:
138+
~~~
139+
./test.sh
140+
~~~

commands/AsyncWorkerCommand.php

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,77 @@
33
namespace bazilio\async\commands;
44

55
use bazilio\async\models\AsyncTask;
6+
use Spork\ProcessManager;
7+
68

79
class AsyncWorkerCommand extends \yii\console\Controller
810
{
11+
static $state = 1;
12+
protected $child;
13+
914
/**
1015
* @param string|null $queueName
16+
* @param int|null $count tasks to process
1117
*/
12-
public function actionExecute($queueName = null)
18+
public function actionExecute($queueName = null, $count = null)
1319
{
20+
$this->handleSignal();
1421
/** @var AsyncTask $task */
1522
while ($task = \Yii::$app->async->receiveTask($queueName ?: AsyncTask::$queueName)) {
16-
$task->execute();
17-
\Yii::$app->async->acknowledgeTask($task);
23+
$this->checkSignal();
24+
25+
$this->processTask($task);
26+
27+
if (($count !== null && !--$count) || $this->checkSignal()) {
28+
break;
29+
}
1830
}
1931
}
2032

2133
/**
2234
* @param string|null $queueName
35+
* @param int|null $count tasks to process
2336
*/
24-
public function actionDaemon($queueName = null)
37+
public function actionDaemon($queueName = null, $count = null)
2538
{
39+
$this->handleSignal();
40+
2641
/** @var AsyncTask $task */
2742
while ($task = \Yii::$app->async->receiveTask($queueName ?: AsyncTask::$queueName, true)) {
28-
$task->execute();
29-
\Yii::$app->async->acknowledgeTask($task);
43+
$this->checkSignal();
44+
45+
$task::$queueName = $queueName ?: AsyncTask::$queueName;
46+
$this->processTask($task);
47+
48+
if (($count !== null && !--$count) || $this->checkSignal()) {
49+
break;
50+
}
51+
}
52+
}
53+
54+
protected function processTask(AsyncTask $task)
55+
{
56+
$task->execute();
57+
\Yii::$app->async->acknowledgeTask($task);
58+
59+
}
60+
61+
private function handleSignal()
62+
{
63+
pcntl_signal(
64+
SIGTERM,
65+
function ($signo) {
66+
echo "This signal is called. [$signo] \n";
67+
static::$state = -1;
68+
}
69+
);
70+
}
71+
72+
private function checkSignal()
73+
{
74+
pcntl_signal_dispatch();
75+
if (AsyncWorkerCommand::$state == -1) {
76+
return true;
3077
}
3178
}
3279
}

0 commit comments

Comments
 (0)