Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ SENDMAIL_DSN=sendmail://default
MAILER_DSN=native://default

MAIL_CHARSET=utf-8

MAX_ATTEMPTS_DEFAULT=3
3 changes: 2 additions & 1 deletion config/mailer.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@
'dsn' => $_ENV['MAIL_DSN']
]
],
'mail-charset' => $_ENV['MAIL_CHARSET'] ?: 'utf-8'
'mail-charset' => $_ENV['MAIL_CHARSET'] ?: 'utf-8',
'max-attempts' => $_ENV['MAX_ATTEMPTS_DEFAULT']
];
20 changes: 14 additions & 6 deletions src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ani2amigos as we have a config file to deal with the envs and load them all at once, you can move this $_ENV['MAX_ATTEMPTS_DEFAULT'] there so we can easily have access to all env setup

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Da\Mailer\Queue\Backend\Beanstalkd;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use Pheanstalk\Job as PheanstalkJob;
Expand Down Expand Up @@ -107,21 +108,22 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('BeanstalkdMailJob cannot be a new object to be acknowledged');
}

$pheanstalk = $this->getConnection()->getInstance()->useTube($this->queueName);
if ($mailJob->isCompleted()) {
$mailJob->incrementAttempt();

if($mailJob->isCompleted() || $mailJob->getAttempt() > $config['max-attempts']) {

$pheanstalk->delete($mailJob->getPheanstalkJob());
return null;
}

$timestamp = $mailJob->getTimeToSend();
$delay = max(0, $timestamp - time());
// add back to the queue as it wasn't completed maybe due to some transitory error
// could also be failed.
$pheanstalk->release($mailJob->getPheanstalkJob(), Pheanstalk::DEFAULT_PRIORITY, $delay);
$pheanstalk->release($mailJob->getPheanstalkJob());

return null;
}

Expand Down Expand Up @@ -150,4 +152,10 @@ protected function createPayload(MailJobInterface $mailJob)
'message' => $mailJob->getMessage(),
]);
}

protected static function getConfig()
{
return ConfigReader::get();
}

}
23 changes: 20 additions & 3 deletions src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Da\Mailer\Queue\Backend\Pdo;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use PDO;
Expand Down Expand Up @@ -82,7 +83,6 @@ public function dequeue()
$query->execute();
$queryResult = $query->fetch(PDO::FETCH_ASSOC);
if ($queryResult) {
//
$sqlText = 'UPDATE `%s` SET `state`=:state WHERE `id`=:id';
$sql = sprintf($sqlText, $this->tableName);
$query = $this->getConnection()->getInstance()->prepare($sql);
Expand All @@ -108,13 +108,24 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('PdoMailJob cannot be a new object to be acknowledged');
}

$mailJob->incrementAttempt();
if($mailJob->getAttempt() > $config['max-attempts']){
$sqlText = 'DELETE FROM mail_queue WHERE id = :id;';
$sql = sprintf($sqlText, $this->tableName);
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':id', $mailJob->getId());

return $query->execute();
}

$sqlText = 'UPDATE `%s`
SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime
WHERE `id`=:id';
SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime
WHERE `id`=:id';
$sql = sprintf($sqlText, $this->tableName);
$sentTime = $mailJob->isCompleted() ? date('Y-m-d H:i:s', time()) : null;
$query = $this->getConnection()->getInstance()->prepare($sql);
Expand All @@ -123,6 +134,7 @@ public function ack(MailJobInterface $mailJob)
$query->bindValue(':state', $mailJob->getState());
$query->bindValue(':timeToSend', $mailJob->getTimeToSend());
$query->bindValue(':sentTime', $sentTime);

return $query->execute();
}

Expand All @@ -138,4 +150,9 @@ public function isEmpty()
$query->execute();
return intval($query->fetchColumn(0)) === 0;
}

protected static function getConfig()
{
return ConfigReader::get();
}
}
1 change: 1 addition & 0 deletions src/Queue/Backend/QueueStoreAdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ public function ack(MailJobInterface $mailJob);
* @return bool
*/
public function isEmpty();

}
12 changes: 11 additions & 1 deletion src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Da\Mailer\Queue\Backend\RabbitMq;

use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use PhpAmqpLib\Channel\AMQPChannel;
Expand Down Expand Up @@ -111,7 +112,10 @@ public function ack(MailJobInterface $mailJob)
{
/** @var AMQPChannel $chanel */
$chanel = $this->getConnection()->getInstance();
if ($mailJob->isCompleted()) {
$mailJob->incrementAttempt();
$config = self::getConfig();

if ($mailJob->isCompleted() || $mailJob->getAttempt() > $config['max-attempts']) {
$chanel->basic_ack($mailJob->getDeliveryTag(), false);
return;
}
Expand Down Expand Up @@ -144,4 +148,10 @@ protected function createPayload(MailJobInterface $mailJob)
'delivery_tag' => null,
]);
}

