Skip to content

Commit 0a939e0

Browse files
committed
Added logging of output from worker (this is an option you can enable for both successful and failed tasks)
1 parent 6dfa777 commit 0a939e0

File tree

5 files changed

+94
-12
lines changed

5 files changed

+94
-12
lines changed

Command/WorkerCommand.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ private function runTask(WorkPackage $taskObject)
8787
try {
8888
$command = $this->taskCommandGenerator->generate($taskObject);
8989
} catch (TaskCommandGeneratorException $exception) {
90-
$this->verboseOutput("Invalid command supplied in " . get_class($taskObject));
91-
$this->taskQueueService->markFailed($taskObject);
90+
$logMessage = "Invalid command supplied in " . get_class($taskObject);
91+
$this->verboseOutput($logMessage);
92+
$this->taskQueueService->markFailed($taskObject, $logMessage);
9293
return;
9394
}
9495
$fullCommand = 'exec app/console ' . $command;
@@ -100,6 +101,7 @@ private function runTask(WorkPackage $taskObject)
100101
$hasReceivedSignal = false;
101102
$exitCode = 0;
102103
$exitCodeText = '';
104+
$childOutput = '';
103105
while ($runningCommand) {
104106
$this->handleRunningProcess(
105107
$process,
@@ -111,15 +113,16 @@ private function runTask(WorkPackage $taskObject)
111113
$exitCodeText
112114
);
113115
$incOut = $process->getIncrementalOutput();
116+
$childOutput .= $incOut;
114117
if (strlen($incOut) > 0) {
115118
$this->output->write($incOut);
116119
}
117120
}
118121
if ($failed) {
119122
$this->verboseOutput("FAILED CHILD: " . $exitCode . "::" . $exitCodeText);
120-
$this->taskQueueService->markFailed($taskObject);
123+
$this->taskQueueService->markFailed($taskObject, $childOutput);
121124
} else {
122-
$this->taskQueueService->markDone($taskObject);
125+
$this->taskQueueService->markDone($taskObject, $childOutput);
123126
}
124127
if ($process->isRunning()) {
125128
$process->stop();

DependencyInjection/Configuration.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public function getConfigTreeBuilder()
2424
$rootNode
2525
->children()
2626
->scalarNode('default_tube')->isRequired()->end()
27+
->booleanNode('log_worker_output_on_failure')->defaultTrue()->end()
28+
->booleanNode('log_worker_output_on_success')->defaultFalse()->end()
2729
->end();
2830
return $treeBuilder;
2931
}

Entity/Task.php

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ class Task
7474
*/
7575
private $data;
7676

77+
/**
78+
* The output supplied by the command
79+
* @var string
80+
*
81+
* @ORM\Column(name="log", type="text", nullable=true)
82+
*/
83+
private $log;
84+
7785
/**
7886
* The tube this task was sent to
7987
* @var string
@@ -171,6 +179,19 @@ public function setStatus($status)
171179
return $this;
172180
}
173181

182+
/**
183+
* Set log
184+
*
185+
* @param string $status
186+
* @return task
187+
*/
188+
public function setLog($log)
189+
{
190+
$this->log = $log;
191+
$this->modified = new \DateTime();
192+
return $this;
193+
}
194+
174195
/**
175196
* Get status
176197
*
@@ -181,7 +202,15 @@ public function getStatus()
181202
return $this->status;
182203
}
183204

184-
205+
/**
206+
* Get log
207+
*
208+
* @return string
209+
*/
210+
public function getLog()
211+
{
212+
return $this->status;
213+
}
185214

186215
/**
187216
* Get created

Service/TaskQueueService.php

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ class TaskQueueService
3737
*/
3838
private $defaultTube;
3939

40+
/**
41+
* Log the worker output on success
42+
*
43+
* @var boolean
44+
*/
45+
private $logWorkerOutputOnSuccess;
46+
47+
/**
48+
* Log the worker output on failure
49+
*
50+
* @var boolean
51+
*/
52+
private $logWorkerOutputOnFailure;
53+
4054
/**
4155
* @var \Doctrine\ORM\EntityManager
4256
*/
@@ -65,6 +79,8 @@ public function __construct(
6579
$this->entityManager = $entityManager;
6680
$this->taskRepo = $this->entityManager->getRepository('WebdevviePheanstalkTaskQueueBundle:Task');
6781
$this->defaultTube = $params['default_tube'];
82+
$this->logWorkerOutputOnSuccess = $params['log_worker_output_on_success'];
83+
$this->logWorkerOutputOnFailure = $params['log_worker_output_on_failure'];
6884
}
6985

