Skip to content

Commit 62893e3

Browse files
committed
Merge branch 'master' of github.com:webdevvie/pheanstalk-task-queue-bundle
2 parents b30df41 + 973d5d7 commit 62893e3

File tree

9 files changed

+203
-25
lines changed

9 files changed

+203
-25
lines changed

Command/AbstractWorker.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ abstract class AbstractWorker extends ContainerAwareCommand
9494
*/
9595
protected function addDefaultConfiguration()
9696
{
97-
$this->addOption('use-tube', null, InputOption::VALUE_OPTIONAL, 'What tube to send to');
97+
$this->addOption('use-tube', null, InputOption::VALUE_OPTIONAL, 'What tube to work with');
9898
}
9999

100100
/**
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?php
2+
3+
namespace Webdevvie\PheanstalkTaskQueueBundle\Command;
4+
5+
use Webdevvie\PheanstalkTaskQueueBundle\Command\AbstractWorker;
6+
use Symfony\Component\Console\Input\InputInterface;
7+
use Symfony\Component\Console\Input\InputOption;
8+
use Symfony\Component\Console\Output\OutputInterface;
9+
use Webdevvie\PheanstalkTaskQueueBundle\Entity\Task;
10+
11+
/**
12+
* Class StopWorkerTenderCommand
13+
* Sends a signal to the worker tender to stop
14+
*
15+
* @package Webdevvie\PheanstalkTaskQueueBundle\Command
16+
* @author John Bakker <[email protected]>
17+
*/
18+
class StopWorkerTenderCommand extends AbstractWorker
19+
{
20+
/**
21+
* {@inheritDoc}
22+
*
23+
* @return void
24+
*/
25+
protected function configure()
26+
{
27+
$this->setName('taskqueue:stop-worker-tender')
28+
->setDescription('Signals the worker tender to stop and waits for it to stop');
29+
$this->addDefaultConfiguration();
30+
}
31+
32+
/**
33+
* {@inheritDoc}
34+
*
35+
* @param InputInterface $input
36+
* @param OutputInterface $output
37+
* @return void
38+
* @throws \InvalidArgumentException
39+
*/
40+
protected function execute(InputInterface $input, OutputInterface $output)
41+
{
42+
$this->initialiseWorker($input, $output);
43+
$processes = $this->findWorkerTenderProcessesForTube($this->tube);
44+
if (count($processes) == 0) {
45+
$output->writeln("<info>Found no processes</info>");
46+
return;
47+
}
48+
$output->writeln('<info>Found ' . count($processes) . ' process(es)');
49+
foreach ($processes as $process) {
50+
$output->writeln('<info>sending kill to:</info> ' . $process);
51+
exec("kill $process");
52+
}
53+
$output->write('<info>Waiting on processes to stop</info> ');
54+
while (count($processes) > 0) {
55+
$processes = $this->findWorkerTenderProcessesForTube($this->tube);
56+
$output->write(".");
57+
sleep(1);
58+
59+
}
60+
$output->writeln("Done!");
61+
}
62+
63+
/**
64+
* Returns a list of processes that match the worker-tender
65+
*
66+
* @param string $tube
67+
* @return array
68+
*/
69+
private function findWorkerTenderProcessesForTube($tube)
70+
{
71+
$processes = array();
72+
$command = "ps ax ";
73+
$command .= "|grep php ";
74+
$command .= "|grep -v stop ";
75+
$command .= "|grep worker-tender ";
76+
if ($tube !== '') {
77+
$command .= "|grep " . escapeshellarg('use-tube=' . $tube);
78+
}
79+
$data = exec($command);
80+
$lines = explode("\n", $data);
81+
foreach ($lines as $line) {
82+
$line = trim($line);
83+
$parts = explode(" ", $line);
84+
if ($parts[0] === '') {
85+
continue;
86+
}
87+
$processes[] = intval($parts[0]);
88+
}
89+
90+
return $processes;
91+
92+
}
93+
}

Command/Tender/ChildProcessContainer.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public function isCleanedUp()
197197
*/
198198
public function getOutputLinesAvailable()
199199
{
200-
$outputLines = [];
200+
$outputLines = array();
201201
return $outputLines;
202202
}
203203