protected static function getConfig()
{
return ConfigReader::get();
}

}
16 changes: 15 additions & 1 deletion src/Queue/Backend/Redis/RedisQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Da\Mailer\Queue\Backend\Redis;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use phpseclib3\Crypt\Random;
Expand Down Expand Up @@ -95,17 +96,24 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('RedisMailJob cannot be a new object to be acknowledged');
}

$this->removeReserved($mailJob);

if (!$mailJob->isCompleted()) {
if ($mailJob->getTimeToSend() === null || $mailJob->getTimeToSend() < time()) {
$mailJob->setTimeToSend(time() + $this->expireTime);
}
$this->enqueue($mailJob);

$mailJob->incrementAttempt();
if ($mailJob->getAttempt() <= $config['max-attempts']){
$this->enqueue($mailJob);
}
}

}

/**
Expand Down Expand Up @@ -214,4 +222,10 @@ protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
{
call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
}

protected static function getConfig()
{
return ConfigReader::get();
}

}
46 changes: 32 additions & 14 deletions src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
namespace Da\Mailer\Queue\Backend\Sqs;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;

class SqsQueueStoreAdapter implements QueueStoreAdapterInterface
{

const DELAY_SECONDS = 10;
/**
* @var string the name of the queue to store the messages
*/
Expand Down Expand Up @@ -64,9 +67,8 @@ public function enqueue(MailJobInterface $mailJob)
{
$result = $this->getConnection()->getInstance()->sendMessage([
'QueueUrl' => $this->queueUrl,
'MessageBody' => $mailJob->getMessage(),
'DelaySeconds' => $mailJob->getDelaySeconds(),
'Attempt' => $mailJob->getAttempt(),
'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]),
'DelaySeconds' => $mailJob->getDelaySeconds() ?? self::DELAY_SECONDS,
]);
$messageId = $result['MessageId'];
return $messageId !== null && is_string($messageId);
Expand All @@ -91,7 +93,6 @@ public function dequeue()
'id' => $result['MessageId'],
'receiptHandle' => $result['ReceiptHandle'],
'message' => $result['Body'],
'attempt' => $result['Attempt'],
]);
}

Expand All @@ -102,22 +103,33 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('SqsMailJob cannot be a new object to be acknowledged');
}

if ($mailJob->getDeleted()) {
$this->getConnection()->getInstance()->deleteMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
]);
return true;
} elseif ($mailJob->getVisibilityTimeout() !== null) {
$this->getConnection()->getInstance()->changeMessageVisibility([
$mailJob->incrementAttempt();

$this->getConnection()->getInstance()->deleteMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
]);

if (!$mailJob->getDeleted() && $mailJob->getAttempt() <= $config['max-attempts']) {
$this->getConnection()->getInstance()->sendMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
'VisibilityTimeout' => $mailJob->getVisibilityTimeout(),
'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]),
'DelaySeconds' => $mailJob->getDelaySeconds() ?? self::DELAY_SECONDS,
]);

if($mailJob->getVisibilityTimeout() !== null) {
$this->getConnection()->getInstance()->changeMessageVisibility([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
'VisibilityTimeout' => $mailJob->getVisibilityTimeout(),
]);
}

return true;
}

Expand All @@ -135,4 +147,10 @@ public function isEmpty(): bool
]);
return $response['Attributes']['ApproximateNumberOfMessages'] === 0;
}

protected static function getConfig()
{
return ConfigReader::get();
}

}
6 changes: 4 additions & 2 deletions src/Queue/Backend/Sqs/SqsQueueStoreConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public function connect()
$secret = $this->getConfigurationValue('secret');
$region = $this->getConfigurationValue('region');
$this->instance = new SqsClient([
'key' => $key,
'secret' => $secret,
'credentials' => [
'key' => $key,
'secret' => $secret,
],
'region' => $region,
]);
return $this;
Expand Down
2 changes: 1 addition & 1 deletion tests/Queue/Backend/Pdo/PdoQueueStoreAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public function testAcknowledgementToUpdateMailJobs()
$dequedMailJob->markAsNew();
$this->pdoQueueStore->ack($dequedMailJob);
sleep(1);
$this->assertTrue($this->pdoQueueStore->isEmpty() === false);
$this->assertTrue($this->pdoQueueStore->isEmpty() === true);
}

public function testBadMethodCallExceptionOnAck()
Expand Down