Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/Commands/QueueRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ protected function handle(): int
$this->info("Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s");

try {
$this->worker->setOnJobProcessing(function ($job) {
$jobClass = get_class($job);
$jobId = $job->getJobId() ?? 'N/A';
$this->info("✔ Processing job [{$jobClass}] (ID: {$jobId})");
});

$this->worker->setOnJobProcessed(function ($job) {
$jobClass = get_class($job);
$jobId = $job->getJobId() ?? 'N/A';
$this->info("✔ Processed job [{$jobClass}] (ID: {$jobId})");

// Flush system output buffer
flush();
});

$this->worker->daemon($queue, [
'sleep' => $sleep,
'maxMemory' => $maxMemory,
Expand Down
46 changes: 45 additions & 1 deletion src/QueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ class QueueWorker
*/
protected $maxMemory = 128;

/**
* Callback to be executed **before** a job is processed.
*
* @var callable|null
*/
protected $onJobProcessing;

/**
* Callback to be executed **after** a job has been successfully processed.
*
*
* @var callable|null
*/
protected $onJobProcessed;

/**
* Create a new queue worker.
*
Expand All @@ -52,6 +67,28 @@ public function __construct(QueueManager $manager)
$this->manager = $manager;
}

/**
* Set the callback to be executed **before** a job is processed.
*
* @param callable $callback
* @return void
*/
public function setOnJobProcessing(callable $callback): void
{
$this->onJobProcessing = $callback;
}

/**
* Set the callback to be executed **after** a job has been processed.
*
* @param callable $callback
* @return void
*/
public function setOnJobProcessed(callable $callback): void
{
$this->onJobProcessed = $callback;
}

/**
* Run the worker daemon.
*
Expand Down Expand Up @@ -125,13 +162,20 @@ protected function processJob(QueueJob $queueJob): void
$job = $this->manager->unserializeJob($queueJob->payload);
$job->attempts = $queueJob->attempts;

if (is_callable($this->onJobProcessing)) {
($this->onJobProcessing)($job);
}

// Execute the job
$this->executeJob($job);

// Delete the job from queue if successful
$this->manager->delete($queueJob);

$this->logInfo("Job {$job->getJobId()} processed successfully");
// Trigger onJobProcessed callback
if (is_callable($this->onJobProcessed)) {
($this->onJobProcessed)($job);
}
} catch (\Throwable $e) {
$this->handleJobException($queueJob, $job ?? null, $e);
}
Expand Down
Loading