Skip to content

Commit a1f668d

Browse files
authored
Merge pull request #2 from doppar/commands
console output improved and before and after callback added for comma…
2 parents 21b5fdf + fe7213d commit a1f668d

File tree

2 files changed

+60
-1
lines changed

2 files changed

+60
-1
lines changed

src/Commands/QueueRunCommand.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,21 @@ protected function handle(): int
6565
$this->info("Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s");
6666

6767
try {
68+
$this->worker->setOnJobProcessing(function ($job) {
69+
$jobClass = get_class($job);
70+
$jobId = $job->getJobId() ?? 'N/A';
71+
$this->info("✔ Processing job [{$jobClass}] (ID: {$jobId})");
72+
});
73+
74+
$this->worker->setOnJobProcessed(function ($job) {
75+
$jobClass = get_class($job);
76+
$jobId = $job->getJobId() ?? 'N/A';
77+
$this->info("✔ Processed job [{$jobClass}] (ID: {$jobId})");
78+
79+
// Flush system output buffer
80+
flush();
81+
});
82+
6883
$this->worker->daemon($queue, [
6984
'sleep' => $sleep,
7085
'maxMemory' => $maxMemory,

src/QueueWorker.php

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ class QueueWorker
4242
*/
4343
protected $maxMemory = 128;
4444

45+
/**
46+
* Callback to be executed **before** a job is processed.
47+
*
48+
* @var callable|null
49+
*/
50+
protected $onJobProcessing;
51+
52+
/**
53+
* Callback to be executed **after** a job has been successfully processed.
54+
*
55+
*
56+
* @var callable|null
57+
*/
58+
protected $onJobProcessed;
59+
4560
/**
4661
* Create a new queue worker.
4762
*
@@ -52,6 +67,28 @@ public function __construct(QueueManager $manager)
5267
$this->manager = $manager;
5368
}
5469

70+
/**
71+
* Set the callback to be executed **before** a job is processed.
72+
*
73+
* @param callable $callback
74+
* @return void
75+
*/
76+
public function setOnJobProcessing(callable $callback): void
77+
{
78+
$this->onJobProcessing = $callback;
79+
}
80+
81+
/**
82+
* Set the callback to be executed **after** a job has been processed.
83+
*
84+
* @param callable $callback
85+
* @return void
86+
*/
87+
public function setOnJobProcessed(callable $callback): void
88+
{
89+
$this->onJobProcessed = $callback;
90+
}
91+
5592
/**
5693
* Run the worker daemon.
5794
*
@@ -125,13 +162,20 @@ protected function processJob(QueueJob $queueJob): void
125162
$job = $this->manager->unserializeJob($queueJob->payload);
126163
$job->attempts = $queueJob->attempts;
127164

165+
if (is_callable($this->onJobProcessing)) {
166+
($this->onJobProcessing)($job);
167+
}
168+
128169
// Execute the job
129170
$this->executeJob($job);
130171

131172
// Delete the job from queue if successful
132173
$this->manager->delete($queueJob);
133174

134-
$this->logInfo("Job {$job->getJobId()} processed successfully");
175+
// Trigger onJobProcessed callback
176+
if (is_callable($this->onJobProcessed)) {
177+
($this->onJobProcessed)($job);
178+
}
135179
} catch (\Throwable $e) {
136180
$this->handleJobException($queueJob, $job ?? null, $e);
137181
}

0 commit comments

Comments
 (0)