77use Doctrine \ORM \EntityManagerInterface ;
88use PhpList \Core \Domain \Messaging \Model \Message ;
99use PhpList \Core \Domain \Messaging \Service \MessageProcessingPreparator ;
10+ use PhpList \Core \Domain \Common \IspRestrictionsProvider ;
1011use PhpList \Core \Domain \Subscription \Service \Provider \SubscriberProvider ;
1112use Psr \Log \LoggerInterface ;
1213use Symfony \Component \Console \Output \OutputInterface ;
@@ -21,34 +22,78 @@ class CampaignProcessor
2122 private SubscriberProvider $ subscriberProvider ;
2223 private MessageProcessingPreparator $ messagePreparator ;
2324 private LoggerInterface $ logger ;
25+ private ?IspRestrictionsProvider $ ispRestrictionsProvider ;
26+ private ?int $ mailqueueBatchSize ;
27+ private ?int $ mailqueueBatchPeriod ;
28+ private ?int $ mailqueueThrottle ;
2429
2530 public function __construct (
2631 MailerInterface $ mailer ,
2732 EntityManagerInterface $ entityManager ,
2833 SubscriberProvider $ subscriberProvider ,
2934 MessageProcessingPreparator $ messagePreparator ,
3035 LoggerInterface $ logger ,
36+ ?IspRestrictionsProvider $ ispRestrictionsProvider = null ,
37+ ?int $ mailqueueBatchSize = null ,
38+ ?int $ mailqueueBatchPeriod = null ,
39+ ?int $ mailqueueThrottle = null ,
3140 ) {
3241 $ this ->mailer = $ mailer ;
3342 $ this ->entityManager = $ entityManager ;
3443 $ this ->subscriberProvider = $ subscriberProvider ;
3544 $ this ->messagePreparator = $ messagePreparator ;
3645 $ this ->logger = $ logger ;
46+ $ this ->ispRestrictionsProvider = $ ispRestrictionsProvider ;
47+ $ this ->mailqueueBatchSize = $ mailqueueBatchSize ;
48+ $ this ->mailqueueBatchPeriod = $ mailqueueBatchPeriod ;
49+ $ this ->mailqueueThrottle = $ mailqueueThrottle ;
3750 }
3851
3952 public function process (Message $ campaign , ?OutputInterface $ output = null ): void
4053 {
41- $ campaign ->getMetadata ()->setStatus (Message \MessageStatus::Prepared);
42- $ this ->entityManager ->flush ();
43-
54+ $ this ->updateMessageStatus ($ campaign , Message \MessageStatus::Prepared);
55+ $ ispRestrictions = $ this ->ispRestrictionsProvider ->load ();
4456 $ subscribers = $ this ->subscriberProvider ->getSubscribersForMessage ($ campaign );
4557
46- $ campaign ->getMetadata ()->setStatus (Message \MessageStatus::InProcess);
47- $ this ->entityManager ->flush ();
58+ $ cfgBatch = ($ this ->mailqueueBatchSize ?? 0 );
59+ $ ispMax = isset ($ ispRestrictions ->maxBatch ) ? (int )$ ispRestrictions ->maxBatch : null ;
60+
61+ $ cfgPeriod = ($ this ->mailqueueBatchPeriod ?? 0 );
62+ $ ispMinPeriod = ($ ispRestrictions ->minBatchPeriod ?? 0 );
63+
64+ $ cfgThrottle = ($ this ->mailqueueThrottle ?? 0 );
65+ $ ispMinThrottle = (int )($ ispRestrictions ->minThrottle ?? 0 );
66+
67+ if ($ cfgBatch <= 0 ) {
68+ $ batchSize = $ ispMax !== null ? max (0 , $ ispMax ) : 0 ;
69+ } else {
70+ $ batchSize = $ ispMax !== null ? min ($ cfgBatch , max (1 , $ ispMax )) : $ cfgBatch ;
71+ }
72+
73+ $ batchPeriod = max (0 , $ cfgPeriod , $ ispMinPeriod );
74+
75+ $ throttleSec = max (0 , $ cfgThrottle , $ ispMinThrottle );
76+
77+ $ sentInBatch = 0 ;
78+ $ batchStart = microtime (true );
79+
80+ $ this ->updateMessageStatus ($ campaign , Message \MessageStatus::InProcess);
4881
49- // phpcs:ignore Generic.Commenting.Todo
50- // @todo check $ISPrestrictions logic
5182 foreach ($ subscribers as $ subscriber ) {
83+ if ($ batchSize > 0 && $ batchPeriod > 0 && $ sentInBatch >= $ batchSize ) {
84+ $ elapsed = microtime (true ) - $ batchStart ;
85+ $ remaining = (int )ceil ($ batchPeriod - $ elapsed );
86+ if ($ remaining > 0 ) {
87+ $ output ?->writeln(sprintf (
88+ 'Batch limit reached, sleeping %ds to respect MAILQUEUE_BATCH_PERIOD ' ,
89+ $ remaining
90+ ));
91+ sleep ($ remaining );
92+ }
93+ $ batchStart = microtime (true );
94+ $ sentInBatch = 0 ;
95+ }
96+
5297 if (!filter_var ($ subscriber ->getEmail (), FILTER_VALIDATE_EMAIL )) {
5398 continue ;
5499 }
@@ -62,9 +107,7 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
62107
63108 try {
64109 $ this ->mailer ->send ($ email );
65-
66- // phpcs:ignore Generic.Commenting.Todo
67- // @todo log somewhere that this subscriber got email
110+ $ sentInBatch ++;
68111 } catch (Throwable $ e ) {
69112 $ this ->logger ->error ($ e ->getMessage (), [
70113 'subscriber_id ' => $ subscriber ->getId (),
@@ -73,10 +116,17 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
73116 $ output ?->writeln('Failed to send to: ' . $ subscriber ->getEmail ());
74117 }
75118
76- usleep (100000 );
119+ if ($ throttleSec > 0 ) {
120+ sleep ($ throttleSec );
121+ }
77122 }
78123
79- $ campaign ->getMetadata ()->setStatus (Message \MessageStatus::Sent);
124+ $ this ->updateMessageStatus ($ campaign , Message \MessageStatus::Sent);
125+ }
126+
127+ private function updateMessageStatus (Message $ message , Message \MessageStatus $ status ): void
128+ {
129+ $ message ->getMetadata ()->setStatus ($ status );
80130 $ this ->entityManager ->flush ();
81131 }
82132}
0 commit comments