7086
/**
@@ -232,11 +248,15 @@ public function reserveTask($tube = null)
232248
* Deletes a task from the queue
233249
*
234250
* @param WorkPackage $task
251+
* @param string $log
235252
* @throws TaskQueueServiceException
236253
* @return void
237254
*/
238-
public function markDone(WorkPackage $task)
255+
public function markDone(WorkPackage $task, $log)
239256
{
257+
if ($this->logWorkerOutputOnSuccess) {
258+
$this->updateTaskLog($task, $log);
259+
}
240260
$this->updateTaskStatus($task, Task::STATUS_DONE);
241261
$this->beanstalk->delete($task->getPheanstalkJob());
242262
}
@@ -245,15 +265,39 @@ public function markDone(WorkPackage $task)
245265
* Marks a job as failed and deletes it from the beanstalk tube
246266
*
247267
* @param WorkPackage $task
268+
* @param string $log
248269
* @throws TaskQueueServiceException
249270
* @return void
250271
*/
251-
public function markFailed(WorkPackage $task)
272+
public function markFailed(WorkPackage $task, $log)
252273
{
274+
if ($this->logWorkerOutputOnFailure) {
275+
$this->updateTaskLog($task, $log);
276+
}
253277
$this->updateTaskStatus($task, Task::STATUS_FAILED);
254278
$this->beanstalk->delete($task->getPheanstalkJob());
255279
}
256280

281+
/**
282+
* Writes the log to the Task entity
283+
*
284+
* @param WorkPackage $task
285+
* @param string $log
286+
* @return void
287+
* @throws TaskQueueServiceException
288+
*/
289+
private function updateTaskLog(WorkPackage $task, $log)
290+
{
291+
$taskEntity = $task->getTaskEntity();
292+
if ($taskEntity instanceof Task) {
293+
$taskEntity->setLog($log);
294+
//make sure it is stored...
295+
$this->entityManager->flush($taskEntity);
296+
} else {
297+
throw new TaskQueueServiceException("Entity is not of type Task");
298+
}
299+
}
300+
257301
/**
258302
* Updates the task status
259303
*

Tests/Service/TaskQueueServiceTest.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,15 @@ public function testMarkDone()
138138
$workPackage = new WorkPackage($taskEntity, $job, $exampleTask);
139139

140140
$this->entityManager->shouldReceive('persist')->withArgs([$taskEntity]);
141-
$this->entityManager->shouldReceive('flush');
141+
$this->entityManager->shouldReceive('flush')->twice();
142142
$this->pheanStalkProxy->shouldReceive('delete');
143143
$service = new TaskQueueService(
144144
$this->entityManager,
145145
$this->pheanStalkProxy,
146146
$this->serializer,
147147
$this->params
148148
);
149-
$service->markDone($workPackage);
149+
$service->markDone($workPackage, 'log goes here');
150150
}
151151

152152
/**
@@ -165,15 +165,15 @@ public function testMarkFailed()
165165
$workPackage = new WorkPackage($taskEntity, $job, $exampleTask);
166166

167167
$this->entityManager->shouldReceive('persist')->withArgs([$taskEntity]);
168-
$this->entityManager->shouldReceive('flush');
168+
$this->entityManager->shouldReceive('flush')->twice();
169169
$this->pheanStalkProxy->shouldReceive('delete');
170170
$service = new TaskQueueService(
171171
$this->entityManager,
172172
$this->pheanStalkProxy,
173173
$this->serializer,
174174
$this->params
175175
);
176-
$service->markDone($workPackage);
176+
$service->markDone($workPackage, 'log goes here');
177177
}
178178

179179
/**
@@ -348,6 +348,10 @@ public function setupPheanstalkProxyMock()
348348
*/
349349
public function generateFakeParams()
350350
{
351-
$this->params = ["default_tube" => "gtldtube"];
351+
$this->params = array(
352+
"default_tube" => "gtldtube",
353+
"log_worker_output_on_success" => true,
354+
"log_worker_output_on_failure" => true
355+
);
352356
}
353357
}

0 commit comments

Comments
 (0)