@@ -301,9 +301,9 @@ private function sendTERMSignal()
301301
*/
302302
public function getReadyForBed()
303303
{
304-
$busyStatusses = [ChildProcessContainer::STATUS_SLEEPY,
304+
$busyStatusses = array(ChildProcessContainer::STATUS_SLEEPY,
305305
ChildProcessContainer::STATUS_BUSY_BUT_SLEEPY
306-
];
306+
);
307307
if (in_array($this->status, $busyStatusses)) {
308308
//ready for bed
309309
return;

Command/WorkerCommand.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ private function runTask(WorkPackage $taskObject)
8787
try {
8888
$command = $this->taskCommandGenerator->generate($taskObject);
8989
} catch (TaskCommandGeneratorException $exception) {
90-
$this->verboseOutput("Invalid command supplied in " . get_class($taskObject));
91-
$this->taskQueueService->markFailed($taskObject);
90+
$logMessage = "Invalid command supplied in " . get_class($taskObject);
91+
$this->verboseOutput($logMessage);
92+
$this->taskQueueService->markFailed($taskObject, $logMessage);
9293
return;
9394
}
9495
$fullCommand = 'exec app/console ' . $command;
@@ -100,6 +101,7 @@ private function runTask(WorkPackage $taskObject)
100101
$hasReceivedSignal = false;
101102
$exitCode = 0;
102103
$exitCodeText = '';
104+
$childOutput = '';
103105
while ($runningCommand) {
104106
$this->handleRunningProcess(
105107
$process,
@@ -111,15 +113,16 @@ private function runTask(WorkPackage $taskObject)
111113
$exitCodeText
112114
);
113115
$incOut = $process->getIncrementalOutput();
116+
$childOutput .= $incOut;
114117
if (strlen($incOut) > 0) {
115118
$this->output->write($incOut);
116119
}
117120
}
118121
if ($failed) {
119122
$this->verboseOutput("FAILED CHILD: " . $exitCode . "::" . $exitCodeText);
120-
$this->taskQueueService->markFailed($taskObject);
123+
$this->taskQueueService->markFailed($taskObject, $childOutput);
121124
} else {
122-
$this->taskQueueService->markDone($taskObject);
125+
$this->taskQueueService->markDone($taskObject, $childOutput);
123126
}
124127
if ($process->isRunning()) {
125128
$process->stop();

Command/WorkerTenderCommand.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* one time per "tube"* You can supply several parameters to this command. Please refer to those parameters
1919
* for more information about customising your worker-tender experience
2020
*
21-
* @author John Bakker <[email protected]
21+
* @author John Bakker <[email protected]>
2222
*/
2323
class WorkerTenderCommand extends AbstractWorker
2424
{
@@ -66,7 +66,7 @@ protected function configure()
6666
'Max time to keep a worker in seconds',
6767
6000000
6868
);
69-
$this->addDefaultConfiguration();
69+
$this->addOption('use-tube', null, InputOption::VALUE_REQUIRED, 'What tube to work with');
7070
}
7171

7272
/**
@@ -235,8 +235,10 @@ public function moreOrLess($total, $busy, $available)
235235
*/
236236
private function findDisposableWorkers()
237237
{
238+
$disposableStatusses = array(ChildProcessContainer::STATUS_READY, ChildProcessContainer::STATUS_ALIVE);
238239
foreach ($this->family as &$child) {
239-
if (in_array($child->status, array(ChildProcessContainer::STATUS_READY, ChildProcessContainer::STATUS_ALIVE))) {
240+
241+
if (in_array($child->status, $disposableStatusses)) {
240242
if ($child->getAge() < 10) {
241243
//less than ten seconds old keep it alive a bit to let its do its job
242244
continue;

DependencyInjection/Configuration.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
/**
99
* This is the class that validates and merges configuration from your app/config files
10-
*
11-
* To learn more see {@link http://symfony.com/doc/current/cookbook/bundles/extension.html#cookbook-bundles-extension-config-class}
1210
*/
1311
class Configuration implements ConfigurationInterface
1412
{
@@ -26,6 +24,8 @@ public function getConfigTreeBuilder()
2624
$rootNode
2725
->children()
2826
->scalarNode('default_tube')->isRequired()->end()
27+
->booleanNode('log_worker_output_on_failure')->defaultTrue()->end()
28+
->booleanNode('log_worker_output_on_success')->defaultFalse()->end()
2929
->end();
3030
return $treeBuilder;
3131
}

Entity/Task.php

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ class Task
7474
*/
7575
private $data;
7676

77+
/**
78+
* The output supplied by the command
79+
* @var string
80+
*
81+
* @ORM\Column(name="log", type="text", nullable=true)
82+
*/
83+
private $log;
84+
7785
/**
7886
* The tube this task was sent to
7987
* @var string
@@ -171,6 +179,19 @@ public function setStatus($status)
171179
return $this;
172180
}
173181

182+
/**
183+
* Set log
184+
*
185+
* @param string $status
186+
* @return task
187+
*/
188+
public function setLog($log)
189+
{
190+
$this->log = $log;
191+
$this->modified = new \DateTime();
192+
return $this;
193+
}
194+
174195
/**
175196
* Get status
176197
*
@@ -181,7 +202,15 @@ public function getStatus()
181202
return $this->status;
182203
}
183204

184-
205+
/**
206+
* Get log
207+
*
208+
* @return string
209+
*/
210+
public function getLog()
211+
{
212+
return $this->status;
213+
}
185214

186215
/**
187216
* Get created

Service/TaskQueueService.php

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ class TaskQueueService
3737
*/
3838
private $defaultTube;
3939

40+
/**
41+
* Log the worker output on success
42+
*
43+
* @var boolean
44+
*/
45+
private $logWorkerOutputOnSuccess;
46+
47+
/**
48+
* Log the worker output on failure
49+
*
50+
* @var boolean
51+
*/
52+
private $logWorkerOutputOnFailure;
53+
4054
/**
4155
* @var \Doctrine\ORM\EntityManager
4256
*/
@@ -65,6 +79,8 @@ public function __construct(
6579
$this->entityManager = $entityManager;
6680
$this->taskRepo = $this->entityManager->getRepository('WebdevviePheanstalkTaskQueueBundle:Task');
6781
$this->defaultTube = $params['default_tube'];
82+
$this->logWorkerOutputOnSuccess = $params['log_worker_output_on_success'];
83+
$this->logWorkerOutputOnFailure = $params['log_worker_output_on_failure'];
6884
}
6985

7086
/**
@@ -86,8 +102,7 @@ public function getDefaultTube()
86102
public function getStatusOfTaskWithId($taskId)
87103
{
88104
$task = $this->taskRepo->find($taskId);
89-
if(!($task instanceof Task))
90-
{
105+
if (!($task instanceof Task)) {
91106
//the task was not found
92107
return Task::STATUS_GONE;
93108
}
@@ -169,7 +184,7 @@ public function cleanUpTasks($timePeriod)
169184
t.created <= :older'
170185
);
171186
$query->setParameter('status', array(Task::STATUS_DONE));
172-
$query->setParameter('older', date("Y-m-d H:i:s", time()-$timePeriod));
187+
$query->setParameter('older', date("Y-m-d H:i:s", time() - $timePeriod));
173188
$query->execute();
174189
}
175190

@@ -206,7 +221,9 @@ public function reserveTask($tube = null)
206221
throw new TaskQueueServiceException("Invalid format in TaskQueue {$tube} ");
207222
} catch (\ReflectionException $exception) {
208223
$this->beanstalk->delete($inTask);
209-
throw new TaskQueueServiceException("Invalid format in TaskQueue {$tube} class ".$parts[0].' is unknown');
224+
throw new TaskQueueServiceException(
225+
"Invalid format in TaskQueue {$tube} class " . $parts[0] . ' is unknown'
226+
);
210227
}
211228
if (!($taskObject instanceof TaskDescriptionInterface)) {
212229
$this->beanstalk->delete($inTask);
@@ -231,11 +248,15 @@ public function reserveTask($tube = null)
231248
* Deletes a task from the queue
232249
*
233250
* @param WorkPackage $task
251+
* @param string $log
234252
* @throws TaskQueueServiceException
235253
* @return void
236254
*/
237-
public function markDone(WorkPackage $task)
255+
public function markDone(WorkPackage $task, $log)
238256
{
257+
if ($this->logWorkerOutputOnSuccess) {
258+
$this->updateTaskLog($task, $log);
259+
}
239260
$this->updateTaskStatus($task, Task::STATUS_DONE);
240261
$this->beanstalk->delete($task->getPheanstalkJob());
241262
}
@@ -244,16 +265,42 @@ public function markDone(WorkPackage $task)
244265
* Marks a job as failed and deletes it from the beanstalk tube
245266
*
246267
* @param WorkPackage $task
268+
* @param string $log
247269
* @throws TaskQueueServiceException
248270
* @return void
249271
*/
250-
public function markFailed(WorkPackage $task)
272+
public function markFailed(WorkPackage $task, $log)
251273
{
274+
if ($this->logWorkerOutputOnFailure) {
275+
$this->updateTaskLog($task, $log);
276+
}
252277
$this->updateTaskStatus($task, Task::STATUS_FAILED);
253278
$this->beanstalk->delete($task->getPheanstalkJob());
254279
}
255280

256281
/**
282+
* Writes the log to the Task entity
283+
*
284+
* @param WorkPackage $task
285+
* @param string $log
286+
* @return void
287+
* @throws TaskQueueServiceException
288+
*/
289+
private function updateTaskLog(WorkPackage $task, $log)
290+
{
291+
$taskEntity = $task->getTaskEntity();
292+
if ($taskEntity instanceof Task) {
293+
$taskEntity->setLog($log);
294+
//make sure it is stored...
295+
$this->entityManager->flush($taskEntity);
296+
} else {
297+
throw new TaskQueueServiceException("Entity is not of type Task");
298+
}
299+
}
300+
301+
/**
302+
* Updates the task status
303+
*
257304
* @param WorkPackage $task
258305
* @param string $status
259306
* @return void

0 commit comments

Comments
 (0)