Skip to content

Commit 6d1aa36

Browse files
committed
feature symfony#57270 [Messenger] Allow to skip message in FailedMessagesRetryCommand (Thibaut Chieux)
This PR was merged into the 7.2 branch. Discussion ---------- [Messenger] Allow to skip message in `FailedMessagesRetryCommand` | Q | A | ------------- | --- | Branch? | 7.2 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | Fix symfony#50936 | License | MIT When retrying message with the command messenger:failed:retry, we have two options: - Retry : (It will be consumed and re-added to failed queue if it re-fail) - Delete But sometimes we have no clue if we can retry it or not but I now that we don't want to delete it yet. In that case we need to stop the command and go id by id. The interactive command is lost. The command should provide a way to skip a message to continue with the rest of the failed message. Commits ------- 7ec6914 [Messenger] Allow to skip message in FailedMessagesRetryCommand
2 parents 1b13c33 + 7ec6914 commit 6d1aa36

File tree

5 files changed

+86
-3
lines changed

5 files changed

+86
-3
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
1010
* Add `--format` option to the `messenger:stats` command
1111
* Add `getRetryDelay()` method to `RecoverableExceptionInterface`
12+
* Add `skip` option to `messenger:failed:retry` command when run interactively to skip message and requeue it
1213

1314
7.1
1415
---

src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
use Symfony\Component\Console\Style\SymfonyStyle;
2424
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
2525
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
26+
use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent;
2627
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
2728
use Symfony\Component\Messenger\MessageBusInterface;
2829
use Symfony\Component\Messenger\Stamp\MessageDecodingFailedStamp;
30+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
2931
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
3032
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3133
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
@@ -68,8 +70,8 @@ protected function configure(): void
6870
6971
<info>php %command.full_name%</info>
7072
71-
The command will interactively ask if each message should be retried
72-
or discarded.
73+
The command will interactively ask if each message should be retried,
74+
discarded or skipped.
7375
7476
Some transports support retrying a specific message id, which comes
7577
from the <info>messenger:failed:show</info> command.
@@ -204,7 +206,8 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
204206

205207
$this->forceExit = true;
206208
try {
207-
$shouldHandle = $shouldForce || 'retry' === $io->choice('Please select an action', ['retry', 'delete'], 'retry');
209+
$choice = $io->choice('Please select an action', ['retry', 'delete', 'skip'], 'retry');
210+
$shouldHandle = $shouldForce || 'retry' === $choice;
208211
} finally {
209212
$this->forceExit = false;
210213
}
@@ -213,6 +216,10 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
213216
return;
214217
}
215218

219+
if ('skip' === $choice) {
220+
$this->eventDispatcher->dispatch(new WorkerMessageSkipEvent($envelope, $envelope->last(SentToFailureTransportStamp::class)->getOriginalReceiverName()));
221+
}
222+
216223
$messageReceivedEvent->shouldHandle(false);
217224
$receiver->reject($envelope);
218225
};
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched when a message was skip.
16+
*
17+
* The event name is the class name.
18+
*/
19+
final class WorkerMessageSkipEvent extends AbstractWorkerMessageEvent
20+
{
21+
}

src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1717
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent;
1819
use Symfony\Component\Messenger\Stamp\DelayStamp;
1920
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2021
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
@@ -65,10 +66,26 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
6566
$failureSender->send($envelope);
6667
}
6768

69+
public function onMessageSkip(WorkerMessageSkipEvent $event): void
70+
{
71+
if (!$this->failureSenders->has($event->getReceiverName())) {
72+
return;
73+
}
74+
75+
$failureSender = $this->failureSenders->get($event->getReceiverName());
76+
$envelope = $event->getEnvelope()->with(
77+
new SentToFailureTransportStamp($event->getReceiverName()),
78+
new DelayStamp(0),
79+
);
80+
81+
$failureSender->send($envelope);
82+
}
83+
6884
public static function getSubscribedEvents(): array
6985
{
7086
return [
7187
WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
88+
WorkerMessageSkipEvent::class => ['onMessageSkip', -100],
7289
];
7390
}
7491
}

src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
2020
use Symfony\Component\Messenger\Envelope;
2121
use Symfony\Component\Messenger\MessageBusInterface;
22+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
2223
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2324
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2425

@@ -223,4 +224,40 @@ public function testCompleteIdWithSpecifiedTransport()
223224

224225
$this->assertSame(['2ab50dfa1fbf', '78c2da843723'], $suggestions);
225226
}
227+
228+
public function testSkipRunWithServiceLocator()
229+
{
230+
$failureTransportName = 'failure_receiver';
231+
$originalTransportName = 'original_receiver';
232+
233+
$serviceLocator = $this->createMock(ServiceLocator::class);
234+
$receiver = $this->createMock(ListableReceiverInterface::class);
235+
236+
$dispatcher = new EventDispatcher();
237+
$bus = $this->createMock(MessageBusInterface::class);
238+
239+
$serviceLocator->method('has')->willReturn(true);
240+
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
241+
242+
$receiver->expects($this->once())->method('find')
243+
->willReturn(Envelope::wrap(new \stdClass(), [
244+
new SentToFailureTransportStamp($originalTransportName)
245+
]));
246+
247+
$receiver->expects($this->never())->method('ack');
248+
$receiver->expects($this->once())->method('reject');
249+
250+
$command = new FailedMessagesRetryCommand(
251+
$failureTransportName,
252+
$serviceLocator,
253+
$bus,
254+
$dispatcher
255+
);
256+
257+
$tester = new CommandTester($command);
258+
$tester->setInputs(['skip']);
259+
260+
$tester->execute(['id' => ['10']]);
261+
$this->assertStringContainsString('[OK]', $tester->getDisplay());
262+
}
226263
}

0 commit comments

Comments
 (0)