diff --git a/CHANGELOG.md b/CHANGELOG.md index ac06a6c..155c08f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,3 +6,8 @@ - This version is a complete rewrite of SlmQueue. It is now splitted in several modules and support both Beanstalkd and Amazon SQS queue systems through SlmQueueBeanstalkd and SlmQueueSqs modules. + +# 2.0.0 + +- Complete rewrite to support latest Laminas framework, Pheanstalk and PHPUnit +- Require PHP 7 diff --git a/Module.php b/Module.php index 4d1d789..c56f9d1 100644 --- a/Module.php +++ b/Module.php @@ -1,68 +1,3 @@ array( - Loader\StandardAutoloader::LOAD_NS => array( - __NAMESPACE__ => __DIR__ . '/src/' . __NAMESPACE__, - ), - ), - ); - } - - /** - * {@inheritDoc} - */ - public function getConfig() - { - return include __DIR__ . '/config/module.config.php'; - } - - /** - * {@inheritDoc} - */ - public function getConsoleBanner(AdapterInterface $console) - { - return 'SlmQueueBeanstalkd'; - } - - /** - * {@inheritDoc} - */ - public function getConsoleUsage(AdapterInterface $console) - { - return array( - 'queue beanstalkd [--timeout=]' => 'Process jobs with beanstalkd', - - array('', 'Queue\'s name to process'), - array('--timeout=', 'Timeout (in seconds) to wait for a job to arrive') - ); - } - - /** - * {@inheritDoc} - */ - public function getModuleDependencies() - { - return array('SlmQueue'); - } -} +require __DIR__ . '/src/Module.php'; \ No newline at end of file diff --git a/README.md b/README.md index e619f1a..4c6e3e7 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Created by Jurian Sluiman and Michaël Gallego Requirements ------------ -* [Zend Framework 2](https://github.com/zendframework/zf2) +* [Laminas MVC Skeleton](https://github.com/laminas/laminas-mvc-skeleton) * [SlmQueue](https://github.com/juriansluiman/SlmQueue) * [Pda Pheanstalk](https://github.com/pda/pheanstalk) @@ -24,7 +24,7 @@ add the following line into your `composer.json` file: ```json "require": { - "slm/queue-beanstalkd": "0.4.*" + "slm/queue-beanstalkd": "2.0.*" } ``` diff --git a/composer.json b/composer.json index fc5f552..1fc83e4 100644 --- a/composer.json +++ b/composer.json @@ -1,53 +1,74 @@ { - "name": "slm/queue-beanstalkd", - "description": "Zend Framework 2 module that integrates with Beanstalkd queuing system", - "license": "BSD-3-Clause", - "type": "library", - "keywords": [ - "zf2", - "queue", - "job", - "beanstalk", - "beanstalkd", - "pheanstalk" - ], - "homepage": "https://github.com/juriansluiman/SlmQueueBeanstalkd", - "authors": [ - { - "name": "Jurian Sluiman", - "email": "jurian@juriansluiman.nl", - "homepage": "http://juriansluiman.nl" - }, - { - "name": "Michaël Gallego", - "email": "mic.gallego@gmail.com", - "homepage": "http://www.michaelgallego.fr" - } - ], - "require": { - "php": ">=5.3.3", - "zendframework/zend-mvc": "~2.2", - "zendframework/zend-servicemanager": "~2.2", - "zendframework/zend-stdlib": "~2.2", - "slm/queue": "0.4.*", - "pda/pheanstalk": "~3.0" + "name": "slm/queue-Beanstalkd", + "description": "Laminas Framework module that integrates Beanstalkd as queuing system", + "license": "BSD-3-Clause", + "type": "library", + "keywords": [ + "laminas", + "mezzio", + "queue", + "job", + "beanstalkd", + "pheanstalk" + ], + "homepage": "https://github.com/JouwWeb/SlmQueueBeanstalkd", + "authors": [ + { + "name": "Stefan Kleff", + "email": "s.kleff@goalio.de", + "homepage": "http://www.goalio.de" }, - "require-dev": { - "zendframework/zendframework": "~2.2", - "squizlabs/php_codesniffer": "1.5.*", - "phpunit/phpunit": "~4.1" - }, - "extra": { - "branch-alias": { - "dev-master": "0.4.x-dev" - } + { + "name": "Bas Kamer", + "email": "baskamer@gmail.com", + "homepage": "https://bushbaby.nl" + } + ], + "require": { + "php": "^7.2", + "laminas/laminas-eventmanager": "^3.2.1", + "laminas/laminas-servicemanager": "^3.3.1", + "laminas/laminas-stdlib": "^3.2", + "slm/queue": "^2.0", + "pda/pheanstalk": "^4.0" + }, + "require-dev": { + "laminas/laminas-config": "^2.6 || ^3.0", + "laminas/laminas-modulemanager": "^2.8", + "laminas/laminas-view": "^2.10", + "laminas/laminas-log": "^2.9", + "laminas/laminas-i18n": "^2.6", + "laminas/laminas-serializer": "^2.8", + "laminas/laminas-mvc": "^2.7.13", + "phpunit/phpunit": "^8.5", + "squizlabs/php_codesniffer": "^3.5", + "php-coveralls/php-coveralls": "^2.0" + }, + "extra": { + "branch-alias": { + "dev-master": "3.1.x-dev" }, - "autoload": { - "psr-0": { - "SlmQueueBeanstalkd": "src/" - }, - "classmap": [ - "./Module.php" - ] + "laminas": { + "module": "SlmQueueBeanstalkd\\Module", + "config-provider": "SlmQueueBeanstalkd\\ConfigProvider", + "component-whitelist": [ + "slm/queue" + ] + } + }, + "autoload": { + "psr-4": { + "SlmQueueBeanstalkd\\": "src/", + "SlmQueueTest\\": "vendor/slm/queue/tests/" + } + }, + "autoload-dev": { + "psr-4": { + "SlmQueueBeanstalkdTest\\": "tests/" } + }, + "scripts": { + "cs-check": "phpcs", + "cs-fix": "phpcbf" + } } diff --git a/config/module.config.php b/config/module.config.php index 1bc2a94..14b7a36 100644 --- a/config/module.config.php +++ b/config/module.config.php @@ -1,44 +1,85 @@ array( - 'factories' => array( - 'SlmQueueBeanstalkd\Options\BeanstalkdOptions' => 'SlmQueueBeanstalkd\Factory\BeanstalkdOptionsFactory', - 'SlmQueueBeanstalkd\Service\PheanstalkService' => 'SlmQueueBeanstalkd\Factory\PheanstalkFactory', - 'SlmQueueBeanstalkd\Worker\BeanstalkdWorker' => 'SlmQueue\Factory\WorkerFactory' - ) - ), - - 'controllers' => array( - 'factories' => array( - 'SlmQueueBeanstalkd\Controller\BeanstalkdWorkerController' => 'SlmQueueBeanstalkd\Factory\BeanstalkdWorkerControllerFactory', - ), - ), +use SlmQueue\Strategy\MaxRunsStrategy; +use SlmQueueBeanstalkd\Factory\BeanstalkdQueueFactory; +use SlmQueue\Factory\WorkerFactory; +use SlmQueueBeanstalkd\Controller\BeanstalkdWorkerController; +use SlmQueueBeanstalkd\Factory\BeanstalkdWorkerControllerFactory; +use SlmQueueBeanstalkd\Strategy\ClearObjectManagerStrategy; +use SlmQueueBeanstalkd\Worker\BeanstalkdWorker; - 'console' => array( - 'router' => array( - 'routes' => array( - 'slm-queue-beanstalked-worker' => array( +return [ + 'service_manager' => [ + 'factories' => [ + BeanstalkdWorker::class => WorkerFactory::class, + \Pheanstalk::class => \SlmQueueBeanstalkd\Factory\PheanstalkFactory::class + ] + ], + 'controllers' => [ + 'factories' => [ + BeanstalkdWorkerController::class => BeanstalkdWorkerControllerFactory::class, + \Pheanstalk::class => \SlmQueueBeanstalkd\Factory\PheanstalkFactory::class + ], + ], + 'console' => [ + 'router' => [ + 'routes' => [ + 'slm-queue-Beanstalkd-worker' => [ 'type' => 'Simple', - 'options' => array( - 'route' => 'queue beanstalkd [--timeout=]', - 'defaults' => array( - 'controller' => 'SlmQueueBeanstalkd\Controller\BeanstalkdWorkerController', + 'options' => [ + 'route' => 'queue Beanstalkd [--timeout=] --start', + 'defaults' => [ + 'controller' => BeanstalkdWorkerController::class, 'action' => 'process' - ), - ), - ), - ), - ), - ), - - 'slm_queue' => array( - 'beanstalkd' => array( - 'connection' => array( - 'host' => '0.0.0.0', - 'port' => 11300, - 'timeout' => 2 - ) + ], + ], + ], + 'slm-queue-Beanstalkd-recover' => [ + 'type' => 'Simple', + 'options' => [ + 'route' => 'queue Beanstalkd --recover [--executionTime=]', + 'defaults' => [ + 'controller' => BeanstalkdWorkerController::class, + 'action' => 'recover' + ], + ], + ], + 'slm-queue-Beanstalkd-stats' => [ + 'type' => 'Simple', + 'options' => [ + 'route' => 'queue Beanstalkd --stats', + 'defaults' => [ + 'controller' => BeanstalkdWorkerController::class, + 'action' => 'stats' + ], + ], + ], + ], + ], + ], + 'slm_queue' => [ + 'worker_strategies' => [ + 'default' => [ + MaxRunsStrategy::class => ['max_runs' => 1] + ] + ], + 'queues' => [ + 'my-beanstalkd-queue' => [ + 'deleted_lifetime' => -1, + 'buried_lifetime' => -1, + ], + ], + 'queue_manager' => [ + 'factories' => [ + 'mailing' => BeanstalkdQueueFactory::class + ] + ], + ], + 'beanstalkd' => array( + 'connection' => array( + 'host' => '0.0.0.0', + 'port' => 11300, + 'timeout' => 2 ) ) -); +]; \ No newline at end of file diff --git a/config/slm_queue_beanstalkd.local.php.dist b/config/slm_queue_beanstalkd.local.php.dist index 1f0479a..8ee123b 100644 --- a/config/slm_queue_beanstalkd.local.php.dist +++ b/config/slm_queue_beanstalkd.local.php.dist @@ -1,40 +1,74 @@ array( - /** - * Configuration for Beanstalkd - */ - 'beanstalkd' => array( - 'connection' => array( - /** - * Connection host - */ - // 'host' => '0.0.0.0', - - /** - * Connection post - */ - // 'port' => 11300, - - /** - * How long, in seconds, the socket will wait for the server to respond to its - * initial connection attempt - */ - // 'timeout' => 2 - ), - ), - /** - * Beanstalkd tubes that should be used for queues - */ - 'queues' => array( - // 'my-queue' => array('tube' => 'beanstalkd-tube-for-my-queue'), - ), - - ), -); +return [ + 'service_manager' => [ + 'factories' => [ + BeanstalkdWorker::class => WorkerFactory::class, + ] + ], + 'controllers' => [ + 'factories' => [ + BeanstalkdWorkerController::class => BeanstalkdWorkerControllerFactory::class, + \Pheanstalk::class => \SlmQueueBeanstalkd\Factory\PheanstalkFactory::class + ], + ], + 'console' => [ + 'router' => [ + 'routes' => [ + 'slm-queue-Beanstalkd-worker' => [ + 'type' => 'Simple', + 'options' => [ + 'route' => 'queue Beanstalkd [--timeout=] --start', + 'defaults' => [ + 'controller' => BeanstalkdWorkerController::class, + 'action' => 'process' + ], + ], + ], + 'slm-queue-Beanstalkd-recover' => [ + 'type' => 'Simple', + 'options' => [ + 'route' => 'queue Beanstalkd --recover [--executionTime=]', + 'defaults' => [ + 'controller' => BeanstalkdWorkerController::class, + 'action' => 'recover' + ], + ], + ], + ], + ], + ], + 'slm_queue' => [ + 'worker_strategies' => [ + 'default' => [ + MaxRunsStrategy::class => ['max_runs' => 1] + ] + ], + 'queues' => [ + 'my-beanstalkd-queue' => [ + 'deleted_lifetime' => -1, + 'buried_lifetime' => -1, + ], + ], + 'queue_manager' => [ + 'factories' => [ + 'mailing' => BeanstalkdQueueFactory::class + ] + ], + ], + 'beanstalkd' => array( + 'connection' => array( + 'host' => '0.0.0.0', + 'port' => 11300, + 'timeout' => 2 + ) + ) +]; \ No newline at end of file diff --git a/phpunit.xml b/phpunit.xml index 6eb9eea..45d9c82 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -9,7 +9,6 @@ stopOnFailure="false" processIsolation="false" backupGlobals="false" - syntaxCheck="true" > ./tests diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php new file mode 100644 index 0000000..7204c19 --- /dev/null +++ b/src/ConfigProvider.php @@ -0,0 +1,17 @@ +getConfig(); + + return [ + 'dependencies' => $config['service_manager'], + 'slm_queue' => $config['slm_queue'], + ]; + } +} diff --git a/src/Controller/BeanstalkdWorkerController.php b/src/Controller/BeanstalkdWorkerController.php new file mode 100644 index 0000000..a492f7f --- /dev/null +++ b/src/Controller/BeanstalkdWorkerController.php @@ -0,0 +1,72 @@ +params('queue'); + $executionTime = $this->params('executionTime', 0); + $queue = $this->queuePluginManager->get($queueName); + + if (! $queue instanceof BeanstalkdQueueInterface) { + return sprintf("\nQueue % does not support the recovering of job\n\n", $queueName); + } + + try { + $count = $queue->recover($executionTime); + } catch (ExceptionInterface $exception) { + throw new WorkerProcessException("An error occurred", $exception->getCode(), $exception); + } + + return sprintf( + "\nWork for queue %s is done, %s jobs were recovered\n\n", + $queueName, + $count + ); + } + + public function processAction(): string + { + $options = $this->params()->fromRoute(); + $name = $options['queue']; + $queue = $this->queuePluginManager->get($name); + + try { + $messages = $this->worker->processQueue($queue, $options); + } catch (ExceptionInterface $e) { + throw new WorkerProcessException( + 'Caught exception while processing queue', + $e->getCode(), + $e + ); + } + + return $this->formatOutput($name, $messages); + } + + public function statsAction() { + // @todo + $name = 'mailing'; + $worker = $this->worker; + $queue = $this->queuePluginManager->get($name); + + $stats = $queue->getPheanstalk()->stats(); + + var_dump($stats); + die(); + } +} \ No newline at end of file diff --git a/src/Exeption/ExceptionInterface.php b/src/Exeption/ExceptionInterface.php new file mode 100644 index 0000000..cad02a9 --- /dev/null +++ b/src/Exeption/ExceptionInterface.php @@ -0,0 +1,12 @@ +get('ServiceManager'); + + $config = $container->get('config'); + $queuesConfig = $config['slm_queue']['queues']; + $options = isset($queuesConfig[$requestedName]) ? $queuesConfig[$requestedName] : []; + $queueOptions = new BeanstalkdQueueOptions($options); + + $jobPluginManager = $container->get(JobPluginManager::class); + $pheanstalk = $sm->get(\Pheanstalk::class); + + + return new BeanstalkdQueue($pheanstalk, $requestedName, $jobPluginManager, $queueOptions); + } +} \ No newline at end of file diff --git a/src/Factory/BeanstalkdWorkerControllerFactory.php b/src/Factory/BeanstalkdWorkerControllerFactory.php new file mode 100644 index 0000000..6ecd31f --- /dev/null +++ b/src/Factory/BeanstalkdWorkerControllerFactory.php @@ -0,0 +1,28 @@ +get('ServiceManager'); + + $worker = $sm->get(BeanstalkdWorker::class); + $queuePluginManager = $sm->get(QueuePluginManager::class); + + return new BeanstalkdWorkerController($worker, $queuePluginManager); + } +} \ No newline at end of file diff --git a/src/Factory/PheanstalkConnectionFactory.php b/src/Factory/PheanstalkConnectionFactory.php new file mode 100644 index 0000000..dfb72e4 --- /dev/null +++ b/src/Factory/PheanstalkConnectionFactory.php @@ -0,0 +1,35 @@ +get('config'); + if ($config && isset($config['beanstalkd']['connection'])) { + $options = $config['beanstalkd']['connection']; + } + + $connectionOptions = new ConnectionOptions($options); + + $socketFactory = new SocketFactory( + $connectionOptions->getHost(), + $connectionOptions->getPort(), + $connectionOptions->getTimeout() + ); + + $connection = new Connection($socketFactory); + + return $connection; + } +} \ No newline at end of file diff --git a/src/Factory/PheanstalkFactory.php b/src/Factory/PheanstalkFactory.php new file mode 100644 index 0000000..7497f40 --- /dev/null +++ b/src/Factory/PheanstalkFactory.php @@ -0,0 +1,30 @@ +get('ServiceManager'); + $factory = new PheanstalkConnectionFactory(); + $connection = $factory($sm, \Pheanstalk::class); + +// var_dump($connection); die(); + + $pheanstalk = new Pheanstalk($connection); + + return $pheanstalk; + } +} diff --git a/src/Job/Exception/BuryableException.php b/src/Job/Exception/BuryableException.php new file mode 100644 index 0000000..760a1e9 --- /dev/null +++ b/src/Job/Exception/BuryableException.php @@ -0,0 +1,37 @@ +options = $options; + } + + /** + * Get the options + */ + public function getOptions(): array + { + return $this->options; + } +} \ No newline at end of file diff --git a/src/Job/Exception/ReleasableException.php b/src/Job/Exception/ReleasableException.php new file mode 100644 index 0000000..b500a02 --- /dev/null +++ b/src/Job/Exception/ReleasableException.php @@ -0,0 +1,37 @@ +options = $options; + } + + /** + * Get the options + */ + public function getOptions(): array + { + return $this->options; + } +} \ No newline at end of file diff --git a/src/Job/JobId.php b/src/Job/JobId.php new file mode 100644 index 0000000..3bc9b2b --- /dev/null +++ b/src/Job/JobId.php @@ -0,0 +1,38 @@ +id = $id; + } + + /** + * @return int + */ + public function getId(): int + { + return $this->id; + } + + /** + * @param int $id + */ + public function setId(int $id): void + { + $this->id = $id; + } + + /** + * @var int + */ + protected $id; + +} \ No newline at end of file diff --git a/src/Job/JobInterface.php b/src/Job/JobInterface.php new file mode 100644 index 0000000..196ab6f --- /dev/null +++ b/src/Job/JobInterface.php @@ -0,0 +1,11 @@ + 'A short description of that parameter', + * '-another-parameter' => 'A short description of another parameter', + * ... + * ) + * + * @param AdapterInterface $console + * @return array|string|null + */ + public function getConsoleUsage(Console $console) + { + return [ + 'queue beanstalkd --start [--timeout=]' => 'Process Beanstalkd queue', + 'queue beanstalkd --recover [--executionTime=]' => 'Recover Beanstalkd worker', + ]; + } +} \ No newline at end of file diff --git a/src/Options/BeanstalkdOptions.php b/src/Options/BeanstalkdOptions.php new file mode 100644 index 0000000..35353d0 --- /dev/null +++ b/src/Options/BeanstalkdOptions.php @@ -0,0 +1,36 @@ +connection = new ConnectionOptions($options); + } + + /** + * Get the connection options + * + * @return ConnectionOptions + */ + public function getConnection() + { + return $this->connection; + } +} diff --git a/src/Options/BeanstalkdQueueOptions.php b/src/Options/BeanstalkdQueueOptions.php new file mode 100644 index 0000000..c7a4489 --- /dev/null +++ b/src/Options/BeanstalkdQueueOptions.php @@ -0,0 +1,82 @@ +deletedLifetime; + } + + /** + * @param int $deletedLifetime + */ + public function setDeletedLifetime(int $deletedLifetime): void + { + $this->deletedLifetime = $deletedLifetime; + } + + /** + * @return int + */ + public function getBuriedLifetime(): int + { + return $this->buriedLifetime; + } + + /** + * @param int $buriedLifetime + */ + public function setBuriedLifetime(int $buriedLifetime): void + { + $this->buriedLifetime = $buriedLifetime; + } + + /** + * Get beanstalkd tube name for queue + * @return string + */ + public function getTube() + { + return $this->tube; + } + + /** + * Set beanstalkd tube for queue + * @param string $tube + */ + public function setTube($tube) + { + $this->tube = $tube; + } +} \ No newline at end of file diff --git a/src/Options/ConnectionOptions.php b/src/Options/ConnectionOptions.php new file mode 100644 index 0000000..4819f6a --- /dev/null +++ b/src/Options/ConnectionOptions.php @@ -0,0 +1,89 @@ +host = $host; + } + + /** + * Get the connection host + * + * @return string + */ + public function getHost() + { + return $this->host; + } + + /** + * Set the connection port + * + * @param int $port + * @return void + */ + public function setPort($port) + { + $this->port = (int) $port; + } + + /** + * Get the connection port + * + * @return int + */ + public function getPort() + { + return $this->port; + } + + /** + * Set the connection timeout + * + * @param int $timeout + * @return void + */ + public function setTimeout($timeout) + { + $this->timeout = (int) $timeout; + } + + /** + * Get the connection timeout + * + * @return int + */ + public function getTimeout() + { + return $this->timeout; + } +} diff --git a/src/Options/JobOptions.php b/src/Options/JobOptions.php new file mode 100644 index 0000000..406a150 --- /dev/null +++ b/src/Options/JobOptions.php @@ -0,0 +1,33 @@ +{$name}; + } + + return $default; + } + + public function getPriority($default = Pheanstalk::DEFAULT_PRIORITY) + { + return $this->getOption('priority', $default); + } + + public function getDelay($default = Pheanstalk::DEFAULT_DELAY) + { + return $this->getOption('delay', $default); + } + + public function getTTR($default = Pheanstalk::DEFAULT_TTR) + { + return $this->getOption('ttr', $default); + } +} \ No newline at end of file diff --git a/src/Queue/BeanstalkdQueue.php b/src/Queue/BeanstalkdQueue.php new file mode 100644 index 0000000..3e465b2 --- /dev/null +++ b/src/Queue/BeanstalkdQueue.php @@ -0,0 +1,189 @@ +pheanstalk = $pheanstalk; + $this->tubeName = $name; + if ($options !== null) { + $this->options = clone $options; + if ($options->getTube()) { + $this->tubeName = $options->getTube(); + } + } + + parent::__construct($name, $jobPluginManager); + } + + + /** + * Get the name of the beanstalkd tube that is used for storing queue + * @return string + */ + public function getTubeName() + { + return $this->tubeName; + } + + public function bury(JobInterface $job, array $options = []): void + { + $jobId = new JobId($job->getId()); + $jobOptions = new JobOptions($options); + $this->pheanstalk->bury( + $jobId, + $jobOptions->getPriority() + ); + } + + public function recover(int $executionTime): int + { + // TODO: Implement recover() method. + throw new \Exception('Recovering is not supported yet.'); + } + + /** + * {@inheritDoc} + */ + public function kick($max) + { + $this->pheanstalk->useTube($this->getTubeName()); + + return $this->pheanstalk->kick($max); + } + + public function release(JobInterface $job, array $options = []): void + { + $jobId = new JobId($job->getId()); + $jobOptions = new JobOptions($options); + $this->pheanstalk->release( + $jobId, + $jobOptions->getPriority(), + $jobOptions->getDelay() + ); + } + + /** + * Get a job from the queue without processing it + * + * @param int $id Job identifier + * @return JobInterface + */ + public function peek(int $id): JobInterface + { + $jobId = new JobId($id); + $info = $this->pheanstalk->peek($jobId); + + $job = $this->unserializeJob($info->getData()); + $job->setId($info->getId()); + + return $job; + } + + /** + * Get a job from the queue without processing it + * + * @param JobInterface $job + * @return JobInterface + */ + public function peekJob(JobInterface $job): JobInterface + { + $jobId = new JobId($job->getId()); + $info = $this->pheanstalk->peek($jobId); + + return $this->unserializeJob($info->getData()); + } + + public function push(JobInterface $job, array $options = []): void + { + $jobOptions = new JobOptions($options); + $theJob = $this->pheanstalk->put( + $this->serializeJob($job), + $jobOptions->getPriority(), + $jobOptions->getDelay(), + $jobOptions->getTTR() + ); + + $job->setId($theJob->getId()); + } + + public function pop(array $options = []): ?JobInterface + { + // @todo: test this +// $jobOptions = new JobOptions($options); + $job = $this->pheanstalk->reserve(); + if (!$job instanceof Job) { + return null; + } + + return $this->unserializeJob($job->getData(), array('__id__' => $job->getId())); + } + + public function delete(JobInterface $job): void + { + $jobId = new JobId($job->getId()); + $this->pheanstalk->delete($jobId); + } + + public function getOptions(): BeanstalkdQueueOptions + { + return $this->options; + } + + public function getPheanstalk() + { + return $this->pheanstalk; + } +} \ No newline at end of file diff --git a/src/Queue/BeanstalkdQueueInterface.php b/src/Queue/BeanstalkdQueueInterface.php new file mode 100644 index 0000000..dae0e56 --- /dev/null +++ b/src/Queue/BeanstalkdQueueInterface.php @@ -0,0 +1,33 @@ +napDuration = $napDuration; + } + + public function getNapDuration(): int + { + return $this->napDuration; + } + + /** + * {@inheritDoc} + */ + public function attach(EventManagerInterface $events, $priority = 1): void + { + $this->listeners[] = $events->attach( + AbstractWorkerEvent::EVENT_PROCESS_IDLE, + [$this, 'onIdle'], + 1 + ); + } + + public function onIdle(ProcessIdleEvent $event): void + { + $queue = $event->getQueue(); + + if ($queue instanceof BeanstalkdQueueInterface) { + sleep($this->napDuration); + } + } +} \ No newline at end of file diff --git a/src/Worker/BeanstalkdWorker.php b/src/Worker/BeanstalkdWorker.php new file mode 100644 index 0000000..dfaa566 --- /dev/null +++ b/src/Worker/BeanstalkdWorker.php @@ -0,0 +1,50 @@ +execute(); + $queue->delete($job); + + return ProcessJobEvent::JOB_STATUS_SUCCESS; + } catch (ReleasableException $exception) { + $queue->release($job, $exception->getOptions()); + + return ProcessJobEvent::JOB_STATUS_FAILURE_RECOVERABLE; + } catch (BuryableException $exception) { + $queue->bury($job, $exception->getOptions()); + + return ProcessJobEvent::JOB_STATUS_FAILURE; + } catch (Exception $exception) { + $queue->bury($job, [ + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString() + ]); + + return ProcessJobEvent::JOB_STATUS_FAILURE; + } + } +} \ No newline at end of file diff --git a/src/Worker/Exception/InvalidQueueException.php b/src/Worker/Exception/InvalidQueueException.php new file mode 100644 index 0000000..19fdec8 --- /dev/null +++ b/src/Worker/Exception/InvalidQueueException.php @@ -0,0 +1,11 @@ +setMetadata('foo', 'bar'); + + return 999; + } + + public function getId(): int + { + $id = parent::getId(); + if (is_int($id)) { + return $id; + } + + throw new \Exception('Unable to get job id.'); + } +} diff --git a/tests/Bootstrap.php b/tests/Bootstrap.php index 3b4dfff..0a38629 100644 --- a/tests/Bootstrap.php +++ b/tests/Bootstrap.php @@ -1,4 +1,5 @@ . + * . */ -if ( - !($loader = @include __DIR__ . '/../vendor/autoload.php') - && !($loader = @include __DIR__ . '/../../../autoload.php') -) { - throw new RuntimeException('vendor/autoload.php could not be found. Did you run `php composer.phar install`?'); +use SlmQueueBeanstalkdTest\Util\ServiceManagerFactory; + +$loader = @include __DIR__ . '/../vendor/autoload.php'; + +if (! $loader) { + $loader = @include __DIR__ . '/../../../autoload.php'; } -/* @var $loader \Composer\Autoload\ClassLoader */ -$loader->add('SlmQueueBeanstalkdTest\\', __DIR__); +if (! $loader) { + throw new RuntimeException('vendor/autoload.php could not be found. Did you run `php composer.phar install`?'); +} -if (!$config = @include __DIR__ . '/TestConfiguration.php') { +if (! $config = @include __DIR__ . '/TestConfiguration.php') { $config = require __DIR__ . '/TestConfiguration.php.dist'; } -\SlmQueueBeanstalkdTest\Util\ServiceManagerFactory::setConfig($config); +ServiceManagerFactory::setConfig($config); \ No newline at end of file diff --git a/tests/ConfigProviderTest.php b/tests/ConfigProviderTest.php new file mode 100644 index 0000000..66f3428 --- /dev/null +++ b/tests/ConfigProviderTest.php @@ -0,0 +1,29 @@ +getConfig(); + $configProvider = new ConfigProvider(); + $config = $configProvider(); + + static::assertEquals($moduleConfig['service_manager'], $config['dependencies']); + static::assertEquals($moduleConfig['slm_queue'], $config['slm_queue']); + } +} \ No newline at end of file diff --git a/tests/Controller/BeanstalkdWorkerControllerTest.php b/tests/Controller/BeanstalkdWorkerControllerTest.php new file mode 100644 index 0000000..b0a8494 --- /dev/null +++ b/tests/Controller/BeanstalkdWorkerControllerTest.php @@ -0,0 +1,76 @@ +serviceManager = ServiceManagerFactory::getServiceManager(); + + + $worker = new SimpleWorker(); + + $eventManager = $worker->getEventManager(); + (new ProcessQueueStrategy())->attach($eventManager); + (new MaxRunsStrategy(['max_runs' => 1]))->attach($eventManager); + $serviceManager = new ServiceManager(); + $config = [ + 'factories' => [ + 'knownQueue' => SimpleQueueFactory::class, + ], + ]; + + $this->queuePluginManager = new QueuePluginManager($serviceManager, $config); + $this->controller = new SimpleController($worker, $this->queuePluginManager); + } + + public function testThrowExceptionIfQueueIsUnknown(): void + { + $manager = $this->serviceManager->get('ControllerManager'); + + $controller = $this->serviceManager->get('ControllerManager')->get(BeanstalkdWorkerController::class); + $routeMatch = new RouteMatch(['queue' => 'unknownQueue']); + $controller->getEvent()->setRouteMatch($routeMatch); + + $this->expectException(ServiceNotFoundException::class); + + $controller->processAction(); + } + + public function testCorrectlyCountJobs() + { + /** @var SimpleQueue $queue */ + $queue = $this->queuePluginManager->get('knownQueue'); + $queue->push(new SimplePheanstalkJob()); + + $routeMatch = new RouteMatch(['queue' => 'knownQueue']); + $this->controller->getEvent()->setRouteMatch($routeMatch); + + $result = $this->controller->processAction(); + static::assertStringContainsString("Finished worker for queue 'knownQueue'", $result); + static::assertStringContainsString("maximum of 1 jobs processed", $result); + } +} \ No newline at end of file diff --git a/tests/Factory/BeanstalkdFactoryTest.php b/tests/Factory/BeanstalkdFactoryTest.php new file mode 100644 index 0000000..6808e57 --- /dev/null +++ b/tests/Factory/BeanstalkdFactoryTest.php @@ -0,0 +1,38 @@ +get('config'); + + $factory = new BeanstalkdQueueFactory(); + $service = $factory($sm, 'my-beanstalkd-queue'); + + static::assertEquals( + $service->getOptions()->getDeletedLifetime(), + $config['slm_queue']['queues']['my-beanstalkd-queue']['deleted_lifetime'] + ); + static::assertEquals( + $service->getOptions()->getBuriedLifetime(), + $config['slm_queue']['queues']['my-beanstalkd-queue']['buried_lifetime'] + ); + } +} \ No newline at end of file diff --git a/tests/Factory/PheanstalkFactoryTest.php b/tests/Factory/PheanstalkFactoryTest.php new file mode 100644 index 0000000..0f8044f --- /dev/null +++ b/tests/Factory/PheanstalkFactoryTest.php @@ -0,0 +1,18 @@ +getEventManager(); + (new ProcessQueueStrategy())->attach($eventManager); + (new MaxRunsStrategy(['max_runs' => 1]))->attach($eventManager); + $serviceManager = new ServiceManager(); + $config = [ + 'factories' => [ + 'knownQueue' => SimpleQueueFactory::class, + ], + ]; + + $this->queuePluginManager = new QueuePluginManager($serviceManager, $config); + $this->controller = new SimpleController($worker, $this->queuePluginManager); + + $this->serviceManager = $serviceManager; + } + + public function testCreateConnectionOptions() + { + $connectionOptions = new ConnectionOptions(); + + $this->assertEquals('0.0.0.0', $connectionOptions->getHost()); + $this->assertEquals(11300, $connectionOptions->getPort()); + $this->assertEquals(2, $connectionOptions->getTimeout()); + } +} \ No newline at end of file diff --git a/tests/Queue/BeanstalkdQueueTest.php b/tests/Queue/BeanstalkdQueueTest.php new file mode 100644 index 0000000..3685131 --- /dev/null +++ b/tests/Queue/BeanstalkdQueueTest.php @@ -0,0 +1,86 @@ +queueName = 'testQueueName'; + $this->pheanstalk = $this->getMockBuilder('Pheanstalk\Pheanstalk') + ->disableOriginalConstructor() + ->getMock(); + + $this->pluginManager = $this->getMockBuilder('SlmQueue\Job\JobPluginManager') + ->disableOriginalConstructor() + ->getMock(); + + $this->queue = new BeanstalkdQueue($this->pheanstalk, $this->queueName, $this->pluginManager); + } + + public function testTubeNameGetter() + { + $tubeName = $this->queueName; + $result = $this->queue->getTubeName(); + $this->assertEquals($result, $tubeName); + } + + public function testSuccessfulKickWithSelectedTube() + { + $maxKick = 10; + $queueName = $this->queueName; + $pheanstalk = $this->pheanstalk; + + $pheanstalk->expects($this->once()) + ->method('useTube') + ->with($this->equalTo($queueName)) + ->will($this->returnValue($pheanstalk)); + + $pheanstalk->expects($this->once()) + ->method('kick') + ->with($this->equalTo($maxKick)) + ->will($this->returnValue($maxKick)); + + $result = $this->queue->kick($maxKick); + $this->assertEquals($result, $maxKick); + } + + public function testPopPreservesMetadata() + { + $pheanstalk = $this->pheanstalk; + $queueName = $this->queueName; + $pluginManager = $this->pluginManager; + + $job = new SimplePheanstalkJob; + $job->setMetadata('foo', 'bar'); + + $pheanstalkJob = new PheanstalkJob(1, $this->queue->serializeJob($job)); + + $pheanstalk->expects($this->once()) + ->method('reserve') + ->will($this->returnValue($pheanstalkJob)); + + $pluginManager->expects($this->once()) + ->method('get') + ->with(get_class($job)) + ->will($this->returnValue($job)); + + $result = $this->queue->pop(); + + $this->assertEquals($result, $job); + $this->assertEquals('bar', $job->getMetadata('foo')); + } +} diff --git a/tests/Queue/LiveTest.php b/tests/Queue/LiveTest.php new file mode 100644 index 0000000..a3c0868 --- /dev/null +++ b/tests/Queue/LiveTest.php @@ -0,0 +1,69 @@ +queue = $service; + } + + public function testPushAndPeek() + { + $job1 = new SimplePheanstalkJob(); + $job1->setId(1); + + $job2 = new SimplePheanstalkJob(); + $job2->setId(2); + + $this->queue->push($job1); + $this->queue->push($job2); + + $info = $this->queue->peek(2); + + static::assertInstanceOf(\SlmQueue\Job\JobInterface::class, $info); + static::assertIsInt($info->getId()); + static::assertEquals(2, $info->getId()); + } + + public function testPop() + { + $job1 = new SimplePheanstalkJob(); + $this->queue->push($job1); + + $job = $this->queue->pop(); + static::assertNotNull($job); + static::assertEquals(1, $job ? $job->getId() : null); + } + + public function x_testRelease() { + $pop = $this->queue->pop(); + + $this->queue->release($pop); + } + + public function testDelete() { + $job2 = new SimplePheanstalkJob(); + $this->queue->push($job2); + + $id = $job2->getId(); + $peek = $this->queue->peek($id); + + $this->queue->delete($peek); + + $this->expectException(JobNotFoundException::class); + $this->queue->peek($id); + } +} \ No newline at end of file diff --git a/tests/Util/ServiceManagerFactory.php b/tests/Util/ServiceManagerFactory.php new file mode 100644 index 0000000..f10c0b6 --- /dev/null +++ b/tests/Util/ServiceManagerFactory.php @@ -0,0 +1,77 @@ +. + */ + +namespace SlmQueueBeanstalkdTest\Util; + +use Laminas\Mvc\Service\ServiceListenerFactory; +use Laminas\Mvc\Service\ServiceManagerConfig; +use Laminas\ServiceManager\ServiceManager; + +/** + * Utility used to retrieve a freshly bootstrapped application's service manager + * + * @license MIT + * @link http://www.Beanstalkd-project.org/ + * @author Marco Pivetta + */ +class ServiceManagerFactory +{ + /** + * @var array + */ + protected static $config; + + /** + * @param array $config + */ + public static function setConfig(array $config): void + { + static::$config = $config; + } + + /** + * Builds a new service manager + */ + public static function getServiceManager(): ServiceManager + { + $serviceManagerConfig = new ServiceManagerConfig( + isset(static::$config['service_manager']) ? static::$config['service_manager'] : [] + ); + /* + * get array for new ServiceManager + */ + $config = (method_exists($serviceManagerConfig, 'toArray') + && method_exists(ServiceManager::class, 'configure')) ? + $serviceManagerConfig->toArray() : $serviceManagerConfig; + + $serviceManager = new ServiceManager($config); + $serviceManager->setService('ApplicationConfig', static::$config); + $serviceManager->setAllowOverride(true); + $serviceManager->setFactory('ServiceListener', ServiceListenerFactory::class); + $serviceManager->setAllowOverride(false); + + /** @var $moduleManager \Laminas\ModuleManager\ModuleManager */ + $moduleManager = $serviceManager->get('ModuleManager'); + $moduleManager->loadModules(); + +// $serviceManager->setAllowOverride(true); + return $serviceManager; + } +} \ No newline at end of file diff --git a/tests/Worker/BeanstalkdWorkerTest.php b/tests/Worker/BeanstalkdWorkerTest.php new file mode 100644 index 0000000..65d46a5 --- /dev/null +++ b/tests/Worker/BeanstalkdWorkerTest.php @@ -0,0 +1,65 @@ +worker = new BeanstalkdWorker($this->getMockBuilder('Laminas\EventManager\EventManagerInterface')->getMock()); + } + + public function testReturnsUnknownIfNotABeanstalkdQueue() + { + $queue = $this->getMockBuilder('SlmQueue\Queue\QueueInterface')->getMock(); + $job = $this->getMockBuilder('SlmQueue\Job\JobInterface')->getMock(); + + $status = $this->worker->processJob($job, $queue); + + $this->assertEquals(ProcessJobEvent::JOB_STATUS_UNKNOWN, $status); + } + + public function testDeleteJobOnSuccess() + { + $queue = $this->getMockBuilder(BeanstalkdQueueInterface::class)->getMock(); + $job = $this->getMockBuilder('SlmQueue\Job\JobInterface')->getMock(); + + $job->expects($this->once()) + ->method('execute'); + + $queue->expects($this->once()) + ->method('delete') + ->with($job); + + $status = $this->worker->processJob($job, $queue); + $this->assertEquals(ProcessJobEvent::JOB_STATUS_SUCCESS, $status); + } + + public function testDoNotDeleteJobOnFailure() + { + $queue = $this->getMockBuilder(BeanstalkdQueueInterface::class)->getMock(); + $job = $this->getMockBuilder('SlmQueue\Job\JobInterface')->getMock(); + + $job->expects($this->once()) + ->method('execute') + ->will($this->throwException(new ReleasableException())); + + $queue->expects($this->never()) + ->method('delete'); + + $status = $this->worker->processJob($job, $queue); + $this->assertEquals(ProcessJobEvent::JOB_STATUS_FAILURE_RECOVERABLE, $status); + } +} diff --git a/tests/testing.config.php b/tests/testing.config.php index 7e75e1f..4494dcc 100644 --- a/tests/testing.config.php +++ b/tests/testing.config.php @@ -16,17 +16,51 @@ * and is licensed under the MIT license. For more information, see * . */ + +use SlmQueue\Strategy\MaxRunsStrategy; +use SlmQueueBeanstalkd\Factory\BeanstalkdQueueFactory; +use SlmQueue\Factory\WorkerFactory; +use SlmQueueBeanstalkd\Controller\BeanstalkdWorkerController; +use SlmQueueBeanstalkd\Factory\BeanstalkdWorkerControllerFactory; +use SlmQueueBeanstalkd\Strategy\ClearObjectManagerStrategy; +use SlmQueueBeanstalkd\Worker\BeanstalkdWorker; + return array( - 'slm_queue' => array( - 'worker' => array( - // Limit runs to 1 in test environment - 'max_runs' => 1 - ), + 'slm_queue' => [ + 'worker_strategies' => [ + 'default' => [ + MaxRunsStrategy::class => ['max_runs' => 1] + ] + ], + 'queues' => [ + 'my-beanstalkd-queue' => [ + 'deleted_lifetime' => -1, + 'buried_lifetime' => -1, + ], + ], + 'queue_manager' => [ + 'factories' => [ + 'mailing' => BeanstalkdQueueFactory::class + ] + ], + ], - 'queue_manager' => array( - 'factories' => array( - 'newsletter' => 'SlmQueueBeanstalkd\Factory\BeanstalkdQueueFactory' - ) + 'service_manager' => [ + 'factories' => [ + BeanstalkdWorker::class => WorkerFactory::class, + \Pheanstalk::class => \SlmQueueBeanstalkd\Factory\PheanstalkFactory::class + ] + ], + 'controllers' => [ + 'factories' => [ + BeanstalkdWorkerController::class => BeanstalkdWorkerControllerFactory::class, + ], + ], + 'beanstalkd' => array( + 'connection' => array( + 'host' => '0.0.0.0', + 'port' => 11300, + 'timeout' => 2 ) ) );