Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
colors="true"
cacheDirectory=".phpunit.cache"
bootstrap="tests/bootstrap.php"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.1/phpunit.xsd">
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/12.0/phpunit.xsd">
<testsuites>
<testsuite name="queue">
<directory>tests/TestCase</directory>
Expand Down
220 changes: 220 additions & 0 deletions src/Command/SubprocessJobRunnerCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
<?php
declare(strict_types=1);

/**
* CakePHP(tm) : Rapid Development Framework (https://cakephp.org)
* Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
*
* Licensed under The MIT License
* For full copyright and license information, please see the LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
* @link https://cakephp.org CakePHP(tm) Project
* @since 0.1.0
* @license https://opensource.org/licenses/MIT MIT License
*/
namespace Cake\Queue\Command;

use Cake\Command\Command;
use Cake\Console\Arguments;
use Cake\Console\ConsoleIo;
use Cake\Core\ContainerInterface;
use Cake\Log\Engine\ConsoleLog;
use Cake\Log\Log;
use Cake\Queue\Job\Message;
use Cake\Queue\Queue\Processor;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\NullMessage;
use Interop\Queue\Message as QueueMessage;
use Interop\Queue\Processor as InteropProcessor;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
use Throwable;

/**
* Subprocess job runner command.
* Executes a single job in an isolated subprocess.
*/
class SubprocessJobRunnerCommand extends Command
{
/**
* @param \Cake\Core\ContainerInterface|null $container DI container instance
*/
public function __construct(
protected readonly ?ContainerInterface $container = null,
) {
}

/**
* Get the command name.
*
* @return string
*/
public static function defaultName(): string
{
return 'queue subprocess-runner';
}

/**
* Execute a single job from STDIN and output result to STDOUT.
*
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return int
*/
public function execute(Arguments $args, ConsoleIo $io): int
{
$input = $this->readInput($io);

if (empty($input)) {
$this->outputResult($io, [
'success' => false,
'error' => 'No input received',
]);

return self::CODE_ERROR;
}

$data = json_decode($input, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$this->outputResult($io, [
'success' => false,
'error' => 'Invalid JSON input: ' . json_last_error_msg(),
]);

return self::CODE_ERROR;
}

try {
$result = $this->executeJob($data);
$this->outputResult($io, [
'success' => true,
'result' => $result,
]);

return self::CODE_SUCCESS;
} catch (Throwable $throwable) {
$this->outputResult($io, [
'success' => false,
'result' => InteropProcessor::REQUEUE,
'exception' => [
'class' => get_class($throwable),
'message' => $throwable->getMessage(),
'code' => $throwable->getCode(),
'file' => $throwable->getFile(),
'line' => $throwable->getLine(),
'trace' => $throwable->getTraceAsString(),
],
]);

return self::CODE_SUCCESS;
}
}

/**
* Read input from STDIN or ConsoleIo
*
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return string
*/
protected function readInput(ConsoleIo $io): string
{
$input = '';
while (!feof(STDIN)) {
$chunk = fread(STDIN, 8192);
if ($chunk === false) {
break;
}

$input .= $chunk;
}

return $input;
}

/**
* Execute the job with the provided data.
*
* @param array<string, mixed> $data Job data
* @return string
*/
protected function executeJob(array $data): string
{
$connectionFactory = new NullConnectionFactory();
$context = $connectionFactory->createContext();

$messageClass = $data['messageClass'] ?? NullMessage::class;

// Validate message class for security
if (!class_exists($messageClass) || !is_subclass_of($messageClass, QueueMessage::class)) {
throw new RuntimeException(sprintf('Invalid message class: %s', $messageClass));
}

$messageBody = json_encode($data['body']);

/** @var \Interop\Queue\Message $queueMessage */
$queueMessage = new $messageClass($messageBody);

if (isset($data['properties']) && is_array($data['properties'])) {
foreach ($data['properties'] as $key => $value) {
$queueMessage->setProperty($key, $value);
}
}

$logger = $this->configureLogging($data);

$message = new Message($queueMessage, $context, $this->container);
$processor = new Processor($logger, $this->container);

$result = $processor->processMessage($message);

// Result is string|object (with __toString)
/** @phpstan-ignore cast.string */
return is_string($result) ? $result : (string)$result;
}

/**
* Configure logging to use STDERR to prevent job logs from contaminating STDOUT.
* Reconfigures all CakePHP loggers to write to STDERR with no additional formatting.
*
* @param array<string, mixed> $data Job data
* @return \Psr\Log\LoggerInterface
*/
protected function configureLogging(array $data): LoggerInterface
{
// Drop all existing loggers to prevent duplicate logging
foreach (Log::configured() as $loggerName) {
Log::drop($loggerName);
}

// Configure a single stderr logger
Log::setConfig('default', [
'className' => ConsoleLog::class,
'stream' => 'php://stderr',
]);

$logger = Log::engine('default');
if (!$logger instanceof LoggerInterface) {
$logger = new NullLogger();
}

return $logger;
}

/**
* Output result as JSON to STDOUT.
*
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @param array<string, mixed> $result Result data
* @return void
*/
protected function outputResult(ConsoleIo $io, array $result): void
{
$json = json_encode($result);
if ($json !== false) {
$io->out($json);
}
}
}
34 changes: 31 additions & 3 deletions src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Cake\Queue\Consumption\RemoveUniqueJobIdFromCacheExtension;
use Cake\Queue\Listener\FailedJobsListener;
use Cake\Queue\Queue\Processor;
use Cake\Queue\Queue\SubprocessProcessor;
use Cake\Queue\QueueManager;
use DateTime;
use Enqueue\Consumption\ChainExtension;
Expand Down Expand Up @@ -105,6 +106,11 @@ public function getOptionParser(): ConsoleOptionParser
'default' => null,
'short' => 'a',
]);
$parser->addOption('subprocess', [
'help' => 'Execute jobs in a subprocess. Useful for development to reload code for each job.',
'boolean' => true,
'default' => false,
]);
$parser->setDescription(
'Runs a queue worker that consumes from the named queue.',
);
Expand Down Expand Up @@ -154,12 +160,13 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
* Creates and returns a LoggerInterface object
*
* @param \Cake\Console\Arguments $args Arguments
* @param bool $forceLogger Force logger creation even without verbose flag
* @return \Psr\Log\LoggerInterface
*/
protected function getLogger(Arguments $args): LoggerInterface
protected function getLogger(Arguments $args, bool $forceLogger = false): LoggerInterface
{
$logger = null;
if (!empty($args->getOption('verbose'))) {
if ($forceLogger || !empty($args->getOption('verbose'))) {
$logger = Log::engine((string)$args->getOption('logger'));
}

Expand Down Expand Up @@ -191,7 +198,28 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface
$this->abort();
}

return new $processorClass($logger, $this->container);
// Check subprocess mode before instantiating processor
if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) {
if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) {
$io->error('Subprocess mode is only supported with the default Processor class');
$this->abort();
}

