Skip to content

Commit fb265c8

Browse files
committed
senses
0 parents  commit fb265c8

16 files changed

+785
-0
lines changed

.gitattributes

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/Tests export-ignore
2+
.gitattributes export-ignore
3+
.gitignore export-ignore
4+
.travis.yml export-ignore
5+
phpunit.xml.dist export-ignore

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

.travis.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 10
5+
6+
language: php
7+
8+
php:
9+
- '7.1'
10+
- '7.2'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

LICENSE

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
The MIT License (MIT)
2+
Copyright (c) 2018 Max Kotliar
3+
4+
Permission is hereby granted, free of charge, to any person obtaining a copy
5+
of this software and associated documentation files (the "Software"), to deal
6+
in the Software without restriction, including without limitation the rights
7+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
copies of the Software, and to permit persons to whom the Software is furnished
9+
to do so, subject to the following conditions:
10+
11+
The above copyright notice and this permission notice shall be included in all
12+
copies or substantial portions of the Software.
13+
14+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
THE SOFTWARE.

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<h2 align="center">Supporting Enqueue</h2>
2+
3+
Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider:
4+
5+
- [Become a sponsor](https://www.patreon.com/makasim)
6+
- [Become our client](http://forma-pro.com/)
7+
8+
---
9+
10+
# Amazon SNS-SQS Transport
11+
12+
[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby)
13+
[![Build Status](https://travis-ci.org/php-enqueue/snsqs.png?branch=master)](https://travis-ci.org/php-enqueue/snsqs)
14+
[![Total Downloads](https://poser.pugx.org/enqueue/snsqs/d/total.png)](https://packagist.org/packages/enqueue/snsqs)
15+
[![Latest Stable Version](https://poser.pugx.org/enqueue/snsqs/version.png)](https://packagist.org/packages/enqueue/snsqs)
16+
17+
This is an implementation of Queue Interop specification. It allows you to send and consume message using [Amazon SNS-SQS](https://aws.amazon.com/snsqs/) service.
18+
19+
## Resources
20+
21+
* [Site](https://enqueue.forma-pro.com/)
22+
* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/snsqs.md)
23+
* [Questions](https://gitter.im/php-enqueue/Lobby)
24+
* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
25+
26+
## License
27+
28+
It is released under the [MIT License](LICENSE).

SnsQsConnectionFactory.php

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\SnsQs;
6+
7+
use Enqueue\Dsn\Dsn;
8+
use Enqueue\Sns\SnsConnectionFactory;
9+
use Enqueue\Sqs\SqsConnectionFactory;
10+
use Interop\Queue\ConnectionFactory;
11+
use Interop\Queue\Context;
12+
13+
class SnsQsConnectionFactory implements ConnectionFactory
14+
{
15+
/**
16+
* @var string|array|null
17+
*/
18+
private $snsConfig;
19+
20+
/**
21+
* @var string|array|null
22+
*/
23+
private $sqsConfig;
24+
25+
/**
26+
* $config = [
27+
* 'key' => null AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
28+
* 'secret' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
29+
* 'token' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
30+
* 'region' => null, (string, required) Region to connect to. See http://docs.aws.amazon.com/general/latest/gr/rande.html for a list of available regions.
31+
* 'version' => '2012-11-05', (string, required) The version of the webservice to utilize
32+
* 'lazy' => true, Enable lazy connection (boolean)
33+
* 'endpoint' => null (string, default=null) The full URI of the webservice. This is only required when connecting to a custom endpoint e.g. localstack
34+
* ].
35+
*
36+
* or
37+
*
38+
* snsqs:
39+
* snsqs:?key=aKey&secret=aSecret&token=aToken
40+
*
41+
* @param array|string|null $config
42+
*/
43+
public function __construct($config = 'snsqs:')
44+
{
45+
if (empty($config)) {
46+
$this->snsConfig = [];
47+
$this->sqsConfig = [];
48+
} elseif (is_string($config)) {
49+
$this->parseDsn($config);
50+
} elseif (is_array($config)) {
51+
if (array_key_exists('dsn', $config)) {
52+
$this->parseDsn($config['dsn']);
53+
} else {
54+
if (array_key_exists('sns', $config)) {
55+
$this->snsConfig = $config['sns'];
56+
} else {
57+
$this->snsConfig = $config;
58+
}
59+
60+
if (array_key_exists('sqs', $config)) {
61+
$this->sqsConfig = $config['sqs'];
62+
} else {
63+
$this->sqsConfig = $config;
64+
}
65+
}
66+
} else {
67+
throw new \LogicException(sprintf('The config must be either an array of options, a DSN string, null or instance of %s', AwsSnsClient::class));
68+
}
69+
}
70+
71+
/**
72+
* @return SnsQsContext
73+
*/
74+
public function createContext(): Context
75+
{
76+
return new SnsQsContext(function() {
77+
return (new SnsConnectionFactory($this->snsConfig))->createContext();
78+
}, function() {
79+
return (new SqsConnectionFactory($this->sqsConfig))->createContext();
80+
});
81+
}
82+
83+
private function parseDsn(string $dsn): void
84+
{
85+
$dsn = Dsn::parseFirst($dsn);
86+
87+
if ('snsqs' !== $dsn->getSchemeProtocol()) {
88+
throw new \LogicException(sprintf(
89+
'The given scheme protocol "%s" is not supported. It must be "snsqs"',
90+
$dsn->getSchemeProtocol()
91+
));
92+
}
93+
94+
$this->snsConfig = 'sns:?'.$dsn->getQueryString();
95+
$this->sqsConfig = 'sqs:?'.$dsn->getQueryString();
96+
}
97+
}

SnsQsConsumer.php

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\SnsQs;
6+
7+
use Enqueue\Sqs\SqsConsumer;
8+
use Enqueue\Sqs\SqsMessage;
9+
use Interop\Queue\Consumer;
10+
use Interop\Queue\Exception\InvalidMessageException;
11+
use Interop\Queue\Message;
12+
use Interop\Queue\Queue;
13+
14+
class SnsQsConsumer implements Consumer
15+
{
16+
/**
17+
* @var SnsQsContext
18+
*/
19+
private $context;
20+
21+
/**
22+
* @var SqsConsumer
23+
*/
24+
private $consumer;
25+
26+
/**
27+
* @var SnsQsQueue
28+
*/
29+
private $queue;
30+
31+
public function __construct(SnsQsContext $context, SqsConsumer $consumer, SnsQsQueue $queue)
32+
{
33+
$this->context = $context;
34+
$this->consumer = $consumer;
35+
$this->queue = $queue;
36+
}
37+
38+
public function getQueue(): Queue
39+
{
40+
return $this->queue;
41+
}
42+
43+
public function receive(int $timeout = 0): ?Message
44+
{
45+
if ($sqsMessage = $this->consumer->receive($timeout)) {
46+
return $this->convertMessage($sqsMessage);
47+
}
48+
49+
return null;
50+
}
51+
52+
public function receiveNoWait(): ?Message
53+
{
54+
if ($sqsMessage = $this->consumer->receiveNoWait()) {
55+
return $this->convertMessage($sqsMessage);
56+
}
57+
58+
return null;
59+
}
60+
61+
/**
62+
* @param SnsQsMessage $message
63+
*/
64+
public function acknowledge(Message $message): void
65+
{
66+
InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class);
67+
68+
$this->consumer->acknowledge($message->getSqsMessage());
69+
}
70+
71+
/**
72+
* @param SnsQsMessage $message
73+
*/
74+
public function reject(Message $message, bool $requeue = false): void
75+
{
76+
InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class);
77+
78+
$this->consumer->reject($message->getSqsMessage(), $requeue);
79+
}
80+
81+
private function convertMessage(SqsMessage $sqsMessage): SnsQsMessage
82+
{
83+
$message = $this->context->createMessage();
84+
$message->setRedelivered($sqsMessage->isRedelivered());
85+
$message->setSqsMessage($sqsMessage);
86+
87+
$data = json_decode($sqsMessage->getBody(), true);
88+
89+
if (isset($data['Message'])) {
90+
$message->setBody((string) $data['Message']);
91+
}
92+
93+
if (isset($data['MessageAttributes']['Headers'])) {
94+
$headersData = json_decode($data['MessageAttributes']['Headers']['Value'], true);
95+
96+
$message->setHeaders($headersData[0]);
97+
$message->setProperties($headersData[1]);
98+
}
99+
100+
return $message;
101+
}
102+
}

0 commit comments

Comments
 (0)