Skip to content

Commit 11efeda

Browse files
authored
feat: support native consumer with cli consumer (golang) (#79)
* feat: support native consumer with cli consumer (golang) * fix: correct yml & register command * fix: use the correct service * fix: correct negative logic
1 parent 33f9ba1 commit 11efeda

File tree

5 files changed

+80
-18
lines changed

5 files changed

+80
-18
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
3+
namespace Markup\JobQueueBundle\Command;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
8+
use Symfony\Component\Console\Input\InputArgument;
9+
use Symfony\Component\Console\Input\InputInterface;
10+
use Symfony\Component\Console\Input\InputOption;
11+
use Symfony\Component\Console\Output\OutputInterface;
12+
13+
/**
14+
* Consumes messages from the Go consumer
15+
*/
16+
class RabbitMqConsumerCommand extends ContainerAwareCommand
17+
{
18+
const STRICT_CODE_ACK = 0;
19+
const STRICT_CODE_REJECT = 3;
20+
const STRICT_CODE_REJECT_REQUEUE = 4;
21+
const STRICT_CODE_NEG_ACK = 5;
22+
const STRICT_CODE_NEG_ACK_REQUEUE = 6;
23+
24+
protected function configure()
25+
{
26+
$this
27+
->addArgument('event', InputArgument::REQUIRED)
28+
->setName('markup:job_queue:rabbitmq_consumer')
29+
->addOption(
30+
'strict-exit-code',
31+
null,
32+
InputOption::VALUE_NONE,
33+
'If strict_exit_code is chosen then this command will return the following exit codes. 0=ACK, 3=REJECT, 4=REJECT & REQUEUE, 5=NEG ACK, 6=NEG ACK & REQUEUE'
34+
);
35+
}
36+
37+
/**
38+
* {inheritDoc}
39+
*/
40+
protected function execute(InputInterface $input, OutputInterface $output)
41+
{
42+
$data = json_decode(base64_decode($input->getArgument('event')), true);
43+
44+
$strict = $input->getOption('strict-exit-code');
45+
46+
/** @var ConsumerInterface $consumer */
47+
$consumer = $this->getContainer()->get('simple_bus.rabbit_mq_bundle_bridge.commands_consumer');
48+
49+
if (!$consumer instanceof ConsumerInterface) {
50+
return 0;
51+
}
52+
53+
$consumerReturn = $consumer->execute(new AMQPMessage($data['body'], $data['properties']));
54+
55+
// if not running in strict mode - always acknowledge the message otherwise it will requeue forever
56+
if (!$strict) {
57+
exit(self::STRICT_CODE_ACK);
58+
}
59+
60+
// If in strict mode then test the return value from the consumer and return an appropriate code
61+
if ($consumerReturn === ConsumerInterface::MSG_REJECT) {
62+
exit(self::STRICT_CODE_REJECT);
63+
}
64+
65+
exit(self::STRICT_CODE_ACK);
66+
}
67+
}

DependencyInjection/Configuration.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ public function getConfigTreeBuilder()
3030
->addDefaultsIfNotSet()
3131
->children()
3232
->integerNode('prefetch_count')
33-
->defaultValue(1)
34-
->min(1)
33+
->defaultValue(5)
34+
->min(5)
35+
->end()
36+
->scalarNode('consumer')
37+
->defaultValue('markup:job_queue:consumer')
3538
->end()
3639
->end()
3740
->end()

Resources/config/commands.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,7 @@ services:
3838
markup_job_queue.command.write_supervisord_config_file:
3939
class: Markup\JobQueueBundle\Command\WriteSupervisordConfigFileCommand
4040
tags:
41-
- { name: console.command, command: 'markup:job_queue:supervisord_config:write'' }
41+
- { name: console.command, command: 'markup:job_queue:supervisord_config:write' }
42+
Markup\JobQueueBundle\Command\RabbitMqConsumerCommand:
43+
tags:
44+
- { name: console.command, command: 'markup:job_queue:rabbitmq_consumer' }

Service/SupervisordConfigFileWriter.php

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,6 @@
77

88
class SupervisordConfigFileWriter
99
{
10-
11-
/**
12-
* Symfony command that receives message from RabbitMQ and processes it (cli consumer)
13-
*/
14-
const CLI_CONSUMPTION_COMMAND = 'markup:job_queue:consumer';
15-
16-
/**
17-
* Symfony command that receives message from RabbitMQ and processes it (php consumer)
18-
*/
19-
const PHP_CONSUMPTION_COMMAND = 'rabbitmq:consumer';
20-
2110
const MODE_PHP = 'php';
2211
const MODE_CLI = 'cli';
2312

@@ -161,7 +150,7 @@ public function getConfigForCliConsumer($uniqueEnvironment, $skipExistsChecks =
161150
'%s -e "%s/console %s --strict-exit-code --env=%s --no-debug" -c %s -V -i --strict-exit-code',
162151
$this->consumerPath,
163152
$kernelPath,
164-
self::CLI_CONSUMPTION_COMMAND,
153+
$topicConfig['consumer'],
165154
$this->kernelEnv,
166155
$cliConfigFile
167156
);
@@ -201,7 +190,7 @@ public function getConfigForPhpConsumer($uniqueEnvironment, $skipExistsChecks =
201190
$consumerCommand = sprintf(
202191
'%s/console %s -m %s %s --env=%s --no-debug',
203192
$this->kernelPath,
204-
self::PHP_CONSUMPTION_COMMAND,
193+
'rabbitmq:consumer',
205194
$topicConfig['prefetch_count'],
206195
$topic,
207196
$this->kernelEnv

Tests/Service/SupervisordConfigFileWriterTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ protected function setUp()
1818
'/etc/rabbitmq-cli-consumer/config'
1919
);
2020

21-
$topicA = ['prefetch_count' => 1];
22-
$topicB = ['prefetch_count' => 2];
21+
$topicA = ['prefetch_count' => 1, 'consumer' => 'markup:job_queue:consumer'];
22+
$topicB = ['prefetch_count' => 2, 'consumer' => 'markup:job_queue:consumer'];
2323
$this->writer->setTopicsConfiguration(['testqueuea' => $topicA, 'testqueueb' => $topicB]);
2424
}
2525

0 commit comments

Comments
 (0)