Skip to content

Commit 84895f4

Browse files
committed
Amazon SNS transport.
0 parents  commit 84895f4

27 files changed

+1678
-0
lines changed

.gitattributes

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/Tests export-ignore
2+
.gitattributes export-ignore
3+
.gitignore export-ignore
4+
.travis.yml export-ignore
5+
phpunit.xml.dist export-ignore

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

.travis.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 10
5+
6+
language: php
7+
8+
php:
9+
- '7.1'
10+
- '7.2'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

LICENSE

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
The MIT License (MIT)
2+
Copyright (c) 2018 Max Kotliar
3+
4+
Permission is hereby granted, free of charge, to any person obtaining a copy
5+
of this software and associated documentation files (the "Software"), to deal
6+
in the Software without restriction, including without limitation the rights
7+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
copies of the Software, and to permit persons to whom the Software is furnished
9+
to do so, subject to the following conditions:
10+
11+
The above copyright notice and this permission notice shall be included in all
12+
copies or substantial portions of the Software.
13+
14+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
THE SOFTWARE.

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<h2 align="center">Supporting Enqueue</h2>
2+
3+
Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider:
4+
5+
- [Become a sponsor](https://www.patreon.com/makasim)
6+
- [Become our client](http://forma-pro.com/)
7+
8+
---
9+
10+
# Amazon SNS Transport
11+
12+
[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby)
13+
[![Build Status](https://travis-ci.org/php-enqueue/sns.png?branch=master)](https://travis-ci.org/php-enqueue/sns)
14+
[![Total Downloads](https://poser.pugx.org/enqueue/sns/d/total.png)](https://packagist.org/packages/enqueue/sns)
15+
[![Latest Stable Version](https://poser.pugx.org/enqueue/sns/version.png)](https://packagist.org/packages/enqueue/sns)
16+
17+
This is an implementation of Queue Interop specification. It allows you to send and consume message using [Amazon SNS](https://aws.amazon.com/sns/) service.
18+
19+
## Resources
20+
21+
* [Site](https://enqueue.forma-pro.com/)
22+
* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/sns.md)
23+
* [Questions](https://gitter.im/php-enqueue/Lobby)
24+
* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
25+
26+
## License
27+
28+
It is released under the [MIT License](LICENSE).

SnsClient.php

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Sns;
6+
7+
use Aws\MultiRegionClient;
8+
use Aws\Result;
9+
use Aws\Sns\SnsClient as AwsSnsClient;
10+
11+
class SnsClient
12+
{
13+
/**
14+
* @var AwsSnsClient
15+
*/
16+
private $singleClient;
17+
18+
/**
19+
* @var MultiRegionClient
20+
*/
21+
private $multiClient;
22+
23+
/**
24+
* @var callable
25+
*/
26+
private $inputClient;
27+
28+
/**
29+
* @param AwsSnsClient|MultiRegionClient|callable $inputClient
30+
*/
31+
public function __construct($inputClient)
32+
{
33+
$this->inputClient = $inputClient;
34+
}
35+
36+
public function createTopic(array $args): Result
37+
{
38+
return $this->callApi('createTopic', $args);
39+
}
40+
41+
public function publish(array $args): Result
42+
{
43+
return $this->callApi('publish', $args);
44+
}
45+
46+
public function subscribe(array $args): Result
47+
{
48+
return $this->callApi('subscribe', $args);
49+
}
50+
51+
public function getAWSClient(): AwsSnsClient
52+
{
53+
$this->resolveClient();
54+
55+
if ($this->singleClient) {
56+
return $this->singleClient;
57+
}
58+
59+
if ($this->multiClient) {
60+
$mr = new \ReflectionMethod($this->multiClient, 'getClientFromPool');
61+
$mr->setAccessible(true);
62+
$singleClient = $mr->invoke($this->multiClient, $this->multiClient->getRegion());
63+
$mr->setAccessible(false);
64+
65+
return $singleClient;
66+
}
67+
68+
throw new \LogicException('The multi or single client must be set');
69+
}
70+
71+
private function callApi(string $name, array $args): Result
72+
{
73+
$this->resolveClient();
74+
75+
if ($this->singleClient) {
76+
if (false == empty($args['@region'])) {
77+
throw new \LogicException('Cannot send message to another region because transport is configured with single aws client');
78+
}
79+
80+
unset($args['@region']);
81+
82+
return call_user_func([$this->singleClient, $name], $args);
83+
}
84+
85+
if ($this->multiClient) {
86+
return call_user_func([$this->multiClient, $name], $args);
87+
}
88+
89+
throw new \LogicException('The multi or single client must be set');
90+
}
91+
92+
private function resolveClient(): void
93+
{
94+
if ($this->singleClient || $this->multiClient) {
95+
return;
96+
}
97+
98+
$client = $this->inputClient;
99+
if ($client instanceof MultiRegionClient) {
100+
$this->multiClient = $client;
101+
102+
return;
103+
} elseif ($client instanceof AwsSnsClient) {
104+
$this->singleClient = $client;
105+
106+
return;
107+
} elseif (is_callable($client)) {
108+
$client = call_user_func($client);
109+
if ($client instanceof MultiRegionClient) {
110+
$this->multiClient = $client;
111+
112+
return;
113+
}
114+
if ($client instanceof AwsSnsClient) {
115+
$this->singleClient = $client;
116+
117+
return;
118+
}
119+
}
120+
121+
throw new \LogicException(sprintf(
122+
'The input client must be an instance of "%s" or "%s" or a callable that returns one of those. Got "%s"',
123+
AwsSnsClient::class,
124+
MultiRegionClient::class,
125+
is_object($client) ? get_class($client) : gettype($client)
126+
));
127+
}
128+
}

SnsConnectionFactory.php

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Sns;
6+
7+
use Aws\Sdk;
8+
use Aws\Sns\SnsClient as AwsSnsClient;
9+
use Enqueue\Dsn\Dsn;
10+
use Interop\Queue\ConnectionFactory;
11+
use Interop\Queue\Context;
12+
13+
class SnsConnectionFactory implements ConnectionFactory
14+
{
15+
/**
16+
* @var array
17+
*/
18+
private $config;
19+
20+
/**
21+
* @var SnsClient
22+
*/
23+
private $client;
24+
25+
/**
26+
* $config = [
27+
* 'key' => null AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
28+
* 'secret' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
29+
* 'token' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
30+
* 'region' => null, (string, required) Region to connect to. See http://docs.aws.amazon.com/general/latest/gr/rande.html for a list of available regions.
31+
* 'version' => '2012-11-05', (string, required) The version of the webservice to utilize
32+
* 'lazy' => true, Enable lazy connection (boolean)
33+
* 'endpoint' => null (string, default=null) The full URI of the webservice. This is only required when connecting to a custom endpoint e.g. localstack
34+
* ].
35+
*
36+
* or
37+
*
38+
* sns:
39+
* sns::?key=aKey&secret=aSecret&token=aToken
40+
*
41+
* @param array|string|SnsClient|null $config
42+
*/
43+
public function __construct($config = 'sns:')
44+
{
45+
if ($config instanceof AwsSnsClient) {
46+
$this->client = new SnsClient($config);
47+
$this->config = ['lazy' => false] + $this->defaultConfig();
48+
49+
return;
50+
}
51+
52+
if (empty($config)) {
53+
$config = [];
54+
} elseif (is_string($config)) {
55+
$config = $this->parseDsn($config);
56+
} elseif (is_array($config)) {
57+
if (array_key_exists('dsn', $config)) {
58+
$config = array_replace_recursive($config, $this->parseDsn($config['dsn']));
59+
60+
unset($config['dsn']);
61+
}
62+
} else {
63+
throw new \LogicException(sprintf('The config must be either an array of options, a DSN string, null or instance of %s', AwsSnsClient::class));
64+
}
65+
66+
$this->config = array_replace($this->defaultConfig(), $config);
67+
}
68+
69+
/**
70+
* @return SnsContext
71+
*/
72+
public function createContext(): Context
73+
{
74+
return new SnsContext($this->establishConnection(), $this->config);
75+
}
76+
77+
private function establishConnection(): SnsClient
78+
{
79+
if ($this->client) {
80+
return $this->client;
81+
}
82+
83+
$config = [
84+
'version' => $this->config['version'],
85+
'region' => $this->config['region'],
86+
];
87+
88+
if (isset($this->config['endpoint'])) {
89+
$config['endpoint'] = $this->config['endpoint'];
90+
}
91+
92+
if ($this->config['key'] && $this->config['secret']) {
93+
$config['credentials'] = [
94+
'key' => $this->config['key'],
95+
'secret' => $this->config['secret'],
96+
];
97+
98+
if ($this->config['token']) {
99+
$config['credentials']['token'] = $this->config['token'];
100+
}
101+
}
102+
103+
$establishConnection = function () use ($config) {
104+
return (new Sdk(['Sns' => $config]))->createMultiRegionSns();
105+
};
106+
107+
$this->client = $this->config['lazy'] ?
108+
new SnsClient($establishConnection) :
109+
new SnsClient($establishConnection())
110+
;
111+
112+
return $this->client;
113+
}
114+
115+
private function parseDsn(string $dsn): array
116+
{
117+
$dsn = Dsn::parseFirst($dsn);
118+
119+
if ('sns' !== $dsn->getSchemeProtocol()) {
120+
throw new \LogicException(sprintf(
121+
'The given scheme protocol "%s" is not supported. It must be "sns"',
122+
$dsn->getSchemeProtocol()
123+
));
124+
}
125+
126+
return array_filter(array_replace($dsn->getQuery(), [
127+
'key' => $dsn->getString('key'),
128+
'secret' => $dsn->getString('secret'),
129+
'token' => $dsn->getString('token'),
130+
'region' => $dsn->getString('region'),
131+
'version' => $dsn->getString('version'),
132+
'lazy' => $dsn->getBool('lazy'),
133+
'endpoint' => $dsn->getString('endpoint'),
134+
]), function ($value) { return null !== $value; });
135+
}
136+
137+
private function defaultConfig(): array
138+
{
139+
return [
140+
'key' => null,
141+
'secret' => null,
142+
'token' => null,
143+
'region' => null,
144+
'version' => '2010-03-31',
145+
'lazy' => true,
146+
'endpoint' => null,
147+
];
148+
}
149+
}

0 commit comments

Comments
 (0)