Skip to content

Commit 56edfe6

Browse files
committed
Dispatch CampaignProcessorMessage for processing
1 parent 0f45d36 commit 56edfe6

File tree

5 files changed

+65
-21
lines changed

5 files changed

+65
-21
lines changed

src/Domain/Messaging/Command/ProcessQueueCommand.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@
88
use Doctrine\ORM\EntityManagerInterface;
99
use PhpList\Core\Domain\Configuration\Model\ConfigOption;
1010
use PhpList\Core\Domain\Configuration\Service\Provider\ConfigProvider;
11+
use PhpList\Core\Domain\Messaging\Message\CampaignProcessorMessage;
1112
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
1213
use PhpList\Core\Domain\Messaging\Repository\MessageRepository;
1314
use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator;
14-
use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor;
1515
use Symfony\Component\Console\Attribute\AsCommand;
1616
use Symfony\Component\Console\Command\Command;
1717
use Symfony\Component\Console\Input\InputInterface;
1818
use Symfony\Component\Console\Output\OutputInterface;
1919
use Symfony\Component\Lock\LockFactory;
20+
use Symfony\Component\Messenger\MessageBusInterface;
2021
use Symfony\Contracts\Translation\TranslatorInterface;
2122
use Throwable;
2223

@@ -29,7 +30,7 @@ class ProcessQueueCommand extends Command
2930
private MessageRepository $messageRepository;
3031
private LockFactory $lockFactory;
3132
private MessageProcessingPreparator $messagePreparator;
32-
private CampaignProcessor $campaignProcessor;
33+
private MessageBusInterface $messageBus;
3334
private ConfigProvider $configProvider;
3435
private TranslatorInterface $translator;
3536
private EntityManagerInterface $entityManager;
@@ -38,7 +39,7 @@ public function __construct(
3839
MessageRepository $messageRepository,
3940
LockFactory $lockFactory,
4041
MessageProcessingPreparator $messagePreparator,
41-
CampaignProcessor $campaignProcessor,
42+
MessageBusInterface $messageBus,
4243
ConfigProvider $configProvider,
4344
TranslatorInterface $translator,
4445
EntityManagerInterface $entityManager,
@@ -47,7 +48,7 @@ public function __construct(
4748
$this->messageRepository = $messageRepository;
4849
$this->lockFactory = $lockFactory;
4950
$this->messagePreparator = $messagePreparator;
50-
$this->campaignProcessor = $campaignProcessor;
51+
$this->messageBus = $messageBus;
5152
$this->configProvider = $configProvider;
5253
$this->translator = $translator;
5354
$this->entityManager = $entityManager;
@@ -80,10 +81,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
8081
$this->entityManager->flush();
8182
} catch (Throwable $throwable) {
8283
$output->writeln($throwable->getMessage());
84+
$lock->release();
8385

8486
return Command::FAILURE;
85-
} finally {
86-
$lock->release();
8787
}
8888

8989
$campaigns = $this->messageRepository->getByStatusAndEmbargo(
@@ -93,7 +93,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
9393

9494
try {
9595
foreach ($campaigns as $campaign) {
96-
$this->campaignProcessor->process($campaign, $output);
96+
$message = new CampaignProcessorMessage(messageId: $campaign->getId());
97+
$this->messageBus->dispatch($message);
9798
}
9899
} catch (Throwable $throwable) {
99100
$output->writeln($throwable->getMessage());
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Domain\Messaging\Message;
6+
7+
class CampaignProcessorMessage
8+
{
9+
private int $messageId;
10+
11+
public function __construct(int $messageId)
12+
{
13+
$this->messageId = $messageId;
14+
}
15+
16+
public function getMessageId(): int
17+
{
18+
return $this->messageId;
19+
}
20+
}

src/Domain/Messaging/Service/Processor/CampaignProcessor.php renamed to src/Domain/Messaging/MessageHandler/CampaignProcessorMessageHandler.php

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,24 @@
22

33
declare(strict_types=1);
44

5-
namespace PhpList\Core\Domain\Messaging\Service\Processor;
5+
namespace PhpList\Core\Domain\Messaging\MessageHandler;
66

77
use Doctrine\ORM\EntityManagerInterface;
8+
use PhpList\Core\Domain\Messaging\Message\CampaignProcessorMessage;
89
use PhpList\Core\Domain\Messaging\Model\Message;
9-
use PhpList\Core\Domain\Messaging\Model\UserMessage;
10-
use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus;
1110
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
11+
use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus;
12+
use PhpList\Core\Domain\Messaging\Model\UserMessage;
13+
use PhpList\Core\Domain\Messaging\Repository\MessageRepository;
1214
use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository;
1315
use PhpList\Core\Domain\Messaging\Service\Handler\RequeueHandler;
14-
use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer;
1516
use PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter;
1617
use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator;
18+
use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer;
19+
use PhpList\Core\Domain\Subscription\Model\Subscriber;
1720
use PhpList\Core\Domain\Subscription\Service\Manager\SubscriberHistoryManager;
1821
use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider;
19-
use PhpList\Core\Domain\Subscription\Model\Subscriber;
2022
use Psr\Log\LoggerInterface;
21-
use Symfony\Component\Console\Output\OutputInterface;
2223
use Symfony\Contracts\Translation\TranslatorInterface;
2324
use Throwable;
2425

@@ -27,7 +28,7 @@
2728
* @SuppressWarnings(PHPMD.StaticAccess)
2829
* @SuppressWarnings(PHPMD.ExcessiveParameterList)
2930
*/
30-
class CampaignProcessor
31+
class CampaignProcessorMessageHandler
3132
{
3233
private RateLimitedCampaignMailer $mailer;
3334
private EntityManagerInterface $entityManager;
@@ -39,6 +40,7 @@ class CampaignProcessor
3940
private RequeueHandler $requeueHandler;
4041
private TranslatorInterface $translator;
4142
private SubscriberHistoryManager $subscriberHistoryManager;
43+
private MessageRepository $messageRepository;
4244

4345
public function __construct(
4446
RateLimitedCampaignMailer $mailer,
@@ -51,6 +53,7 @@ public function __construct(
5153
RequeueHandler $requeueHandler,
5254
TranslatorInterface $translator,
5355
SubscriberHistoryManager $subscriberHistoryManager,
56+
MessageRepository $messageRepository,
5457
) {
5558
$this->mailer = $mailer;
5659
$this->entityManager = $entityManager;
@@ -62,10 +65,21 @@ public function __construct(
6265
$this->requeueHandler = $requeueHandler;
6366
$this->translator = $translator;
6467
$this->subscriberHistoryManager = $subscriberHistoryManager;
68+
$this->messageRepository = $messageRepository;
6569
}
6670

67-
public function process(Message $campaign, ?OutputInterface $output = null): void
71+
public function __invoke(CampaignProcessorMessage $message): void
6872
{
73+
$campaign = $this->messageRepository->findByIdAndStatus($message->getMessageId(), MessageStatus::Submitted);
74+
if (!$campaign) {
75+
$this->logger->warning(
76+
$this->translator->trans('Campaign not found or not in submitted status'),
77+
['campaign_id' => $message->getMessageId()]
78+
);
79+
80+
return;
81+
}
82+
6983
$this->updateMessageStatus($campaign, MessageStatus::Prepared);
7084
$subscribers = $this->subscriberProvider->getSubscribersForMessage($campaign);
7185

@@ -75,7 +89,7 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
7589
$stoppedEarly = false;
7690

7791
foreach ($subscribers as $subscriber) {
78-
if ($this->timeLimiter->shouldStop($output)) {
92+
if ($this->timeLimiter->shouldStop()) {
7993
$stoppedEarly = true;
8094
break;
8195
}
@@ -92,7 +106,7 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
92106
if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) {
93107
$this->updateUserMessageStatus($userMessage, UserMessageStatus::InvalidEmailAddress);
94108
$this->unconfirmSubscriber($subscriber);
95-
$output?->writeln($this->translator->trans('Invalid email, marking unconfirmed: %email%', [
109+
$this->logger->warning($this->translator->trans('Invalid email, marking unconfirmed: %email%', [
96110
'%email%' => $subscriber->getEmail(),
97111
]));
98112
$this->subscriberHistoryManager->addHistory(
@@ -119,13 +133,13 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
119133
'subscriber_id' => $subscriber->getId(),
120134
'campaign_id' => $campaign->getId(),
121135
]);
122-
$output?->writeln($this->translator->trans('Failed to send to: %email%', [
136+
$this->logger->warning($this->translator->trans('Failed to send to: %email%', [
123137
'%email%' => $subscriber->getEmail(),
124138
]));
125139
}
126140
}
127141

128-
if ($stoppedEarly && $this->requeueHandler->handle($campaign, $output)) {
142+
if ($stoppedEarly && $this->requeueHandler->handle($campaign)) {
129143
$this->entityManager->flush();
130144
return;
131145
}

src/Domain/Messaging/Repository/MessageRepository.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,15 @@ public function getByStatusAndEmbargo(Message\MessageStatus $status, DateTimeImm
8686
->getQuery()
8787
->getResult();
8888
}
89+
90+
public function findByIdAndStatus(int $id, Message\MessageStatus $status)
91+
{
92+
return $this->createQueryBuilder('m')
93+
->where('m.id = :id')
94+
->andWhere('m.status = :status')
95+
->setParameter('id', $id)
96+
->setParameter('status', $status->value)
97+
->getQuery()
98+
->getOneOrNullResult();
99+
}
89100
}

src/Domain/Messaging/Service/Handler/RequeueHandler.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use DateInterval;
88
use DateTime;
9-
use Doctrine\ORM\EntityManagerInterface;
109
use PhpList\Core\Domain\Messaging\Model\Message;
1110
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
1211
use Psr\Log\LoggerInterface;
@@ -17,7 +16,6 @@ class RequeueHandler
1716
{
1817
public function __construct(
1918
private readonly LoggerInterface $logger,
20-
private readonly EntityManagerInterface $entityManager,
2119
private readonly TranslatorInterface $translator,
2220
) {
2321
}

0 commit comments

Comments
 (0)