$subprocessConfig = array_merge(
$config['subprocess'] ?? [],
[
'enabled' => true,
'logger' => $config['logger'] ?? (string)$args->getOption('logger'),
],
);

$subprocessLogger = $this->getLogger($args, forceLogger: true);
$processor = new SubprocessProcessor($subprocessLogger, $subprocessConfig, $this->container);
} else {
$processor = new $processorClass($logger, $this->container);
}

return $processor;
}

/**
Expand Down
14 changes: 13 additions & 1 deletion src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public function process(QueueMessage $message, Context $context): string|object
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);

try {
$response = $this->processMessage($jobMessage);
$response = $this->executeJob($jobMessage, $message);
} catch (Throwable $throwable) {
$message->setProperty('jobException', $throwable);

Expand Down Expand Up @@ -116,6 +116,18 @@ public function process(QueueMessage $message, Context $context): string|object
return InteropProcessor::REQUEUE;
}

/**
* Execute the job and return the response.
*
* @param \Cake\Queue\Job\Message $jobMessage Job message wrapper
* @param \Interop\Queue\Message $queueMessage Original queue message
* @return object|string with __toString method implemented
*/
protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object
{
return $this->processMessage($jobMessage);
}

/**
* @param \Cake\Queue\Job\Message $message Message.
* @return object|string with __toString method implemented
Expand Down
Loading