Skip to content

Commit bf094eb

Browse files
weaverryanfabpot
authored andcommitted
[Messenger] Adding failure transport support
1 parent 36273d4 commit bf094eb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1751
-140
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* [BC BREAK] `SendersLocatorInterface` has an additional method:
8+
`getSenderByAlias()`.
9+
* A new `ListableReceiverInterface` was added, which a receiver
10+
can implement (when applicable) to enable listing and fetching
11+
individual messages by id (used in the new "Failed Messages" commands).
12+
* Both `SenderInterface::send()` and `ReceiverInterface::get()`
13+
should now (when applicable) add a `TransportMessageIdStamp`.
714
* Added `WorkerStoppedEvent` dispatched when a worker is stopped.
815
* Added optional `MessageCountAwareInterface` that receivers can implement
916
to give information about how many messages are waiting to be processed.
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Symfony\Component\Console\Command\Command;
15+
use Symfony\Component\Console\Helper\Dumper;
16+
use Symfony\Component\Console\Style\SymfonyStyle;
17+
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
19+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
20+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
21+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
22+
23+
/**
24+
* @author Ryan Weaver <[email protected]>
25+
*
26+
* @internal
27+
* @experimental in 4.3
28+
*/
29+
abstract class AbstractFailedMessagesCommand extends Command
30+
{
31+
private $receiverName;
32+
private $receiver;
33+
34+
public function __construct(string $receiverName, ReceiverInterface $receiver)
35+
{
36+
$this->receiverName = $receiverName;
37+
$this->receiver = $receiver;
38+
39+
parent::__construct();
40+
}
41+
42+
protected function getReceiverName()
43+
{
44+
return $this->receiverName;
45+
}
46+
47+
/**
48+
* @return mixed|null
49+
*/
50+
protected function getMessageId(Envelope $envelope)
51+
{
52+
/** @var TransportMessageIdStamp $stamp */
53+
$stamp = $envelope->last(TransportMessageIdStamp::class);
54+
55+
return null !== $stamp ? $stamp->getId() : null;
56+
}
57+
58+
protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
59+
{
60+
$io->title('Failed Message Details');
61+
62+
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
63+
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
64+
65+
$rows = [
66+
['Class', \get_class($envelope->getMessage())],
67+
];
68+
69+
if (null !== $id = $this->getMessageId($envelope)) {
70+
$rows[] = ['Message Id', $id];
71+
}
72+
73+
if (null === $sentToFailureTransportStamp) {
74+
$io->warning('Message does not appear to have been sent to this transport after failing');
75+
} else {
76+
$rows = array_merge($rows, [
77+
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
78+
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
79+
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
80+
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
81+
]);
82+
}
83+
84+
$io->table([], $rows);
85+
86+
if ($io->isVeryVerbose()) {
87+
$io->title('Message:');
88+
$dump = new Dumper($io);
89+
$io->writeln($dump($envelope->getMessage()));
90+
$io->title('Exception:');
91+
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
92+
} else {
93+
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
94+
}
95+
}
96+
97+
protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
98+
{
99+
if ($receiver instanceof MessageCountAwareInterface) {
100+
if (1 === $receiver->getMessageCount()) {
101+
$io->writeln('There is <comment>1</comment> message pending in the failure transport.');
102+
} else {
103+
$io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
104+
}
105+
}
106+
}
107+
108+
protected function getReceiver(): ReceiverInterface
109+
{
110+
return $this->receiver;
111+
}
112+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Symfony\Component\Console\Exception\RuntimeException;
15+
use Symfony\Component\Console\Input\InputArgument;
16+
use Symfony\Component\Console\Input\InputInterface;
17+
use Symfony\Component\Console\Input\InputOption;
18+
use Symfony\Component\Console\Output\ConsoleOutputInterface;
19+
use Symfony\Component\Console\Output\OutputInterface;
20+
use Symfony\Component\Console\Style\SymfonyStyle;
21+
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
22+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
23+
24+
/**
25+
* @author Ryan Weaver <[email protected]>
26+
*
27+
* @experimental in 4.3
28+
*/
29+
class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
30+
{
31+
protected static $defaultName = 'messenger:failed:remove';
32+
33+
/**
34+
* {@inheritdoc}
35+
*/
36+
protected function configure(): void
37+
{
38+
$this
39+
->setDefinition([
40+
new InputArgument('id', InputArgument::REQUIRED, 'Specific message id to remove'),
41+
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
42+
])
43+
->setDescription('Remove a message from the failure transport.')
44+
->setHelp(<<<'EOF'
45+
The <info>%command.name%</info> removes a message that is pending in the failure transport.
46+
47+
<info>php %command.full_name% {id}</info>
48+
49+
The specific id can be found via the messenger:failed:show command.
50+
EOF
51+
)
52+
;
53+
}
54+
55+
/**
56+
* {@inheritdoc}
57+
*/
58+
protected function execute(InputInterface $input, OutputInterface $output)
59+
{
60+
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
61+
62+
$receiver = $this->getReceiver();
63+
64+
$shouldForce = $input->getOption('force');
65+
$this->removeSingleMessage($input->getArgument('id'), $receiver, $io, $shouldForce);
66+
}
67+
68+
private function removeSingleMessage($id, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
69+
{
70+
if (!$receiver instanceof ListableReceiverInterface) {
71+
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName()));
72+
}
73+
74+
$envelope = $receiver->find($id);
75+
if (null === $envelope) {
76+
throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id));
77+
}
78+
$this->displaySingleMessage($envelope, $io);
79+
80+
if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
81+
$receiver->reject($envelope);
82+
83+
$io->success('Message removed.');
84+
} else {
85+
$io->note('Message not removed.');
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)